videocall_cli/producers/
encoder_thread.rs

1use std::{
2    sync::{atomic::AtomicBool, Arc},
3    thread::JoinHandle,
4    time::Instant,
5};
6
7use crate::{
8    producers::camera::{since_the_epoch, transform_video_chunk, CameraPacket, THRESHOLD_MILLIS},
9    video_encoder::VideoEncoderBuilder,
10};
11use protobuf::Message;
12use tokio::sync::mpsc::{Receiver, Sender};
13use tracing::{debug, error};
14
15use super::camera::CameraConfig;
16
17pub fn encoder_thread(
18    mut cam_rx: Receiver<Option<CameraPacket>>,
19    quic_tx: Arc<Sender<Vec<u8>>>,
20    quit: Arc<AtomicBool>,
21    camera_config: CameraConfig,
22    user_id: String,
23) -> JoinHandle<()> {
24    let width = camera_config.width;
25    let height = camera_config.height;
26    std::thread::spawn(move || {
27        let mut video_encoder =
28            VideoEncoderBuilder::new(camera_config.framerate, camera_config.cpu_used)
29                .set_resolution(width, height)
30                .build()
31                .unwrap();
32        video_encoder
33            .update_bitrate_kbps(camera_config.bitrate_kbps)
34            .unwrap();
35        let mut sequence = 0;
36        // the video encoder only supports I420 format, so whatever the camera gives us, we need to convert it
37        while let Some(data) = cam_rx.blocking_recv() {
38            if quit.load(std::sync::atomic::Ordering::Relaxed) {
39                return;
40            }
41            let CameraPacket {
42                data,
43                _format: _,
44                age,
45            } = data.unwrap();
46
47            // If age older than threshold, throw it away.
48            let image_age = since_the_epoch().as_millis() - age;
49            if image_age > THRESHOLD_MILLIS {
50                debug!("throwing away old image with age {} ms", image_age);
51                continue;
52            }
53            let encoding_time = Instant::now();
54            let frames = match video_encoder.encode(sequence, data.as_slice()) {
55                Ok(frames) => frames,
56                Err(e) => {
57                    error!("Error encoding frame: {:?}", e);
58                    continue;
59                }
60            };
61            sequence += 1;
62            debug!("encoding took {:?}", encoding_time.elapsed());
63            for frame in frames {
64                // Frame size kbit
65                let frame_size = frame.data.len() as f64 / 1000f64;
66                debug!("Frame size: {:.2} kbit", frame_size);
67                let packet_wrapper = transform_video_chunk(&frame, &user_id);
68                if let Err(e) = quic_tx.try_send(packet_wrapper.write_to_bytes().unwrap()) {
69                    error!("Unable to send packet: {:?}", e);
70                }
71            }
72        }
73    })
74}