unros_core/logging/
dump.rs

1//! Data dumps are an alternative way of logging that is more suited to
2//! large collections of data.
3//!
4//! Data dumps offer a way to write data to some location such that the
5//! code producing the data does not get blocked by writing. If the write
6//! is queued successfully, then the write is guaranteed to occur, as long
7//! as the current program is not forcefully terminated.
8
9use std::{
10    error::Error,
11    fmt::Display,
12    io::{ErrorKind, Write},
13    net::{SocketAddr, SocketAddrV4},
14    path::{Path, PathBuf},
15    process::{Command, Stdio},
16    sync::Arc,
17};
18
19use crossbeam::{queue::ArrayQueue, utils::Backoff};
20use ffmpeg_sidecar::{child::FfmpegChild, command::FfmpegCommand, event::FfmpegEvent};
21use image::{DynamicImage, EncodableLayout};
22use log::{error, info, warn};
23use tokio::{
24    fs::File,
25    io::{AsyncWrite, AsyncWriteExt, BufWriter},
26    net::TcpStream,
27    sync::mpsc,
28};
29
30use crate::{spawn_persistent_thread, DropCheck};
31
32use super::SUB_LOGGING_DIR;
33
/// The channel endpoints held by a `DataDump` that actually writes somewhere.
struct DataDumpInner {
    /// Sends filled byte buffers to the thread that performs the writes.
    writer: mpsc::UnboundedSender<Vec<u8>>,
    /// Receives buffers handed back by the writing thread so that their
    /// allocations can be reused for later writes.
    empty_vecs: mpsc::UnboundedReceiver<Vec<u8>>,
}
38
/// A handle to the Data Dump thread that handles all writes
/// to a location.
///
/// This struct implements the blocking `Write` interface provided by the
/// Rust standard library, instead of the non-blocking `AsyncWrite` offered
/// by `tokio`. However, a write only queues the bytes on an unbounded
/// channel, so it never waits on the destination and is safe to use in
/// async code.
///
/// # Note
/// A default `DataDump` does not write to any location. It is equivalent to `sink()`.
#[derive(Default)]
pub struct DataDump(Option<DataDumpInner>);
51
52impl DataDump {
53    /// Create a `DataDump` that writes to the given path.
54    ///
55    /// For logging purposes, the filename is used as the name of the dump.
56    ///
57    /// # Note
58    /// If the given path is relative, it will be considered relative to the
59    /// sub-logging directory. If a logging implementation has not been initialized
60    /// through `init_logger`, `async_run_all`, or `run_all`, then this method will
61    /// return a `NotFound` io error.
62    pub async fn new_file(path: impl AsRef<Path>) -> std::io::Result<Self> {
63        let file = if path.as_ref().is_absolute() {
64            File::create(path.as_ref()).await?
65        } else {
66            let Some(sub_log_dir) = SUB_LOGGING_DIR.get() else {
67                return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Sub-logging directory has not been initialized with a call to `init_logger`, `async_run_all`, or `run_all`"));
68            };
69            File::create(PathBuf::from(sub_log_dir).join(path.as_ref())).await?
70        };
71        Self::new(BufWriter::new(file), path.as_ref().to_string_lossy())
72    }
73
74    /// Create a `DataDump` that writes to the network address.
75    ///
76    /// For logging purposes, the address is used as the name of the dump.
77    pub async fn new_tcp(addr: SocketAddr) -> std::io::Result<Self> {
78        let stream = TcpStream::connect(addr).await?;
79        Self::new(BufWriter::new(stream), addr.to_string())
80    }
81
82    /// Create a `DataDump` that writes to the given writer.
83    ///
84    /// For logging purposes, the given name is used as the name of the dump.
85    pub fn new<A>(mut writer: A, name: impl Into<String>) -> std::io::Result<Self>
86    where
87        A: AsyncWrite + Unpin + Send + 'static,
88    {
89        let name = name.into();
90        let (empty_vecs_sender, empty_vecs) = mpsc::unbounded_channel();
91        let (writer_sender, mut reader) = mpsc::unbounded_channel::<Vec<_>>();
92        spawn_persistent_thread(move || {
93            tokio::runtime::Builder::new_current_thread()
94                .enable_io()
95                .build()
96                .unwrap()
97                .block_on(async move {
98                    loop {
99                        let Some(mut bytes) = reader.recv().await else {
100                            break;
101                        };
102                        if let Err(e) = writer.write_all(&bytes).await {
103                            error!("Failed to write to {name:?}: {e}");
104                            return;
105                        }
106                        bytes.clear();
107                        let _ = empty_vecs_sender.send(bytes);
108                    }
109                    if let Err(e) = writer.flush().await {
110                        error!("Failed to flush to {name:?}: {e}");
111                    }
112                });
113        });
114        Ok(Self(Some(DataDumpInner {
115            writer: writer_sender,
116            empty_vecs,
117        })))
118    }
119}
120
121impl Write for DataDump {
122    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
123        let Some(inner) = self.0.as_mut() else {
124            return Ok(buf.len());
125        };
126        let mut vec = inner
127            .empty_vecs
128            .try_recv()
129            .unwrap_or_else(|_| Vec::with_capacity(buf.len()));
130
131        let buf_capacity = vec.capacity();
132        let n;
133        if buf_capacity < buf.len() {
134            n = buf_capacity;
135            vec.extend_from_slice(buf.split_at(buf_capacity).0);
136        } else {
137            n = buf.len();
138            vec.extend_from_slice(buf);
139        }
140
141        inner
142            .writer
143            .send(vec)
144            .map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe))?;
145
146        Ok(n)
147    }
148
149    fn flush(&mut self) -> std::io::Result<()> {
150        Ok(())
151    }
152}
153
/// An error faced while writing video frames.
#[derive(Debug)]
pub enum VideoWriteError {
    /// The size of the given image differs from the size the dump expects.
    IncorrectDimensions {
        /// Width the dump expects.
        expected_width: u32,
        /// Height the dump expects.
        expected_height: u32,
        /// Width of the rejected image.
        actual_width: u32,
        /// Height of the rejected image.
        actual_height: u32,
    },
    /// There is no information attached to the error.
    Unknown,
}

impl Display for VideoWriteError {
    /// Writes a human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IncorrectDimensions {
                expected_width: expected_x,
                expected_height: expected_y,
                actual_width: actual_x,
                actual_height: actual_y,
            } => write!(f, "Image dimensions are wrong. Expected {expected_x}x{expected_y}, got {actual_x}x{actual_y}"),
            Self::Unknown => f.write_str("The video writing thread has failed for some reason"),
        }
    }
}

impl Error for VideoWriteError {}
177
/// Where the video produced by a `VideoDataDump` ends up.
#[derive(Clone)]
enum VideoDataDumpType {
    /// Streamed over RTP to the given address.
    Rtp(SocketAddrV4),
    /// Written to a video file at the given path.
    File(PathBuf),
    /// Shown in a window.
    Display,
}

impl std::fmt::Display for VideoDataDumpType {
    /// Writes the short name used for this dump in log messages: the
    /// address for RTP, the file name for files, and `Display` for windows.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            VideoDataDumpType::Rtp(addr) => write!(f, "{addr}"),
            VideoDataDumpType::File(path) => {
                // `file_name` is `None` for paths such as `..` or `/`. Fall
                // back to the whole path instead of panicking while formatting.
                let name = path.file_name().unwrap_or(path.as_os_str());
                write!(f, "{}", name.to_string_lossy())
            }
            VideoDataDumpType::Display => {
                write!(f, "Display")
            }
        }
    }
}
198
/// A dump for writing images into videos using `ffmpeg`.
/// 
/// If `ffmpeg` is not installed, it will be downloaded locally
/// automatically when creating a file or RTP dump.
pub struct VideoDataDump {
    /// Queue through which frames are handed to the thread that feeds the video process.
    video_writer: Arc<ArrayQueue<Arc<DynamicImage>>>,
    /// A clone of this is polled by the feeding thread via `has_dropped` to know when to stop.
    // NOTE(review): presumably this reports `true` on the clone once this handle is dropped — confirm against `DropCheck`.
    writer_drop: DropCheck,
    /// Width that every written frame must have.
    width: u32,
    /// Height that every written frame must have.
    height: u32,
    /// Where the video goes; also used as the dump's name in log messages.
    dump_type: VideoDataDumpType,
    // path: PathBuf,
    // start: Instant,
}
212
/// An error faced while initializing a `VideoDataDump`.
#[derive(Debug)]
pub enum VideoDumpInitError {
    /// An io error, such as a failure to spawn the `ffmpeg` or `ffplay` process.
    IOError(std::io::Error),
    /// An error from `ffmpeg`, such as a failure to obtain its event stream.
    VideoError(String),
    /// An error setting up the logging for the dump, such as the sub-logging
    /// directory not having been initialized.
    LoggingError(anyhow::Error),
    /// An error automatically installing `ffmpeg`.
    FFMPEGInstallError(String),
}
225
226impl Error for VideoDumpInitError {}
227impl Display for VideoDumpInitError {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        match self {
230            VideoDumpInitError::IOError(e) => {
231                write!(f, "Faced an error initializing the video encoder: {e}")
232            }
233            VideoDumpInitError::LoggingError(e) => write!(
234                f,
235                "Faced an error setting up the logging for the video encoder: {e}"
236            ),
237            VideoDumpInitError::FFMPEGInstallError(e) => write!(
238                f,
239                "Faced an error installing FFMPEG for the video encoder: {e}"
240            ),
241            VideoDumpInitError::VideoError(e) => {
242                write!(f, "Faced an error while encoding video: {e}")
243            }
244        }
245    }
246}
247
/// The type of filter used when scaling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScalingFilter {
    /// Nearest neighbor. Excellent for performance.
    ///
    /// Upscaling adds no blurring whatsoever; downscaling gives mediocre quality.
    Neighbor,
    /// A fast bilinear algorithm. Good for performance.
    ///
    /// Upscaling adds some blurring; downscaling gives average quality.
    FastBilinear,
}

impl std::fmt::Display for ScalingFilter {
    /// Writes the filter's name in the form passed to `-sws_flags`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let flag = match self {
            Self::FastBilinear => "fast_bilinear",
            Self::Neighbor => "neighbor",
        };
        f.write_str(flag)
    }
}
269
270impl VideoDataDump {
271    /// Generates the sdp file for this data dump assuming that this dump
272    /// was made for RTP.
273    #[must_use]
274    pub fn generate_sdp(&self) -> Option<String> {
275        let VideoDataDumpType::Rtp(addr) = &self.dump_type else {
276            return None;
277        };
278        Some(format!(
279            "v=0
280o=- 0 0 IN IP4 127.0.0.1
281s=No Name
282c=IN IP4 {}
283t=0 0
284a=tool:libavformat 58.76.100
285m=video {} RTP/AVP 96
286a=rtpmap:96 H264/90000
287a=fmtp:96 packetization-mode=1",
288            addr.ip(),
289            addr.port()
290        ))
291    }
292
293    /// Creates a new `VideoDataDump` that displays to a window.
294    /// 
295    /// `ffplay` may need to be installed separately.
296    pub fn new_display(
297        in_width: u32,
298        in_height: u32,
299        fps: usize,
300    ) -> Result<Self, VideoDumpInitError> {
301        let cmd = Command::new("ffplay")
302            .args([
303                "-f",
304                "rawvideo",
305                "-pixel_format",
306                "rgb24",
307                "-video_size",
308                &format!("{in_width}x{in_height}"),
309                "-vf",
310                &format!("fps={fps}"),
311                "-i",
312                "-",
313            ])
314            .stdin(Stdio::piped())
315            .stdout(Stdio::null())
316            .stderr(Stdio::piped())
317            .spawn()
318            .map_err(VideoDumpInitError::IOError)?;
319
320        let queue_sender = Arc::new(ArrayQueue::<Arc<DynamicImage>>::new(1));
321        let queue_receiver = queue_sender.clone();
322        let writer_drop = DropCheck::default();
323        let reader_drop = writer_drop.clone();
324
325        let mut video_out = cmd.stdin.unwrap();
326
327        let backoff = Backoff::new();
328
329        spawn_persistent_thread(move || loop {
330            if reader_drop.has_dropped() {
331                if let Err(e) = video_out.flush() {
332                    error!("Failed to flush Display: {e}");
333                }
334                break;
335            }
336            let Some(frame) = queue_receiver.pop() else {
337                backoff.snooze();
338                continue;
339            };
340            backoff.reset();
341
342            if let Err(e) = video_out.write_all(frame.to_rgb8().as_bytes()) {
343                if e.kind() == ErrorKind::BrokenPipe {
344                    error!("Display has closed!");
345                    break;
346                } else {
347                    error!("Faced the following error while writing video frame to Display: {e}");
348                }
349            }
350        });
351
352        Ok(Self {
353            video_writer: queue_sender,
354            writer_drop,
355            width: in_width,
356            height: in_height,
357            dump_type: VideoDataDumpType::Display,
358        })
359    }
360
361    /// Creates a new `VideoDataDump` that writes to a video file.
362    pub fn new_file(
363        in_width: u32,
364        in_height: u32,
365        out_width: u32,
366        out_height: u32,
367        scale_filter: ScalingFilter,
368        path: impl AsRef<Path>,
369        fps: usize,
370    ) -> Result<Self, VideoDumpInitError> {
371        ffmpeg_sidecar::download::auto_download()
372            .map_err(|e| VideoDumpInitError::FFMPEGInstallError(e.to_string()))?;
373        let pathbuf: PathBuf = if path.as_ref().is_absolute() {
374            path.as_ref().into()
375        } else {
376            let Some(sub_log_dir) = SUB_LOGGING_DIR.get() else {
377                return Err(VideoDumpInitError::LoggingError(anyhow::anyhow!("Sub-logging directory has not been initialized with a call to `init_logger`, `async_run_all`, or `run_all`")));
378            };
379            PathBuf::from(sub_log_dir).join(path.as_ref())
380        };
381
382        let output = FfmpegCommand::new()
383            .hwaccel("auto")
384            .args([
385                "-f",
386                "rawvideo",
387                "-pix_fmt",
388                "rgb24",
389                "-s",
390                &format!("{in_width}x{in_height}"),
391            ])
392            .input("-")
393            .args([
394                "-vf",
395                &format!("fps={fps},scale={out_width}:{out_height}"),
396                "-sws_flags",
397                &scale_filter.to_string(),
398            ])
399            .args(["-c:v", "libx265"])
400            .args(["-y".as_ref(), pathbuf.as_os_str()])
401            .spawn()
402            .map_err(VideoDumpInitError::IOError)?;
403
404        Self::new(
405            in_width,
406            in_height,
407            VideoDataDumpType::File(pathbuf),
408            output,
409        )
410    }
411
412    /// Creates a new `VideoDataDump` that streams to a client over RTP.
413    pub fn new_rtp(
414        in_width: u32,
415        in_height: u32,
416        out_width: u32,
417        out_height: u32,
418        scale_filter: ScalingFilter,
419        addr: SocketAddrV4,
420        fps: usize,
421    ) -> Result<Self, VideoDumpInitError> {
422        ffmpeg_sidecar::download::auto_download()
423            .map_err(|e| VideoDumpInitError::FFMPEGInstallError(e.to_string()))?;
424
425        let output = FfmpegCommand::new()
426            .hwaccel("auto")
427            .format("rawvideo")
428            .pix_fmt("rgb24")
429            .size(in_width, in_height)
430            .input("-")
431            .codec_video("libx264")
432            .pix_fmt("yuv420p")
433            .args([
434                "-crf",
435                "35",
436                "-an",
437                "-vf",
438                &format!("fps={fps},scale={out_width}:{out_height}"),
439                "-sws_flags",
440                &scale_filter.to_string(),
441            ])
442            .args([
443                "-preset",
444                "ultrafast",
445                "-tune",
446                "zerolatency",
447                "-strict",
448                "2",
449                "-avioflags",
450                "direct",
451                "-rtsp_transport",
452                "udp",
453            ])
454            // .args(["-sdp_file", "sdp.txt"])
455            .format("rtp")
456            .output(format!("rtp://{addr}"))
457            .spawn()
458            .map_err(VideoDumpInitError::IOError)?;
459
460        Self::new(in_width, in_height, VideoDataDumpType::Rtp(addr), output)
461    }
462
463    fn new(
464        in_width: u32,
465        in_height: u32,
466        dump_type: VideoDataDumpType,
467        mut output: FfmpegChild,
468    ) -> Result<Self, VideoDumpInitError> {
469        let queue_sender = Arc::new(ArrayQueue::<Arc<DynamicImage>>::new(1));
470        let queue_receiver = queue_sender.clone();
471        let writer_drop = DropCheck::default();
472        let reader_drop = writer_drop.clone();
473
474        let mut video_out = output.take_stdin().unwrap();
475
476        let events = output
477            .iter()
478            .map_err(|e| VideoDumpInitError::VideoError(e.to_string()))?;
479
480        let dump_type2 = dump_type.clone();
481
482        spawn_persistent_thread(move || {
483            events.for_each(|event| {
484                if let FfmpegEvent::Log(level, msg) = event {
485                    match level {
486                        ffmpeg_sidecar::event::LogLevel::Info => info!("[{dump_type2}] {msg}"),
487                        ffmpeg_sidecar::event::LogLevel::Warning => warn!("[{dump_type2}] {msg}"),
488                        ffmpeg_sidecar::event::LogLevel::Unknown => {}
489                        _ => error!("[{dump_type2}] {msg}"),
490                    }
491                }
492            });
493        });
494
495        let dump_type2 = dump_type.clone();
496        let backoff = Backoff::new();
497
498        spawn_persistent_thread(move || loop {
499            if reader_drop.has_dropped() {
500                if let Err(e) = video_out.flush() {
501                    error!("Failed to flush {}: {e}", dump_type2);
502                }
503                break;
504            }
505            let Some(frame) = queue_receiver.pop() else {
506                backoff.snooze();
507                continue;
508            };
509            backoff.reset();
510
511            if let Err(e) = video_out.write_all(frame.to_rgb8().as_bytes()) {
512                if e.kind() == ErrorKind::BrokenPipe {
513                    error!("{} has closed!", dump_type2);
514                    break;
515                } else {
516                    error!(
517                        "Faced the following error while writing video frame to {}: {e}",
518                        dump_type2
519                    );
520                }
521            }
522        });
523
524        Ok(Self {
525            video_writer: queue_sender,
526            writer_drop,
527            width: in_width,
528            height: in_height,
529            dump_type, // start: Instant::now(),
530                       // path: path.to_path_buf(),
531        })
532    }
533
534    /// Writes an image into this dump.
535    pub fn write_frame(&mut self, frame: Arc<DynamicImage>) -> Result<(), VideoWriteError> {
536        if frame.width() != self.width || frame.height() != self.height {
537            return Err(VideoWriteError::IncorrectDimensions {
538                expected_width: self.width,
539                expected_height: self.height,
540                actual_width: frame.width(),
541                actual_height: frame.height(),
542            });
543        }
544
545        if self.writer_drop.has_dropped() {
546            return Err(VideoWriteError::Unknown);
547        }
548        match self.video_writer.force_push(frame) {
549            Some(_) => {
550                warn!(
551                    "Overwriting last frame in {} as queue was full!",
552                    self.dump_type
553                );
554                Ok(())
555            }
556            None => Ok(()),
557        }
558    }
559
560    pub fn write_frame_quiet(&mut self, frame: Arc<DynamicImage>) -> Result<(), VideoWriteError> {
561        if frame.width() != self.width || frame.height() != self.height {
562            return Err(VideoWriteError::IncorrectDimensions {
563                expected_width: self.width,
564                expected_height: self.height,
565                actual_width: frame.width(),
566                actual_height: frame.height(),
567            });
568        }
569
570        if self.writer_drop.has_dropped() {
571            return Err(VideoWriteError::Unknown);
572        }
573        self.video_writer.force_push(frame);
574        Ok(())
575    }
576
577    // pub async fn init_subtitles(&self) -> Result<SubtitleDump, std::io::Error> {
578    //     let path = PathBuf::from(self.path.file_stem().unwrap()).with_extension("srt");
579    //     let mut timestamp = Timestamp::new(0, 0, 0, 0);
580    //     timestamp.add_milliseconds(self.start.elapsed().as_millis() as i32);
581    //     Ok(SubtitleDump {
582    //         file: DataDump::new_file(path).await?,
583    //         start: self.start,
584    //         timestamp,
585    //         last_sub: None,
586    //         count: 0,
587    //     })
588    // }
589}
590
591// /// An error while writing subtitles to a video.
592// pub enum SubtitleWriteError {
593//     /// An error while writing the subtitiles to the file.
594//     IOError(std::io::Error),
595// }
596
597// pub struct SubtitleDump {
598//     file: DataDump,
599//     start: Instant,
600//     timestamp: Timestamp,
601//     count: usize,
602//     last_sub: Option<String>,
603// }
604
605// impl SubtitleDump {
606//     pub fn write_subtitle(&mut self, subtitle: impl Into<String>) -> std::io::Result<()> {
607//         let start_time = self.timestamp;
608//         let mut end_time = start_time;
609//         let now = Instant::now();
610//         end_time.add_milliseconds(now.duration_since(self.start).as_millis() as i32);
611
612//         if let Some(last_sub) = self.last_sub.take() {
613//             self.count += 1;
614//             let sub = Subtitle::new(self.count, start_time, end_time, last_sub);
615//             self.start = now;
616//             self.timestamp = end_time;
617//             self.last_sub = Some(subtitle.into());
618//             writeln!(self.file, "{sub}\n")
619//         } else {
620//             self.start = now;
621//             self.timestamp = end_time;
622//             self.last_sub = Some(subtitle.into());
623//             Ok(())
624//         }
625//     }
626
627//     pub fn clear_subtitle(&mut self) -> std::io::Result<()> {
628//         let start_time = self.timestamp;
629//         let mut end_time = start_time;
630//         let now = Instant::now();
631//         end_time.add_milliseconds(now.duration_since(self.start).as_millis() as i32);
632
633//         if let Some(last_sub) = self.last_sub.take() {
634//             self.count += 1;
635//             let sub = Subtitle::new(self.count, start_time, end_time, last_sub);
636//             self.start = now;
637//             self.timestamp = end_time;
638//             self.last_sub = None;
639//             writeln!(self.file, "{sub}\n")
640//         } else {
641//             self.start = now;
642//             self.timestamp = end_time;
643//             self.last_sub = None;
644//             Ok(())
645//         }
646//     }
647// }
648
649// impl Drop for SubtitleDump {
650//     fn drop(&mut self) {
651//         if let Some(last_sub) = self.last_sub.take() {
652//             let start_time = self.timestamp;
653//             let mut end_time = start_time;
654//             let now = Instant::now();
655//             end_time.add_milliseconds(now.duration_since(self.start).as_millis() as i32);
656//             self.count += 1;
657//             let sub = Subtitle::new(self.count, start_time, end_time, last_sub);
658//             self.start = now;
659//             self.timestamp = end_time;
660//             self.last_sub = None;
661//             if let Err(e) = writeln!(self.file, "{sub}\n") {
662//                 error!("Failed to write final subtitle: {e}");
663//             }
664//         }
665//     }
666// }