1use crate::cli_args::IndexKind;
20use crate::video_encoder::Frame;
21use anyhow::Result;
22use protobuf::Message;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::thread::JoinHandle;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27use tokio::sync::mpsc::{self, Sender};
28use tracing::{debug, error, info};
29use videocall_nokhwa::pixel_format::I420Format;
30use videocall_nokhwa::utils::RequestedFormat;
31use videocall_nokhwa::utils::RequestedFormatType;
32use videocall_nokhwa::{
33 utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat},
34 Camera,
35};
36
37use videocall_types::protos::media_packet::media_packet::MediaType;
38use videocall_types::protos::media_packet::{MediaPacket, VideoCodec, VideoMetadata};
39use videocall_types::protos::packet_wrapper::{packet_wrapper::PacketType, PacketWrapper};
40
41use super::encoder_thread::encoder_thread;
42use super::producer::Producer;
43
44pub struct CameraPacket {
45 pub data: Vec<u8>,
46 pub _format: FrameFormat,
47 pub age: u128,
48}
49
50impl CameraPacket {
51 pub fn new(data: Vec<u8>, format: FrameFormat, age: u128) -> CameraPacket {
52 CameraPacket {
53 data,
54 _format: format,
55 age,
56 }
57 }
58}
59
60pub fn transform_video_chunk(frame: &Frame, email: &str) -> PacketWrapper {
61 let frame_type = if frame.key {
62 "key".to_string()
63 } else {
64 "delta".to_string()
65 };
66 let media_packet: MediaPacket = MediaPacket {
67 data: frame.data.to_vec(),
68 frame_type,
69 email: email.to_owned(),
70 media_type: MediaType::VIDEO.into(),
71 timestamp: since_the_epoch().as_micros() as f64,
72 video_metadata: Some(VideoMetadata {
73 sequence: frame.pts as u64,
74 codec: VideoCodec::VP9_PROFILE0_LEVEL10_8BIT.into(),
75 ..Default::default()
76 })
77 .into(),
78 ..Default::default()
79 };
80 let data = media_packet.write_to_bytes().unwrap();
81 PacketWrapper {
82 data,
83 email: media_packet.email,
84 packet_type: PacketType::MEDIA.into(),
85 ..Default::default()
86 }
87}
88
/// Threshold of 1000 milliseconds. Not referenced in this file; presumably
/// consumers use it to decide when a frame is too old — verify against callers.
pub static THRESHOLD_MILLIS: u128 = 1_000;
90
/// Returns the wall-clock time elapsed since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is set to a time before the Unix epoch.
pub fn since_the_epoch() -> Duration {
    // `elapsed` on the epoch is `SystemTime::now().duration_since(UNIX_EPOCH)`.
    UNIX_EPOCH.elapsed().unwrap()
}
94
95#[derive(Clone, Debug)]
96pub struct CameraConfig {
97 pub width: u32,
98 pub height: u32,
99 pub framerate: u32,
100 pub video_device_index: IndexKind,
101 pub frame_format: FrameFormat,
102 pub bitrate_kbps: u32,
103 pub cpu_used: u8,
104}
105
106pub struct CameraDaemon {
107 config: CameraConfig,
108 user_id: String,
109 cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
110 cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
111 wt_tx: Sender<Vec<u8>>,
112 quit: Arc<AtomicBool>,
113 handles: Vec<JoinHandle<()>>,
114}
115
116impl Producer for CameraDaemon {
117 fn start(&mut self) -> Result<()> {
118 self.handles.push(self.camera_thread()?);
119 let encoder = encoder_thread(
120 self.cam_rx.take().unwrap(),
121 self.wt_tx.clone(),
122 self.quit.clone(),
123 self.config.clone(),
124 self.user_id.clone(),
125 );
126 self.handles.push(encoder);
127 Ok(())
128 }
129
130 fn stop(&mut self) -> Result<()> {
131 self.quit.store(true, std::sync::atomic::Ordering::Relaxed);
132 for handle in self.handles.drain(..) {
133 handle.join().unwrap();
134 }
135 Ok(())
136 }
137}
138
139impl CameraDaemon {
140 pub fn from_config(
141 config: CameraConfig,
142 user_id: String,
143 wt_tx: Sender<Vec<u8>>,
144 ) -> CameraDaemon {
145 let (cam_tx, cam_rx) = mpsc::channel(100);
146 CameraDaemon {
147 config,
148 user_id,
149 cam_rx: Some(cam_rx),
150 cam_tx: Arc::new(cam_tx),
151 quit: Arc::new(AtomicBool::new(false)),
152 handles: vec![],
153 wt_tx,
154 }
155 }
156
157 fn camera_thread(&self) -> Result<JoinHandle<()>> {
158 let devices = videocall_nokhwa::query(ApiBackend::Auto)?;
159 for (i, camera_info) in devices.iter().enumerate() {
160 info!("AVAILABLE CAMERA DEVICE INDEX {}: {:?}", i, camera_info);
161 }
162 let cam_tx = self.cam_tx.clone();
163 let width = self.config.width;
164 let height = self.config.height;
165 let framerate = self.config.framerate;
166 let frame_format = self.config.frame_format;
167 let video_device_index = match &self.config.video_device_index {
168 IndexKind::String(s) => CameraIndex::String(s.clone()),
169 IndexKind::Index(i) => CameraIndex::Index(*i),
170 };
171 let quit = self.quit.clone();
172 Ok(std::thread::spawn(move || {
173 debug!("Camera opened... waiting for frames");
174 let mut camera = match Camera::new(
175 video_device_index,
176 RequestedFormat::new::<I420Format>(RequestedFormatType::Exact(
177 CameraFormat::new_from(width, height, frame_format, framerate),
178 )),
179 ) {
180 Ok(camera) => camera,
181 Err(e) => {
182 panic!("{e}\n please run 'info --list-formats' to see the available resolutions, frame formats and fps")
183 }
184 };
185 let actual_resolution = camera.resolution();
186 camera.open_stream().unwrap();
187 println!("Requested format: {frame_format:?}");
188 println!("Actual stream format: {:?}", camera.frame_format());
189
190 let mut i420_image_buffer = vec![
192 0u8;
193 buffer_size_i420(actual_resolution.width(), actual_resolution.height())
194 as usize
195 ];
196
197 let frame_time = Duration::from_millis(1000u64 / framerate as u64);
198 let mut last_frame_time = Instant::now();
199 loop {
200 let elapsed = last_frame_time.elapsed();
202 if elapsed < frame_time {
203 continue;
204 }
205 last_frame_time = Instant::now();
206 let frame = camera.frame().unwrap();
207 frame
208 .decode_image_to_buffer::<I420Format>(&mut i420_image_buffer)
209 .unwrap();
210 if quit.load(std::sync::atomic::Ordering::Relaxed) {
212 info!("Quit signal received, exiting frame loop.");
213 return;
214 }
215
216 if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
218 i420_image_buffer.clone(),
219 frame_format,
220 since_the_epoch().as_millis(),
221 ))) {
222 error!("Error sending image: {}", e);
223 }
224 }
225 }))
226 }
227}
228
/// Returns the byte size used for an I420 buffer of `width` x `height`:
/// `width * height` plus half of that again (integer division, so the half
/// is rounded down for odd pixel counts).
///
/// # Panics
///
/// Panics with "Buffer size calculation overflowed" if the result does not
/// fit in a `u32`.
pub fn buffer_size_i420(width: u32, height: u32) -> u32 {
    let total = match width.checked_mul(height) {
        Some(luma) => luma.checked_add(luma / 2),
        None => None,
    };
    total.expect("Buffer size calculation overflowed")
}