videocall_cli/producers/
encoder_thread.rs1use std::{
2 sync::{atomic::AtomicBool, Arc},
3 thread::JoinHandle,
4 time::Instant,
5};
6
7use crate::{
8 producers::camera::{since_the_epoch, transform_video_chunk, CameraPacket, THRESHOLD_MILLIS},
9 video_encoder::VideoEncoderBuilder,
10};
11use protobuf::Message;
12use tokio::sync::mpsc::{Receiver, Sender};
13use tracing::{debug, error};
14
15use super::camera::CameraConfig;
16
17pub fn encoder_thread(
18 mut cam_rx: Receiver<Option<CameraPacket>>,
19 quic_tx: Arc<Sender<Vec<u8>>>,
20 quit: Arc<AtomicBool>,
21 camera_config: CameraConfig,
22 user_id: String,
23) -> JoinHandle<()> {
24 let width = camera_config.width;
25 let height = camera_config.height;
26 std::thread::spawn(move || {
27 let mut video_encoder =
28 VideoEncoderBuilder::new(camera_config.framerate, camera_config.cpu_used)
29 .set_resolution(width, height)
30 .build()
31 .unwrap();
32 video_encoder
33 .update_bitrate_kbps(camera_config.bitrate_kbps)
34 .unwrap();
35 let mut sequence = 0;
36 while let Some(data) = cam_rx.blocking_recv() {
38 if quit.load(std::sync::atomic::Ordering::Relaxed) {
39 return;
40 }
41 let CameraPacket {
42 data,
43 _format: _,
44 age,
45 } = data.unwrap();
46
47 let image_age = since_the_epoch().as_millis() - age;
49 if image_age > THRESHOLD_MILLIS {
50 debug!("throwing away old image with age {} ms", image_age);
51 continue;
52 }
53 let encoding_time = Instant::now();
54 let frames = match video_encoder.encode(sequence, data.as_slice()) {
55 Ok(frames) => frames,
56 Err(e) => {
57 error!("Error encoding frame: {:?}", e);
58 continue;
59 }
60 };
61 sequence += 1;
62 debug!("encoding took {:?}", encoding_time.elapsed());
63 for frame in frames {
64 let frame_size = frame.data.len() as f64 / 1000f64;
66 debug!("Frame size: {:.2} kbit", frame_size);
67 let packet_wrapper = transform_video_chunk(&frame, &user_id);
68 if let Err(e) = quic_tx.try_send(packet_wrapper.write_to_bytes().unwrap()) {
69 error!("Unable to send packet: {:?}", e);
70 }
71 }
72 }
73 })
74}