re_log_encoding/rrd/
file_sink.rs1use std::fmt;
2use std::path::PathBuf;
3
4use crossbeam::channel::{Receiver, RecvTimeoutError, SendError, Sender};
5use parking_lot::Mutex;
6use re_log_types::LogMsg;
7use re_quota_channel::send_crossbeam;
8
9#[derive(Debug, thiserror::Error)]
11pub enum FileFlushError {
12 #[error("Failed to flush file: {message}")]
13 Failed { message: String },
14
15 #[error("File flush timed out - not all messages were written.")]
16 Timeout,
17}
18
19impl FileFlushError {
20 fn failed(message: impl Into<String>) -> Self {
21 Self::Failed {
22 message: message.into(),
23 }
24 }
25}
26
27#[derive(thiserror::Error, Debug)]
29pub enum FileSinkError {
30 #[error("Failed to create file: {source}, path: {path}")]
32 CreateFile {
33 source: std::io::Error,
34 path: PathBuf,
35 },
36
37 #[error("Failed to spawn thread: {0}")]
39 SpawnThread(std::io::Error),
40
41 #[error("Failed to encode LogMsg: {0}")]
43 LogMsgEncode(#[from] crate::rrd::EncodeError),
44}
45
46#[derive(Debug)]
47enum Command {
48 Send(LogMsg),
49 Flush { on_done: Sender<Result<(), String>> },
50}
51
52impl Command {
53 fn flush() -> (Self, Receiver<Result<(), String>>) {
54 let (tx, rx) = crossbeam::channel::bounded(0); (Self::Flush { on_done: tx }, rx)
56 }
57}
58
59pub struct FileSink {
61 tx: Mutex<Sender<Option<Command>>>,
63 join_handle: Option<std::thread::JoinHandle<()>>,
64
65 path: Option<PathBuf>,
69}
70
71impl Drop for FileSink {
72 fn drop(&mut self) {
73 send_crossbeam(&self.tx.lock(), None).ok();
74 if let Some(join_handle) = self.join_handle.take() {
75 join_handle.join().ok();
76 }
77 }
78}
79
80impl FileSink {
81 pub fn new(path: impl Into<std::path::PathBuf>) -> Result<Self, FileSinkError> {
83 let encoding_options = crate::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
85
86 let (tx, rx) = crossbeam::channel::bounded(1024);
87
88 let path = path.into();
89
90 re_log::debug!("Saving file to {path:?}…");
91
92 let file = std::fs::File::create(&path).map_err(|err| FileSinkError::CreateFile {
97 path: path.clone(),
98 source: err,
99 })?;
100 let encoder =
101 crate::Encoder::new_eager(re_build_info::CrateVersion::LOCAL, encoding_options, file)?;
102 let join_handle = spawn_and_stream(Some(&path), encoder, rx)?;
103
104 Ok(Self {
105 tx: tx.into(),
106 join_handle: Some(join_handle),
107 path: Some(path),
108 })
109 }
110
111 pub fn stdout() -> Result<Self, FileSinkError> {
113 let encoding_options = crate::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
114
115 let (tx, rx) = crossbeam::channel::bounded(1024);
116
117 re_log::debug!("Writing to stdout…");
118
119 let encoder = crate::Encoder::new_eager(
120 re_build_info::CrateVersion::LOCAL,
121 encoding_options,
122 std::io::stdout(),
123 )?;
124 let join_handle = spawn_and_stream(None, encoder, rx)?;
125
126 Ok(Self {
127 tx: tx.into(),
128 join_handle: Some(join_handle),
129 path: None,
130 })
131 }
132
133 #[inline]
134 pub fn flush_blocking(&self, timeout: std::time::Duration) -> Result<(), FileFlushError> {
135 let (cmd, oneshot) = Command::flush();
136 send_crossbeam(&self.tx.lock(), Some(cmd)).map_err(|_ignored| {
137 FileFlushError::failed("File-writer thread shut down prematurely")
138 })?;
139
140 oneshot
141 .recv_timeout(timeout)
142 .map_err(|err| match err {
143 RecvTimeoutError::Timeout => FileFlushError::Timeout,
144 RecvTimeoutError::Disconnected => {
145 FileFlushError::failed("File-writer thread shut down prematurely")
146 }
147 })?
148 .map_err(FileFlushError::failed)
149 }
150
151 #[inline]
152 pub fn send(&self, log_msg: LogMsg) {
153 send_crossbeam(&self.tx.lock(), Some(Command::Send(log_msg))).ok();
154 }
155}
156
157fn spawn_and_stream<W: std::io::Write + Send + 'static>(
159 filepath: Option<&std::path::Path>,
160 mut encoder: crate::Encoder<W>,
161 rx: Receiver<Option<Command>>,
162) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
163 let (name, target) = if let Some(filepath) = filepath {
164 ("file_writer", filepath.display().to_string())
165 } else {
166 ("stdout_writer", "stdout".to_owned())
167 };
168 std::thread::Builder::new()
169 .name(name.into())
170 .spawn({
171 move || {
172 while let Ok(Some(cmd)) = rx.recv() {
173 match cmd {
174 Command::Send(log_msg) => {
175 if let Err(err) = encoder.append(&log_msg) {
176 re_log::error!("Failed to write log stream to {target}: {err}");
177 return;
178 }
179 }
180 Command::Flush { on_done } => {
181 re_log::trace!("Flushing…");
182
183 let result = encoder.flush_blocking().map_err(|err| {
184 format!("Failed to flush log stream to {target}: {err}")
185 });
186
187 if let Err(SendError(result)) = send_crossbeam(&on_done, result)
189 && let Err(err) = result
190 {
191 re_log::error!("{err}");
193 }
194 }
195 }
196 }
197 if let Err(err) = encoder.finish() {
198 re_log::error!("Failed to end log stream for {target}: {err}");
199 return;
200 }
201 re_log::debug!("Log stream written to {target}");
202 }
203 })
204 .map_err(FileSinkError::SpawnThread)
205}
206
207impl fmt::Debug for FileSink {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 f.debug_struct("FileSink")
210 .field(
211 "path",
212 &self.path.clone().unwrap_or_else(|| "stdout".into()),
213 )
214 .finish_non_exhaustive()
215 }
216}