1use crate::cli_args::IndexKind;
20use crate::video_encoder::Frame;
21use anyhow::Result;
22use protobuf::Message;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::thread::JoinHandle;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27use tokio::sync::mpsc::{self, Sender};
28use tracing::{debug, error, info};
29use videocall_nokhwa::pixel_format::I420Format;
30use videocall_nokhwa::utils::RequestedFormat;
31use videocall_nokhwa::utils::RequestedFormatType;
32use videocall_nokhwa::{
33 utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat},
34 Camera,
35};
36
37use videocall_types::protos::media_packet::media_packet::MediaType;
38use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
39use videocall_types::protos::packet_wrapper::{packet_wrapper::PacketType, PacketWrapper};
40
41use super::encoder_thread::encoder_thread;
42use super::producer::Producer;
43
/// A single captured camera frame handed from the capture thread to the
/// encoder thread over the `cam_tx`/`cam_rx` channel.
pub struct CameraPacket {
    /// Raw frame bytes; filled from the I420-decoded buffer in the capture loop.
    pub data: Vec<u8>,
    /// Pixel format the capture was requested in (underscore-prefixed: not
    /// currently read anywhere in this file).
    pub _format: FrameFormat,
    /// Capture timestamp in milliseconds since the Unix epoch
    /// (set from `since_the_epoch().as_millis()` at send time).
    pub age: u128,
}
49
50impl CameraPacket {
51 pub fn new(data: Vec<u8>, format: FrameFormat, age: u128) -> CameraPacket {
52 CameraPacket {
53 data,
54 _format: format,
55 age,
56 }
57 }
58}
59
60pub fn transform_video_chunk(frame: &Frame, email: &str) -> PacketWrapper {
61 let frame_type = if frame.key {
62 "key".to_string()
63 } else {
64 "delta".to_string()
65 };
66 let media_packet: MediaPacket = MediaPacket {
67 data: frame.data.to_vec(),
68 frame_type,
69 email: email.to_owned(),
70 media_type: MediaType::VIDEO.into(),
71 timestamp: since_the_epoch().as_micros() as f64,
72 video_metadata: Some(VideoMetadata {
73 sequence: frame.pts as u64,
74 ..Default::default()
75 })
76 .into(),
77 ..Default::default()
78 };
79 let data = media_packet.write_to_bytes().unwrap();
80 PacketWrapper {
81 data,
82 email: media_packet.email,
83 packet_type: PacketType::MEDIA.into(),
84 ..Default::default()
85 }
86}
87
/// Age threshold in milliseconds.
/// NOTE(review): not referenced in this file — presumably used elsewhere to
/// drop stale `CameraPacket`s (whose `age` is in the same unit); confirm.
/// A compile-time constant: `const` is the idiomatic form for an immutable value.
pub const THRESHOLD_MILLIS: u128 = 1000;
89
/// Wall-clock time elapsed since the Unix epoch.
///
/// # Panics
/// Panics only if the system clock is set to a time before the Unix epoch
/// (the invariant is now stated via `expect` instead of a bare `unwrap`).
pub fn since_the_epoch() -> Duration {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set to before the Unix epoch")
}
93
/// Static capture and encoding settings used to build a `CameraDaemon`.
#[derive(Clone, Debug)]
pub struct CameraConfig {
    /// Requested capture width in pixels.
    pub width: u32,
    /// Requested capture height in pixels.
    pub height: u32,
    /// Requested capture rate in frames per second; also paces the capture loop.
    pub framerate: u32,
    /// Which camera to open: by backend-specific string or numeric index.
    pub video_device_index: IndexKind,
    /// Pixel format to request from the camera backend.
    pub frame_format: FrameFormat,
    /// Target encoder bitrate in kbit/s — consumed by the encoder thread; TODO confirm.
    pub bitrate_kbps: u32,
    /// Encoder speed/quality trade-off knob (passed to the encoder thread);
    /// NOTE(review): presumably libvpx-style `cpu_used` — confirm valid range.
    pub cpu_used: u8,
}
104
/// Owns the camera capture thread and the encoder thread: frames are captured,
/// sent over an internal channel, encoded, and pushed into `quic_tx`.
pub struct CameraDaemon {
    config: CameraConfig,
    /// User identifier stamped onto outgoing media packets.
    user_id: String,
    /// Receiving side of the frame channel. `Option` so `start()` can
    /// `take()` it and move it into the encoder thread (one-shot).
    cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
    /// Sending side of the frame channel, cloned into the capture thread.
    cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
    /// Outbound channel the encoder thread writes serialized packets into.
    quic_tx: Arc<Sender<Vec<u8>>>,
    /// Shared shutdown flag polled by the worker threads.
    quit: Arc<AtomicBool>,
    /// Join handles of the spawned worker threads, drained by `stop()`.
    handles: Vec<JoinHandle<()>>,
}
114
115impl Producer for CameraDaemon {
116 fn start(&mut self) -> Result<()> {
117 self.handles.push(self.camera_thread()?);
118 let encoder = encoder_thread(
119 self.cam_rx.take().unwrap(),
120 self.quic_tx.clone(),
121 self.quit.clone(),
122 self.config.clone(),
123 self.user_id.clone(),
124 );
125 self.handles.push(encoder);
126 Ok(())
127 }
128
129 fn stop(&mut self) -> Result<()> {
130 self.quit.store(true, std::sync::atomic::Ordering::Relaxed);
131 for handle in self.handles.drain(..) {
132 handle.join().unwrap();
133 }
134 Ok(())
135 }
136}
137
138impl CameraDaemon {
139 pub fn from_config(
140 config: CameraConfig,
141 user_id: String,
142 quic_tx: Sender<Vec<u8>>,
143 ) -> CameraDaemon {
144 let (cam_tx, cam_rx) = mpsc::channel(100);
145 CameraDaemon {
146 config,
147 user_id,
148 cam_rx: Some(cam_rx),
149 cam_tx: Arc::new(cam_tx),
150 quit: Arc::new(AtomicBool::new(false)),
151 handles: vec![],
152 quic_tx: Arc::new(quic_tx),
153 }
154 }
155
156 fn camera_thread(&self) -> Result<JoinHandle<()>> {
157 let devices = videocall_nokhwa::query(ApiBackend::Auto)?;
158 for (i, camera_info) in devices.iter().enumerate() {
159 info!("AVAILABLE CAMERA DEVICE INDEX {}: {:?}", i, camera_info);
160 }
161 let cam_tx = self.cam_tx.clone();
162 let width = self.config.width;
163 let height = self.config.height;
164 let framerate = self.config.framerate;
165 let frame_format = self.config.frame_format;
166 let video_device_index = match &self.config.video_device_index {
167 IndexKind::String(s) => CameraIndex::String(s.clone()),
168 IndexKind::Index(i) => CameraIndex::Index(*i),
169 };
170 let quit = self.quit.clone();
171 Ok(std::thread::spawn(move || {
172 debug!("Camera opened... waiting for frames");
173 let mut camera = match Camera::new(
174 video_device_index,
175 RequestedFormat::new::<I420Format>(RequestedFormatType::Exact(
176 CameraFormat::new_from(width, height, frame_format, framerate),
177 )),
178 ) {
179 Ok(camera) => camera,
180 Err(e) => {
181 panic!("{}\n please run 'info --list-formats' to see the available resolutions, frame formats and fps", e)
182 }
183 };
184 let actual_resolution = camera.resolution();
185 camera.open_stream().unwrap();
186 println!("Requested format: {:?}", frame_format);
187 println!("Actual stream format: {:?}", camera.frame_format());
188
189 let mut i420_image_buffer = vec![
191 0u8;
192 buffer_size_i420(actual_resolution.width(), actual_resolution.height())
193 as usize
194 ];
195
196 let frame_time = Duration::from_millis(1000u64 / framerate as u64);
197 let mut last_frame_time = Instant::now();
198 loop {
199 let elapsed = last_frame_time.elapsed();
201 if elapsed < frame_time {
202 continue;
203 }
204 last_frame_time = Instant::now();
205 let frame = camera.frame().unwrap();
206 frame
207 .decode_image_to_buffer::<I420Format>(&mut i420_image_buffer)
208 .unwrap();
209 if quit.load(std::sync::atomic::Ordering::Relaxed) {
211 info!("Quit signal received, exiting frame loop.");
212 return;
213 }
214
215 if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
217 i420_image_buffer.clone(),
218 frame_format,
219 since_the_epoch().as_millis(),
220 ))) {
221 error!("Error sending image: {}", e);
222 }
223 }
224 }))
225 }
226}
227
/// Byte count of an I420 (YUV 4:2:0) frame of the given dimensions:
/// a full-resolution Y plane plus half that again for the subsampled
/// U and V planes, i.e. `width * height * 3 / 2`.
///
/// # Panics
/// Panics if the computation overflows `u32`.
pub fn buffer_size_i420(width: u32, height: u32) -> u32 {
    let y_plane = width.checked_mul(height);
    y_plane
        .and_then(|y| y.checked_add(y / 2))
        .expect("Buffer size calculation overflowed")
}