1use crate::cli_args::IndexKind;
2use crate::video_encoder::Frame;
3use anyhow::Result;
4use protobuf::Message;
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::thread::JoinHandle;
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9use tokio::sync::mpsc::{self, Sender};
10use tracing::{debug, error, info};
11use videocall_nokhwa::pixel_format::I420Format;
12use videocall_nokhwa::utils::RequestedFormat;
13use videocall_nokhwa::utils::RequestedFormatType;
14use videocall_nokhwa::{
15 utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat},
16 Camera,
17};
18
19use videocall_types::protos::media_packet::media_packet::MediaType;
20use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
21use videocall_types::protos::packet_wrapper::{packet_wrapper::PacketType, PacketWrapper};
22
23use super::encoder_thread::encoder_thread;
24use super::producer::Producer;
25
/// A single raw camera frame plus its capture timestamp, passed from the
/// capture thread to the encoder thread over the `cam_tx`/`cam_rx` channel.
pub struct CameraPacket {
    // Raw frame bytes; the capture loop decodes into this as I420.
    pub data: Vec<u8>,
    // Pixel format the frame was captured with; underscore-prefixed because
    // it is not currently read downstream.
    pub _format: FrameFormat,
    // Capture time in milliseconds since the Unix epoch.
    pub age: u128,
}
31
32impl CameraPacket {
33 pub fn new(data: Vec<u8>, format: FrameFormat, age: u128) -> CameraPacket {
34 CameraPacket {
35 data,
36 _format: format,
37 age,
38 }
39 }
40}
41
42pub fn transform_video_chunk(frame: &Frame, email: &str) -> PacketWrapper {
43 let frame_type = if frame.key {
44 "key".to_string()
45 } else {
46 "delta".to_string()
47 };
48 let media_packet: MediaPacket = MediaPacket {
49 data: frame.data.to_vec(),
50 frame_type,
51 email: email.to_owned(),
52 media_type: MediaType::VIDEO.into(),
53 timestamp: since_the_epoch().as_micros() as f64,
54 video_metadata: Some(VideoMetadata {
55 sequence: frame.pts as u64,
56 ..Default::default()
57 })
58 .into(),
59 ..Default::default()
60 };
61 let data = media_packet.write_to_bytes().unwrap();
62 PacketWrapper {
63 data,
64 email: media_packet.email,
65 packet_type: PacketType::MEDIA.into(),
66 ..Default::default()
67 }
68}
69
/// Age threshold (milliseconds) for camera packets.
// An immutable primitive should be a `const`, not a `static` (clippy idiom);
// reads by existing callers are unaffected.
pub const THRESHOLD_MILLIS: u128 = 1000;
71
/// Wall-clock time elapsed since the Unix epoch.
///
/// # Panics
/// Panics if the system clock is set to a time before the Unix epoch
/// (the only way `duration_since(UNIX_EPOCH)` can fail).
pub fn since_the_epoch() -> Duration {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
}
75
/// Capture and encoding parameters for the camera pipeline.
#[derive(Clone, Debug)]
pub struct CameraConfig {
    // Requested capture width in pixels.
    pub width: u32,
    // Requested capture height in pixels.
    pub height: u32,
    // Requested capture rate in frames per second; also paces the capture loop.
    pub framerate: u32,
    // Which camera device to open (numeric index or platform-specific string id).
    pub video_device_index: IndexKind,
    // Pixel format to request from the camera backend.
    pub frame_format: FrameFormat,
    // Passed to the encoder thread; presumably the target bitrate in kbit/s —
    // confirm against encoder_thread.
    pub bitrate_kbps: u32,
    // Passed to the encoder thread; presumably the codec speed/quality
    // trade-off knob — confirm against encoder_thread.
    pub cpu_used: u8,
}
86
/// Owns the camera capture thread and the encoder thread, plus the channels
/// and shutdown flag shared between them.
pub struct CameraDaemon {
    config: CameraConfig,
    user_id: String,
    // Receiver handed to the encoder thread on start(); None afterwards.
    cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
    // Sender cloned into the capture thread.
    cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
    // Outgoing byte channel given to the encoder thread.
    quic_tx: Arc<Sender<Vec<u8>>>,
    // Shared shutdown flag polled by the worker threads.
    quit: Arc<AtomicBool>,
    // Join handles for the spawned worker threads, drained in stop().
    handles: Vec<JoinHandle<()>>,
}
96
97impl Producer for CameraDaemon {
98 fn start(&mut self) -> Result<()> {
99 self.handles.push(self.camera_thread()?);
100 let encoder = encoder_thread(
101 self.cam_rx.take().unwrap(),
102 self.quic_tx.clone(),
103 self.quit.clone(),
104 self.config.clone(),
105 self.user_id.clone(),
106 );
107 self.handles.push(encoder);
108 Ok(())
109 }
110
111 fn stop(&mut self) -> Result<()> {
112 self.quit.store(true, std::sync::atomic::Ordering::Relaxed);
113 for handle in self.handles.drain(..) {
114 handle.join().unwrap();
115 }
116 Ok(())
117 }
118}
119
120impl CameraDaemon {
121 pub fn from_config(
122 config: CameraConfig,
123 user_id: String,
124 quic_tx: Sender<Vec<u8>>,
125 ) -> CameraDaemon {
126 let (cam_tx, cam_rx) = mpsc::channel(100);
127 CameraDaemon {
128 config,
129 user_id,
130 cam_rx: Some(cam_rx),
131 cam_tx: Arc::new(cam_tx),
132 quit: Arc::new(AtomicBool::new(false)),
133 handles: vec![],
134 quic_tx: Arc::new(quic_tx),
135 }
136 }
137
138 fn camera_thread(&self) -> Result<JoinHandle<()>> {
139 let devices = videocall_nokhwa::query(ApiBackend::Auto)?;
140 for (i, camera_info) in devices.iter().enumerate() {
141 info!("AVAILABLE CAMERA DEVICE INDEX {}: {:?}", i, camera_info);
142 }
143 let cam_tx = self.cam_tx.clone();
144 let width = self.config.width;
145 let height = self.config.height;
146 let framerate = self.config.framerate;
147 let frame_format = self.config.frame_format;
148 let video_device_index = match &self.config.video_device_index {
149 IndexKind::String(s) => CameraIndex::String(s.clone()),
150 IndexKind::Index(i) => CameraIndex::Index(*i),
151 };
152 let quit = self.quit.clone();
153 Ok(std::thread::spawn(move || {
154 debug!("Camera opened... waiting for frames");
155 let mut camera = match Camera::new(
156 video_device_index,
157 RequestedFormat::new::<I420Format>(RequestedFormatType::Exact(
158 CameraFormat::new_from(width, height, frame_format, framerate),
159 )),
160 ) {
161 Ok(camera) => camera,
162 Err(e) => {
163 panic!("{}\n please run 'info --list-formats' to see the available resolutions, frame formats and fps", e)
164 }
165 };
166 let actual_resolution = camera.resolution();
167 camera.open_stream().unwrap();
168 println!("Requested format: {:?}", frame_format);
169 println!("Actual stream format: {:?}", camera.frame_format());
170
171 let mut i420_image_buffer = vec![
173 0u8;
174 buffer_size_i420(actual_resolution.width(), actual_resolution.height())
175 as usize
176 ];
177
178 let frame_time = Duration::from_millis(1000u64 / framerate as u64);
179 let mut last_frame_time = Instant::now();
180 loop {
181 let elapsed = last_frame_time.elapsed();
183 if elapsed < frame_time {
184 continue;
185 }
186 last_frame_time = Instant::now();
187 let frame = camera.frame().unwrap();
188 frame
189 .decode_image_to_buffer::<I420Format>(&mut i420_image_buffer)
190 .unwrap();
191 if quit.load(std::sync::atomic::Ordering::Relaxed) {
193 info!("Quit signal received, exiting frame loop.");
194 return;
195 }
196
197 if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
199 i420_image_buffer.clone(),
200 frame_format,
201 since_the_epoch().as_millis(),
202 ))) {
203 error!("Error sending image: {}", e);
204 }
205 }
206 }))
207 }
208}
209
/// Size in bytes of an I420 (YUV 4:2:0) frame: one full-resolution luma
/// plane plus two quarter-resolution chroma planes, i.e. width * height * 3/2.
/// Assumes even dimensions, as I420 requires — odd inputs truncate the
/// chroma term.
///
/// # Panics
/// Panics if the computation overflows `u32`.
pub fn buffer_size_i420(width: u32, height: u32) -> u32 {
    let luma_bytes = width
        .checked_mul(height)
        .expect("Buffer size calculation overflowed");
    luma_bytes
        .checked_add(luma_bytes / 2)
        .expect("Buffer size calculation overflowed")
}