videocall_cli/producers/
encoder_thread.rs1use std::{
20 sync::{atomic::AtomicBool, Arc},
21 thread::JoinHandle,
22 time::Instant,
23};
24
25use crate::{
26 producers::camera::{since_the_epoch, transform_video_chunk, CameraPacket, THRESHOLD_MILLIS},
27 video_encoder::VideoEncoderBuilder,
28};
29use protobuf::Message;
30use tokio::sync::mpsc::{Receiver, Sender};
31use tracing::{debug, error};
32
33use super::camera::CameraConfig;
34
35pub fn encoder_thread(
36 mut cam_rx: Receiver<Option<CameraPacket>>,
37 wt_tx: Sender<Vec<u8>>,
38 quit: Arc<AtomicBool>,
39 camera_config: CameraConfig,
40 user_id: String,
41) -> JoinHandle<()> {
42 let width = camera_config.width;
43 let height = camera_config.height;
44 std::thread::spawn(move || {
45 let mut video_encoder =
46 VideoEncoderBuilder::new(camera_config.framerate, camera_config.cpu_used)
47 .set_resolution(width, height)
48 .build()
49 .unwrap();
50 video_encoder
51 .update_bitrate_kbps(camera_config.bitrate_kbps)
52 .unwrap();
53 let mut sequence = 0;
54 while let Some(data) = cam_rx.blocking_recv() {
56 if quit.load(std::sync::atomic::Ordering::Relaxed) {
57 return;
58 }
59 let CameraPacket {
60 data,
61 _format: _,
62 age,
63 } = data.unwrap();
64
65 let image_age = since_the_epoch().as_millis() - age;
67 if image_age > THRESHOLD_MILLIS {
68 debug!("throwing away old image with age {} ms", image_age);
69 continue;
70 }
71 let encoding_time = Instant::now();
72 let frames = match video_encoder.encode(sequence, data.as_slice()) {
73 Ok(frames) => frames,
74 Err(e) => {
75 error!("Error encoding frame: {:?}", e);
76 continue;
77 }
78 };
79 sequence += 1;
80 debug!("encoding took {:?}", encoding_time.elapsed());
81 for frame in frames {
82 let frame_size = frame.data.len() as f64 / 1000f64;
84 debug!("Frame size: {:.2} kbit", frame_size);
85 let packet_wrapper = transform_video_chunk(&frame, &user_id);
86 let packet_bytes = packet_wrapper.write_to_bytes().unwrap();
87 debug!(
88 "Queueing VIDEO packet: {} bytes, frame_type: {}, sequence: {}",
89 packet_bytes.len(),
90 if frame.key { "key" } else { "delta" },
91 sequence - 1
92 );
93 if let Err(e) = wt_tx.try_send(packet_bytes) {
94 error!("Unable to send video packet: {:?}", e);
95 } else {
96 debug!("Video packet queued successfully");
97 }
98 }
99 }
100 })
101}