1use std::fmt;
5use std::io::{self, Read, Write};
6use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use bytes::Bytes;
13use cameras::{Frame, PixelFormat};
14use dioxus::LaunchBuilder;
15
16use crate::component::PreviewPort;
17use crate::poison::recover_lock;
18use crate::registry::{FrameSource, Registry};
19
/// Magic bytes written at the very start of every preview header.
const PREVIEW_MAGIC: [u8; 4] = *b"CAMS";
/// Header layout version, written as the byte immediately after the magic.
const PREVIEW_VERSION: u8 = 1;
/// Format tag sent when there is no frame to deliver (no snapshot, or conversion failed).
const PREVIEW_FORMAT_NONE: u8 = 0;
/// Format tag for NV12 frames; the payload carries the primary and secondary planes.
const PREVIEW_FORMAT_NV12: u8 = 1;
/// Format tag for BGRA frames; the payload carries the primary plane only.
const PREVIEW_FORMAT_BGRA: u8 = 2;
/// Format tag for frames converted to RGBA with `cameras::to_rgba8`.
const PREVIEW_FORMAT_RGBA: u8 = 3;
/// Total header size in bytes: 4 (magic) + 1 (version) + 1 (format) + 2 (zero bytes)
/// + 4 x 4 (width, height, stride, counter as little-endian `u32`s).
const PREVIEW_HEADER_LEN: usize = 24;
/// Upper bound on the loopback connect that `ListenerGuard`'s `Drop` makes to wake the
/// accept loop.
const SHUTDOWN_KICK_TIMEOUT: Duration = Duration::from_millis(100);
28
/// Handle to a running local preview server.
///
/// Clones share the same [`ListenerGuard`] through an `Arc`; the guard's `Drop`
/// (which stops the listener thread) runs when the last reference goes away.
#[derive(Clone)]
pub struct PreviewServer {
    /// TCP port the listener is bound to (on `127.0.0.1` when created by
    /// [`start_preview_server`]).
    pub port: u16,
    /// Registry kept alongside the server; a clone of it is handed to the listener
    /// thread as its frame source.
    // NOTE(review): presumably clones of `Registry` share state — confirm in `registry`.
    pub(crate) registry: Registry,
    /// Keeps the listener thread alive for as long as any clone of this handle exists.
    pub(crate) _listener: Arc<ListenerGuard>,
}
63
64impl fmt::Debug for PreviewServer {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 f.debug_struct("PreviewServer")
67 .field("port", &self.port)
68 .finish_non_exhaustive()
69 }
70}
71
72pub fn start_preview_server() -> io::Result<PreviewServer> {
78 let registry = Registry::default();
79 let shutdown = Arc::new(AtomicBool::new(false));
80 let (port, handle) = spawn_listener(Arc::new(registry.clone()), Arc::clone(&shutdown))?;
81 let listener = Arc::new(ListenerGuard {
82 port,
83 shutdown,
84 handle: Mutex::new(Some(handle)),
85 });
86 Ok(PreviewServer {
87 port,
88 registry,
89 _listener: listener,
90 })
91}
92
93pub fn register_with(server: &PreviewServer, launch: LaunchBuilder) -> LaunchBuilder {
109 launch
110 .with_context(server.registry.clone())
111 .with_context(PreviewPort(server.port))
112 .with_context(server.clone())
113}
114
/// Owns the resources needed to stop the listener thread.
///
/// Dropping the guard signals the listener thread to shut down.
pub(crate) struct ListenerGuard {
    /// Loopback port the listener is bound to; `Drop` connects to it to wake the
    /// blocked accept loop.
    pub(crate) port: u16,
    /// Flag shared with the listener thread; set to `true` to request shutdown.
    pub(crate) shutdown: Arc<AtomicBool>,
    /// Join handle of the listener thread; `Drop` takes it out, leaving `None`.
    pub(crate) handle: Mutex<Option<JoinHandle<()>>>,
}
120
121impl Drop for ListenerGuard {
122 fn drop(&mut self) {
123 self.shutdown.store(true, Ordering::Relaxed);
124 let addr: SocketAddr = match format!("127.0.0.1:{}", self.port).parse() {
125 Ok(addr) => addr,
126 Err(_) => return,
127 };
128 if let Ok(stream) = TcpStream::connect_timeout(&addr, SHUTDOWN_KICK_TIMEOUT) {
129 let _ = stream.shutdown(Shutdown::Both);
130 }
131 if let Some(handle) = recover_lock(&self.handle).take() {
132 let _ = handle.join();
133 }
134 }
135}
136
137fn spawn_listener<S: FrameSource>(
138 source: Arc<S>,
139 shutdown: Arc<AtomicBool>,
140) -> io::Result<(u16, JoinHandle<()>)> {
141 let listener = TcpListener::bind("127.0.0.1:0")?;
142 let port = listener.local_addr()?.port();
143 let handle = std::thread::Builder::new()
144 .name("cameras-preview-server".into())
145 .spawn(move || run_listener(listener, source, shutdown))?;
146 Ok((port, handle))
147}
148
149fn run_listener<S: FrameSource>(listener: TcpListener, source: Arc<S>, shutdown: Arc<AtomicBool>) {
150 loop {
151 if shutdown.load(Ordering::Relaxed) {
152 break;
153 }
154 let Ok((stream, _)) = listener.accept() else {
155 break;
156 };
157 if shutdown.load(Ordering::Relaxed) {
158 break;
159 }
160 let source = Arc::clone(&source);
161 let _ = std::thread::Builder::new()
162 .name("cameras-preview-conn".into())
163 .spawn(move || {
164 let _ = stream.set_nodelay(true);
165 let _ = handle_connection(stream, source.as_ref());
166 });
167 }
168}
169
170fn handle_connection<S: FrameSource + ?Sized>(mut stream: TcpStream, source: &S) -> io::Result<()> {
171 let mut request_buf = [0u8; 2048];
172 loop {
173 let n = stream.read(&mut request_buf)?;
174 if n == 0 {
175 return Ok(());
176 }
177 let id = parse_preview_id(&request_buf[..n]);
178 write_response(&mut stream, source, id)?;
179 }
180}
181
/// Extracts the preview id from an HTTP request of the form
/// `GET /preview/<id>.bin HTTP/1.1`.
///
/// Only the request line (the first line that is not blank) is inspected, so bytes
/// in later header lines, including bytes that are not valid UTF-8, cannot cause a
/// well-formed request to be rejected and cannot be mistaken for the path.
///
/// Returns `None` when the request line is missing, is not valid UTF-8, has no
/// second token, or that token is not `/preview/<u32>.bin`.
fn parse_preview_id(request_bytes: &[u8]) -> Option<u32> {
    let request_line = request_bytes
        .split(|&byte| byte == b'\n')
        .find(|line| !line.iter().all(u8::is_ascii_whitespace))?;
    let request_line = std::str::from_utf8(request_line).ok()?;

    // Tokens are `<method> <target> <version>`; a trailing `\r` counts as whitespace.
    let path = request_line.split_whitespace().nth(1)?;
    path.strip_prefix("/preview/")?
        .strip_suffix(".bin")?
        .parse()
        .ok()
}
189
190fn write_response<S: FrameSource + ?Sized>(
191 stream: &mut TcpStream,
192 source: &S,
193 id: Option<u32>,
194) -> io::Result<()> {
195 let parts = match id.and_then(|id| source.snapshot(id)) {
196 Some((frame, counter)) => preview_parts(&frame, counter),
197 None => PreviewParts {
198 header: preview_header(PREVIEW_FORMAT_NONE, 0, 0, 0, 0),
199 primary: None,
200 secondary: None,
201 },
202 };
203 let total_body_len = parts.header.len()
204 + parts.primary.as_ref().map(|b| b.len()).unwrap_or(0)
205 + parts.secondary.as_ref().map(|b| b.len()).unwrap_or(0);
206 let http_header = format!(
207 "HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {}\r\nCache-Control: no-store\r\nAccess-Control-Allow-Origin: *\r\nConnection: keep-alive\r\n\r\n",
208 total_body_len
209 );
210 stream.write_all(http_header.as_bytes())?;
211 stream.write_all(&parts.header)?;
212 if let Some(primary) = &parts.primary {
213 stream.write_all(primary)?;
214 }
215 if let Some(secondary) = &parts.secondary {
216 stream.write_all(secondary)?;
217 }
218 Ok(())
219}
220
/// The pieces of a preview response body, in the order they are written.
struct PreviewParts {
    /// Binary header produced by `preview_header`.
    header: Vec<u8>,
    /// First payload plane, written right after the header; `None` when there is no frame.
    primary: Option<Bytes>,
    /// Second payload plane, written after `primary`; `preview_parts` sets it only for NV12.
    secondary: Option<Bytes>,
}
226
227fn preview_parts(frame: &Frame, counter: u32) -> PreviewParts {
228 match frame.pixel_format {
229 PixelFormat::Nv12 => PreviewParts {
230 header: preview_header(
231 PREVIEW_FORMAT_NV12,
232 frame.width,
233 frame.height,
234 frame.stride,
235 counter,
236 ),
237 primary: Some(frame.plane_primary.clone()),
238 secondary: Some(frame.plane_secondary.clone()),
239 },
240 PixelFormat::Bgra8 => {
241 let stride = if frame.stride == 0 {
242 frame.width * 4
243 } else {
244 frame.stride
245 };
246 PreviewParts {
247 header: preview_header(
248 PREVIEW_FORMAT_BGRA,
249 frame.width,
250 frame.height,
251 stride,
252 counter,
253 ),
254 primary: Some(frame.plane_primary.clone()),
255 secondary: None,
256 }
257 }
258 _ => {
259 let Ok(rgba) = cameras::to_rgba8(frame) else {
260 return PreviewParts {
261 header: preview_header(PREVIEW_FORMAT_NONE, 0, 0, 0, counter),
262 primary: None,
263 secondary: None,
264 };
265 };
266 let stride = frame.width * 4;
267 PreviewParts {
268 header: preview_header(
269 PREVIEW_FORMAT_RGBA,
270 frame.width,
271 frame.height,
272 stride,
273 counter,
274 ),
275 primary: Some(Bytes::from(rgba)),
276 secondary: None,
277 }
278 }
279 }
280}
281
282fn preview_header(format: u8, width: u32, height: u32, stride: u32, counter: u32) -> Vec<u8> {
283 let mut header = Vec::with_capacity(PREVIEW_HEADER_LEN);
284 header.extend_from_slice(&PREVIEW_MAGIC);
285 header.push(PREVIEW_VERSION);
286 header.push(format);
287 header.extend_from_slice(&[0u8, 0u8]);
288 header.extend_from_slice(&width.to_le_bytes());
289 header.extend_from_slice(&height.to_le_bytes());
290 header.extend_from_slice(&stride.to_le_bytes());
291 header.extend_from_slice(&counter.to_le_bytes());
292 header
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    // --- parse_preview_id -------------------------------------------------

    /// A regular request with headers yields the numeric id from the path.
    #[test]
    fn parse_id_from_valid_get_request() {
        let parsed = parse_preview_id(b"GET /preview/42.bin HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n");
        assert_eq!(parsed, Some(42));
    }

    /// Id `0` is a valid id, not a sentinel.
    #[test]
    fn parse_id_from_zero() {
        let parsed = parse_preview_id(b"GET /preview/0.bin HTTP/1.1\r\n\r\n");
        assert_eq!(parsed, Some(0));
    }

    /// A non-numeric id is rejected.
    #[test]
    fn parse_id_rejects_non_numeric() {
        let parsed = parse_preview_id(b"GET /preview/abc.bin HTTP/1.1\r\n\r\n");
        assert!(parsed.is_none());
    }

    /// The `.bin` suffix is mandatory.
    #[test]
    fn parse_id_rejects_missing_extension() {
        let parsed = parse_preview_id(b"GET /preview/42 HTTP/1.1\r\n\r\n");
        assert!(parsed.is_none());
    }

    /// Paths outside `/preview/` are rejected.
    #[test]
    fn parse_id_rejects_wrong_prefix() {
        let parsed = parse_preview_id(b"GET /other/42.bin HTTP/1.1\r\n\r\n");
        assert!(parsed.is_none());
    }

    /// An empty buffer has no path at all.
    #[test]
    fn parse_id_rejects_empty_request() {
        let parsed = parse_preview_id(b"");
        assert!(parsed.is_none());
    }

    /// Bytes that are not valid UTF-8 are rejected.
    #[test]
    fn parse_id_rejects_invalid_utf8() {
        let garbage: [u8; 4] = [0xFF, 0xFE, 0xFD, 0xFC];
        assert!(parse_preview_id(&garbage).is_none());
    }

    // --- preview_header ---------------------------------------------------

    /// The first eight bytes are magic, version, format and two zero bytes.
    #[test]
    fn header_has_expected_length_and_magic() {
        let header = preview_header(PREVIEW_FORMAT_RGBA, 1920, 1080, 7680, 42);
        assert_eq!(header.len(), PREVIEW_HEADER_LEN);

        let (magic, rest) = header.split_at(PREVIEW_MAGIC.len());
        assert_eq!(magic, &PREVIEW_MAGIC);
        assert_eq!(rest[0], PREVIEW_VERSION);
        assert_eq!(rest[1], PREVIEW_FORMAT_RGBA);
        assert_eq!(rest[2], 0);
        assert_eq!(rest[3], 0);
    }

    /// Width, height, stride and counter follow at offset 8 as little-endian `u32`s.
    #[test]
    fn header_fields_are_little_endian() {
        let fields: [u32; 4] = [0x0000_0780, 0x0000_0438, 0x0000_1E00, 0xDEAD_BEEF];
        let header = preview_header(
            PREVIEW_FORMAT_NV12,
            fields[0],
            fields[1],
            fields[2],
            fields[3],
        );

        for (index, field) in fields.iter().enumerate() {
            let start = 8 + 4 * index;
            assert_eq!(&header[start..start + 4], &field.to_le_bytes());
        }
    }

    /// The "no frame" header carries only zeros after the format byte.
    #[test]
    fn header_for_empty_frame_has_zero_fields() {
        let header = preview_header(PREVIEW_FORMAT_NONE, 0, 0, 0, 0);
        assert_eq!(header.len(), PREVIEW_HEADER_LEN);
        assert_eq!(header[5], PREVIEW_FORMAT_NONE);
        assert!(!header[8..].iter().any(|&byte| byte != 0));
    }
}