Skip to main content

videocall_cli/producers/
encoder_thread.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19use std::{
20    sync::{atomic::AtomicBool, Arc},
21    thread::JoinHandle,
22    time::Instant,
23};
24
25use crate::{
26    producers::camera::{since_the_epoch, transform_video_chunk, CameraPacket, THRESHOLD_MILLIS},
27    video_encoder::VideoEncoderBuilder,
28};
29use protobuf::Message;
30use tokio::sync::mpsc::{Receiver, Sender};
31use tracing::{debug, error};
32
33use super::camera::CameraConfig;
34
35pub fn encoder_thread(
36    mut cam_rx: Receiver<Option<CameraPacket>>,
37    wt_tx: Sender<Vec<u8>>,
38    quit: Arc<AtomicBool>,
39    camera_config: CameraConfig,
40    user_id: String,
41) -> JoinHandle<()> {
42    let width = camera_config.width;
43    let height = camera_config.height;
44    std::thread::spawn(move || {
45        let mut video_encoder =
46            VideoEncoderBuilder::new(camera_config.framerate, camera_config.cpu_used)
47                .set_resolution(width, height)
48                .build()
49                .unwrap();
50        video_encoder
51            .update_bitrate_kbps(camera_config.bitrate_kbps)
52            .unwrap();
53        let mut sequence = 0;
54        // the video encoder only supports I420 format, so whatever the camera gives us, we need to convert it
55        while let Some(data) = cam_rx.blocking_recv() {
56            if quit.load(std::sync::atomic::Ordering::Relaxed) {
57                return;
58            }
59            let CameraPacket {
60                data,
61                _format: _,
62                age,
63            } = data.unwrap();
64
65            // If age older than threshold, throw it away.
66            let image_age = since_the_epoch().as_millis() - age;
67            if image_age > THRESHOLD_MILLIS {
68                debug!("throwing away old image with age {} ms", image_age);
69                continue;
70            }
71            let encoding_time = Instant::now();
72            let frames = match video_encoder.encode(sequence, data.as_slice()) {
73                Ok(frames) => frames,
74                Err(e) => {
75                    error!("Error encoding frame: {:?}", e);
76                    continue;
77                }
78            };
79            sequence += 1;
80            debug!("encoding took {:?}", encoding_time.elapsed());
81            for frame in frames {
82                // Frame size kbit
83                let frame_size = frame.data.len() as f64 / 1000f64;
84                debug!("Frame size: {:.2} kbit", frame_size);
85                let packet_wrapper = transform_video_chunk(&frame, &user_id);
86                let packet_bytes = packet_wrapper.write_to_bytes().unwrap();
87                debug!(
88                    "Queueing VIDEO packet: {} bytes, frame_type: {}, sequence: {}",
89                    packet_bytes.len(),
90                    if frame.key { "key" } else { "delta" },
91                    sequence - 1
92                );
93                if let Err(e) = wt_tx.try_send(packet_bytes) {
94                    error!("Unable to send video packet: {:?}", e);
95                } else {
96                    debug!("Video packet queued successfully");
97                }
98            }
99        }
100    })
101}