1use std::{
10 error::Error,
11 fmt::Display,
12 io::{ErrorKind, Write},
13 net::{SocketAddr, SocketAddrV4},
14 path::{Path, PathBuf},
15 process::{Command, Stdio},
16 sync::Arc,
17};
18
19use crossbeam::{queue::ArrayQueue, utils::Backoff};
20use ffmpeg_sidecar::{child::FfmpegChild, command::FfmpegCommand, event::FfmpegEvent};
21use image::{DynamicImage, EncodableLayout};
22use log::{error, info, warn};
23use tokio::{
24 fs::File,
25 io::{AsyncWrite, AsyncWriteExt, BufWriter},
26 net::TcpStream,
27 sync::mpsc,
28};
29
30use crate::{spawn_persistent_thread, DropCheck};
31
32use super::SUB_LOGGING_DIR;
33
34struct DataDumpInner {
35 writer: mpsc::UnboundedSender<Vec<u8>>,
36 empty_vecs: mpsc::UnboundedReceiver<Vec<u8>>,
37}
38
39#[derive(Default)]
50pub struct DataDump(Option<DataDumpInner>);
51
52impl DataDump {
53 pub async fn new_file(path: impl AsRef<Path>) -> std::io::Result<Self> {
63 let file = if path.as_ref().is_absolute() {
64 File::create(path.as_ref()).await?
65 } else {
66 let Some(sub_log_dir) = SUB_LOGGING_DIR.get() else {
67 return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Sub-logging directory has not been initialized with a call to `init_logger`, `async_run_all`, or `run_all`"));
68 };
69 File::create(PathBuf::from(sub_log_dir).join(path.as_ref())).await?
70 };
71 Self::new(BufWriter::new(file), path.as_ref().to_string_lossy())
72 }
73
74 pub async fn new_tcp(addr: SocketAddr) -> std::io::Result<Self> {
78 let stream = TcpStream::connect(addr).await?;
79 Self::new(BufWriter::new(stream), addr.to_string())
80 }
81
82 pub fn new<A>(mut writer: A, name: impl Into<String>) -> std::io::Result<Self>
86 where
87 A: AsyncWrite + Unpin + Send + 'static,
88 {
89 let name = name.into();
90 let (empty_vecs_sender, empty_vecs) = mpsc::unbounded_channel();
91 let (writer_sender, mut reader) = mpsc::unbounded_channel::<Vec<_>>();
92 spawn_persistent_thread(move || {
93 tokio::runtime::Builder::new_current_thread()
94 .enable_io()
95 .build()
96 .unwrap()
97 .block_on(async move {
98 loop {
99 let Some(mut bytes) = reader.recv().await else {
100 break;
101 };
102 if let Err(e) = writer.write_all(&bytes).await {
103 error!("Failed to write to {name:?}: {e}");
104 return;
105 }
106 bytes.clear();
107 let _ = empty_vecs_sender.send(bytes);
108 }
109 if let Err(e) = writer.flush().await {
110 error!("Failed to flush to {name:?}: {e}");
111 }
112 });
113 });
114 Ok(Self(Some(DataDumpInner {
115 writer: writer_sender,
116 empty_vecs,
117 })))
118 }
119}
120
121impl Write for DataDump {
122 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
123 let Some(inner) = self.0.as_mut() else {
124 return Ok(buf.len());
125 };
126 let mut vec = inner
127 .empty_vecs
128 .try_recv()
129 .unwrap_or_else(|_| Vec::with_capacity(buf.len()));
130
131 let buf_capacity = vec.capacity();
132 let n;
133 if buf_capacity < buf.len() {
134 n = buf_capacity;
135 vec.extend_from_slice(buf.split_at(buf_capacity).0);
136 } else {
137 n = buf.len();
138 vec.extend_from_slice(buf);
139 }
140
141 inner
142 .writer
143 .send(vec)
144 .map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe))?;
145
146 Ok(n)
147 }
148
149 fn flush(&mut self) -> std::io::Result<()> {
150 Ok(())
151 }
152}
153
/// Errors returned when handing a frame to a video dump.
#[derive(Debug)]
pub enum VideoWriteError {
    /// The frame's dimensions differ from the ones the dump was created with.
    IncorrectDimensions {
        /// Width the dump expects.
        expected_width: u32,
        /// Height the dump expects.
        expected_height: u32,
        /// Width of the rejected frame.
        actual_width: u32,
        /// Height of the rejected frame.
        actual_height: u32,
    },
    /// The video writing thread is no longer running.
    Unknown,
}

impl Display for VideoWriteError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::IncorrectDimensions {
                expected_width,
                expected_height,
                actual_width,
                actual_height,
            } => write!(
                f,
                "Image dimensions are wrong. Expected {expected_width}x{expected_height}, got {actual_width}x{actual_height}"
            ),
            Self::Unknown => f.write_str("The video writing thread has failed for some reason"),
        }
    }
}

impl Error for VideoWriteError {}
177
/// Where a video dump sends its encoded output; also used to label log messages.
#[derive(Clone)]
enum VideoDataDumpType {
    /// Streamed over RTP to the given address.
    Rtp(SocketAddrV4),
    /// Written to the file at the given path.
    File(PathBuf),
    /// Shown in a local display window.
    Display,
}

impl std::fmt::Display for VideoDataDumpType {
    /// Writes a short label: the socket address, the file name, or `Display`.
    ///
    /// Paths without a final component (such as `/` or `logs/..`) are written in
    /// full rather than panicking.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rtp(addr) => write!(f, "{addr}"),
            Self::File(path) => match path.file_name() {
                Some(name) => write!(f, "{}", name.to_string_lossy()),
                // `file_name` is `None` for roots, empty paths and paths ending in `..`.
                None => write!(f, "{}", path.display()),
            },
            Self::Display => write!(f, "Display"),
        }
    }
}
198
199pub struct VideoDataDump {
204 video_writer: Arc<ArrayQueue<Arc<DynamicImage>>>,
205 writer_drop: DropCheck,
206 width: u32,
207 height: u32,
208 dump_type: VideoDataDumpType,
209 }
212
213#[derive(Debug)]
215pub enum VideoDumpInitError {
216 IOError(std::io::Error),
218 VideoError(String),
220 LoggingError(anyhow::Error),
222 FFMPEGInstallError(String),
224}
225
226impl Error for VideoDumpInitError {}
227impl Display for VideoDumpInitError {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 match self {
230 VideoDumpInitError::IOError(e) => {
231 write!(f, "Faced an error initializing the video encoder: {e}")
232 }
233 VideoDumpInitError::LoggingError(e) => write!(
234 f,
235 "Faced an error setting up the logging for the video encoder: {e}"
236 ),
237 VideoDumpInitError::FFMPEGInstallError(e) => write!(
238 f,
239 "Faced an error installing FFMPEG for the video encoder: {e}"
240 ),
241 VideoDumpInitError::VideoError(e) => {
242 write!(f, "Faced an error while encoding video: {e}")
243 }
244 }
245 }
246}
247
/// Scaling algorithm names handed to FFmpeg through `-sws_flags`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScalingFilter {
    /// Rendered as `neighbor`.
    Neighbor,
    /// Rendered as `fast_bilinear`.
    FastBilinear,
}

impl std::fmt::Display for ScalingFilter {
    /// Writes the flag name exactly as it is passed on the FFmpeg command line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let flag = match self {
            Self::Neighbor => "neighbor",
            Self::FastBilinear => "fast_bilinear",
        };
        f.write_str(flag)
    }
}
269
270impl VideoDataDump {
271 #[must_use]
274 pub fn generate_sdp(&self) -> Option<String> {
275 let VideoDataDumpType::Rtp(addr) = &self.dump_type else {
276 return None;
277 };
278 Some(format!(
279 "v=0
280o=- 0 0 IN IP4 127.0.0.1
281s=No Name
282c=IN IP4 {}
283t=0 0
284a=tool:libavformat 58.76.100
285m=video {} RTP/AVP 96
286a=rtpmap:96 H264/90000
287a=fmtp:96 packetization-mode=1",
288 addr.ip(),
289 addr.port()
290 ))
291 }
292
293 pub fn new_display(
297 in_width: u32,
298 in_height: u32,
299 fps: usize,
300 ) -> Result<Self, VideoDumpInitError> {
301 let cmd = Command::new("ffplay")
302 .args([
303 "-f",
304 "rawvideo",
305 "-pixel_format",
306 "rgb24",
307 "-video_size",
308 &format!("{in_width}x{in_height}"),
309 "-vf",
310 &format!("fps={fps}"),
311 "-i",
312 "-",
313 ])
314 .stdin(Stdio::piped())
315 .stdout(Stdio::null())
316 .stderr(Stdio::piped())
317 .spawn()
318 .map_err(VideoDumpInitError::IOError)?;
319
320 let queue_sender = Arc::new(ArrayQueue::<Arc<DynamicImage>>::new(1));
321 let queue_receiver = queue_sender.clone();
322 let writer_drop = DropCheck::default();
323 let reader_drop = writer_drop.clone();
324
325 let mut video_out = cmd.stdin.unwrap();
326
327 let backoff = Backoff::new();
328
329 spawn_persistent_thread(move || loop {
330 if reader_drop.has_dropped() {
331 if let Err(e) = video_out.flush() {
332 error!("Failed to flush Display: {e}");
333 }
334 break;
335 }
336 let Some(frame) = queue_receiver.pop() else {
337 backoff.snooze();
338 continue;
339 };
340 backoff.reset();
341
342 if let Err(e) = video_out.write_all(frame.to_rgb8().as_bytes()) {
343 if e.kind() == ErrorKind::BrokenPipe {
344 error!("Display has closed!");
345 break;
346 } else {
347 error!("Faced the following error while writing video frame to Display: {e}");
348 }
349 }
350 });
351
352 Ok(Self {
353 video_writer: queue_sender,
354 writer_drop,
355 width: in_width,
356 height: in_height,
357 dump_type: VideoDataDumpType::Display,
358 })
359 }
360
361 pub fn new_file(
363 in_width: u32,
364 in_height: u32,
365 out_width: u32,
366 out_height: u32,
367 scale_filter: ScalingFilter,
368 path: impl AsRef<Path>,
369 fps: usize,
370 ) -> Result<Self, VideoDumpInitError> {
371 ffmpeg_sidecar::download::auto_download()
372 .map_err(|e| VideoDumpInitError::FFMPEGInstallError(e.to_string()))?;
373 let pathbuf: PathBuf = if path.as_ref().is_absolute() {
374 path.as_ref().into()
375 } else {
376 let Some(sub_log_dir) = SUB_LOGGING_DIR.get() else {
377 return Err(VideoDumpInitError::LoggingError(anyhow::anyhow!("Sub-logging directory has not been initialized with a call to `init_logger`, `async_run_all`, or `run_all`")));
378 };
379 PathBuf::from(sub_log_dir).join(path.as_ref())
380 };
381
382 let output = FfmpegCommand::new()
383 .hwaccel("auto")
384 .args([
385 "-f",
386 "rawvideo",
387 "-pix_fmt",
388 "rgb24",
389 "-s",
390 &format!("{in_width}x{in_height}"),
391 ])
392 .input("-")
393 .args([
394 "-vf",
395 &format!("fps={fps},scale={out_width}:{out_height}"),
396 "-sws_flags",
397 &scale_filter.to_string(),
398 ])
399 .args(["-c:v", "libx265"])
400 .args(["-y".as_ref(), pathbuf.as_os_str()])
401 .spawn()
402 .map_err(VideoDumpInitError::IOError)?;
403
404 Self::new(
405 in_width,
406 in_height,
407 VideoDataDumpType::File(pathbuf),
408 output,
409 )
410 }
411
412 pub fn new_rtp(
414 in_width: u32,
415 in_height: u32,
416 out_width: u32,
417 out_height: u32,
418 scale_filter: ScalingFilter,
419 addr: SocketAddrV4,
420 fps: usize,
421 ) -> Result<Self, VideoDumpInitError> {
422 ffmpeg_sidecar::download::auto_download()
423 .map_err(|e| VideoDumpInitError::FFMPEGInstallError(e.to_string()))?;
424
425 let output = FfmpegCommand::new()
426 .hwaccel("auto")
427 .format("rawvideo")
428 .pix_fmt("rgb24")
429 .size(in_width, in_height)
430 .input("-")
431 .codec_video("libx264")
432 .pix_fmt("yuv420p")
433 .args([
434 "-crf",
435 "35",
436 "-an",
437 "-vf",
438 &format!("fps={fps},scale={out_width}:{out_height}"),
439 "-sws_flags",
440 &scale_filter.to_string(),
441 ])
442 .args([
443 "-preset",
444 "ultrafast",
445 "-tune",
446 "zerolatency",
447 "-strict",
448 "2",
449 "-avioflags",
450 "direct",
451 "-rtsp_transport",
452 "udp",
453 ])
454 .format("rtp")
456 .output(format!("rtp://{addr}"))
457 .spawn()
458 .map_err(VideoDumpInitError::IOError)?;
459
460 Self::new(in_width, in_height, VideoDataDumpType::Rtp(addr), output)
461 }
462
463 fn new(
464 in_width: u32,
465 in_height: u32,
466 dump_type: VideoDataDumpType,
467 mut output: FfmpegChild,
468 ) -> Result<Self, VideoDumpInitError> {
469 let queue_sender = Arc::new(ArrayQueue::<Arc<DynamicImage>>::new(1));
470 let queue_receiver = queue_sender.clone();
471 let writer_drop = DropCheck::default();
472 let reader_drop = writer_drop.clone();
473
474 let mut video_out = output.take_stdin().unwrap();
475
476 let events = output
477 .iter()
478 .map_err(|e| VideoDumpInitError::VideoError(e.to_string()))?;
479
480 let dump_type2 = dump_type.clone();
481
482 spawn_persistent_thread(move || {
483 events.for_each(|event| {
484 if let FfmpegEvent::Log(level, msg) = event {
485 match level {
486 ffmpeg_sidecar::event::LogLevel::Info => info!("[{dump_type2}] {msg}"),
487 ffmpeg_sidecar::event::LogLevel::Warning => warn!("[{dump_type2}] {msg}"),
488 ffmpeg_sidecar::event::LogLevel::Unknown => {}
489 _ => error!("[{dump_type2}] {msg}"),
490 }
491 }
492 });
493 });
494
495 let dump_type2 = dump_type.clone();
496 let backoff = Backoff::new();
497
498 spawn_persistent_thread(move || loop {
499 if reader_drop.has_dropped() {
500 if let Err(e) = video_out.flush() {
501 error!("Failed to flush {}: {e}", dump_type2);
502 }
503 break;
504 }
505 let Some(frame) = queue_receiver.pop() else {
506 backoff.snooze();
507 continue;
508 };
509 backoff.reset();
510
511 if let Err(e) = video_out.write_all(frame.to_rgb8().as_bytes()) {
512 if e.kind() == ErrorKind::BrokenPipe {
513 error!("{} has closed!", dump_type2);
514 break;
515 } else {
516 error!(
517 "Faced the following error while writing video frame to {}: {e}",
518 dump_type2
519 );
520 }
521 }
522 });
523
524 Ok(Self {
525 video_writer: queue_sender,
526 writer_drop,
527 width: in_width,
528 height: in_height,
529 dump_type, })
532 }
533
534 pub fn write_frame(&mut self, frame: Arc<DynamicImage>) -> Result<(), VideoWriteError> {
536 if frame.width() != self.width || frame.height() != self.height {
537 return Err(VideoWriteError::IncorrectDimensions {
538 expected_width: self.width,
539 expected_height: self.height,
540 actual_width: frame.width(),
541 actual_height: frame.height(),
542 });
543 }
544
545 if self.writer_drop.has_dropped() {
546 return Err(VideoWriteError::Unknown);
547 }
548 match self.video_writer.force_push(frame) {
549 Some(_) => {
550 warn!(
551 "Overwriting last frame in {} as queue was full!",
552 self.dump_type
553 );
554 Ok(())
555 }
556 None => Ok(()),
557 }
558 }
559
560 pub fn write_frame_quiet(&mut self, frame: Arc<DynamicImage>) -> Result<(), VideoWriteError> {
561 if frame.width() != self.width || frame.height() != self.height {
562 return Err(VideoWriteError::IncorrectDimensions {
563 expected_width: self.width,
564 expected_height: self.height,
565 actual_width: frame.width(),
566 actual_height: frame.height(),
567 });
568 }
569
570 if self.writer_drop.has_dropped() {
571 return Err(VideoWriteError::Unknown);
572 }
573 self.video_writer.force_push(frame);
574 Ok(())
575 }
576
577 }
590
591