Skip to main content

re_log_encoding/rrd/
file_sink.rs

1use std::fmt;
2use std::path::PathBuf;
3
4use crossbeam::channel::{Receiver, RecvTimeoutError, SendError, Sender};
5use parking_lot::Mutex;
6use re_log_types::LogMsg;
7use re_quota_channel::send_crossbeam;
8
9/// An error that can occur when flushing.
10#[derive(Debug, thiserror::Error)]
11pub enum FileFlushError {
12    #[error("Failed to flush file: {message}")]
13    Failed { message: String },
14
15    #[error("File flush timed out - not all messages were written.")]
16    Timeout,
17}
18
19impl FileFlushError {
20    fn failed(message: impl Into<String>) -> Self {
21        Self::Failed {
22            message: message.into(),
23        }
24    }
25}
26
27/// Errors that can occur when creating a [`FileSink`].
28#[derive(thiserror::Error, Debug)]
29pub enum FileSinkError {
30    /// Error creating the file.
31    #[error("Failed to create file: {source}, path: {path}")]
32    CreateFile {
33        source: std::io::Error,
34        path: PathBuf,
35    },
36
37    /// Error spawning the file writer thread.
38    #[error("Failed to spawn thread: {0}")]
39    SpawnThread(std::io::Error),
40
41    /// Error encoding a log message.
42    #[error("Failed to encode LogMsg: {0}")]
43    LogMsgEncode(#[from] crate::rrd::EncodeError),
44}
45
46#[derive(Debug)]
47enum Command {
48    Send(LogMsg),
49    Flush { on_done: Sender<Result<(), String>> },
50}
51
52impl Command {
53    fn flush() -> (Self, Receiver<Result<(), String>>) {
54        let (tx, rx) = crossbeam::channel::bounded(0); // oneshot
55        (Self::Flush { on_done: tx }, rx)
56    }
57}
58
59/// Stream log messages to an `.rrd` file.
60pub struct FileSink {
61    // None = quit
62    tx: Mutex<Sender<Option<Command>>>,
63    join_handle: Option<std::thread::JoinHandle<()>>,
64
65    /// Only used for diagnostics, not for access after `new()`.
66    ///
67    /// `None` indicates stdout.
68    path: Option<PathBuf>,
69}
70
71impl Drop for FileSink {
72    fn drop(&mut self) {
73        send_crossbeam(&self.tx.lock(), None).ok();
74        if let Some(join_handle) = self.join_handle.take() {
75            join_handle.join().ok();
76        }
77    }
78}
79
80impl FileSink {
81    /// Start writing log messages to a file at the given path.
82    pub fn new(path: impl Into<std::path::PathBuf>) -> Result<Self, FileSinkError> {
83        // We always compress on disk
84        let encoding_options = crate::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
85
86        let (tx, rx) = crossbeam::channel::bounded(1024);
87
88        let path = path.into();
89
90        re_log::debug!("Saving file to {path:?}…");
91
92        // TODO(andreas): Can we ensure that a single process doesn't
93        // have multiple file sinks for the same file live?
94        // This likely caused an instability in the past, see https://github.com/rerun-io/rerun/issues/3306
95
96        let file = std::fs::File::create(&path).map_err(|err| FileSinkError::CreateFile {
97            path: path.clone(),
98            source: err,
99        })?;
100        let encoder =
101            crate::Encoder::new_eager(re_build_info::CrateVersion::LOCAL, encoding_options, file)?;
102        let join_handle = spawn_and_stream(Some(&path), encoder, rx)?;
103
104        Ok(Self {
105            tx: tx.into(),
106            join_handle: Some(join_handle),
107            path: Some(path),
108        })
109    }
110
111    /// Start writing log messages to standard output.
112    pub fn stdout() -> Result<Self, FileSinkError> {
113        let encoding_options = crate::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
114
115        let (tx, rx) = crossbeam::channel::bounded(1024);
116
117        re_log::debug!("Writing to stdout…");
118
119        let encoder = crate::Encoder::new_eager(
120            re_build_info::CrateVersion::LOCAL,
121            encoding_options,
122            std::io::stdout(),
123        )?;
124        let join_handle = spawn_and_stream(None, encoder, rx)?;
125
126        Ok(Self {
127            tx: tx.into(),
128            join_handle: Some(join_handle),
129            path: None,
130        })
131    }
132
133    #[inline]
134    pub fn flush_blocking(&self, timeout: std::time::Duration) -> Result<(), FileFlushError> {
135        let (cmd, oneshot) = Command::flush();
136        send_crossbeam(&self.tx.lock(), Some(cmd)).map_err(|_ignored| {
137            FileFlushError::failed("File-writer thread shut down prematurely")
138        })?;
139
140        oneshot
141            .recv_timeout(timeout)
142            .map_err(|err| match err {
143                RecvTimeoutError::Timeout => FileFlushError::Timeout,
144                RecvTimeoutError::Disconnected => {
145                    FileFlushError::failed("File-writer thread shut down prematurely")
146                }
147            })?
148            .map_err(FileFlushError::failed)
149    }
150
151    #[inline]
152    pub fn send(&self, log_msg: LogMsg) {
153        send_crossbeam(&self.tx.lock(), Some(Command::Send(log_msg))).ok();
154    }
155}
156
157/// Set `filepath` to `None` to stream to standard output.
158fn spawn_and_stream<W: std::io::Write + Send + 'static>(
159    filepath: Option<&std::path::Path>,
160    mut encoder: crate::Encoder<W>,
161    rx: Receiver<Option<Command>>,
162) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
163    let (name, target) = if let Some(filepath) = filepath {
164        ("file_writer", filepath.display().to_string())
165    } else {
166        ("stdout_writer", "stdout".to_owned())
167    };
168    std::thread::Builder::new()
169        .name(name.into())
170        .spawn({
171            move || {
172                while let Ok(Some(cmd)) = rx.recv() {
173                    match cmd {
174                        Command::Send(log_msg) => {
175                            if let Err(err) = encoder.append(&log_msg) {
176                                re_log::error!("Failed to write log stream to {target}: {err}");
177                                return;
178                            }
179                        }
180                        Command::Flush { on_done } => {
181                            re_log::trace!("Flushing…");
182
183                            let result = encoder.flush_blocking().map_err(|err| {
184                                format!("Failed to flush log stream to {target}: {err}")
185                            });
186
187                            // Send back the result:
188                            if let Err(SendError(result)) = send_crossbeam(&on_done, result)
189                                && let Err(err) = result
190                            {
191                                // There was an error, and nobody received it:
192                                re_log::error!("{err}");
193                            }
194                        }
195                    }
196                }
197                if let Err(err) = encoder.finish() {
198                    re_log::error!("Failed to end log stream for {target}: {err}");
199                    return;
200                }
201                re_log::debug!("Log stream written to {target}");
202            }
203        })
204        .map_err(FileSinkError::SpawnThread)
205}
206
207impl fmt::Debug for FileSink {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("FileSink")
210            .field(
211                "path",
212                &self.path.clone().unwrap_or_else(|| "stdout".into()),
213            )
214            .finish_non_exhaustive()
215    }
216}