Skip to main content

tailtriage_core/
sink.rs

1use std::fs::{self, OpenOptions};
2use std::io::{BufWriter, Error as IoError, Write};
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::Run;
7
8/// A sink that can persist a run artifact.
9pub trait RunSink {
10    /// Persists a run.
11    ///
12    /// # Errors
13    ///
14    /// Returns [`SinkError`] if the sink cannot write the run output, such as
15    /// when file I/O fails or serialization cannot complete.
16    fn write(&self, run: &Run) -> Result<(), SinkError>;
17}
18
19/// Local file sink that writes one JSON document per run.
20#[derive(Debug, Clone)]
21pub struct LocalJsonSink {
22    path: PathBuf,
23}
24
25impl LocalJsonSink {
26    /// Creates a local JSON sink for `path`.
27    #[must_use]
28    pub fn new(path: impl AsRef<Path>) -> Self {
29        Self {
30            path: path.as_ref().to_path_buf(),
31        }
32    }
33
34    /// Returns the target file path used by this sink.
35    #[must_use]
36    pub fn path(&self) -> &Path {
37        &self.path
38    }
39}
40
41impl RunSink for LocalJsonSink {
42    fn write(&self, run: &Run) -> Result<(), SinkError> {
43        let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
44        let temp_path = create_temp_path(parent, &self.path);
45        let write_result = (|| {
46            let file = OpenOptions::new()
47                .write(true)
48                .create_new(true)
49                .open(&temp_path)
50                .map_err(SinkError::Io)?;
51            let mut writer = BufWriter::new(file);
52            serde_json::to_writer_pretty(&mut writer, run).map_err(SinkError::Serialize)?;
53            writer.flush().map_err(SinkError::Io)?;
54            let file = writer
55                .into_inner()
56                .map_err(|err| SinkError::Io(err.into_error()))?;
57            file.sync_all().map_err(SinkError::Io)?;
58            finalize_temp_file(&temp_path, &self.path).map_err(SinkError::Io)
59        })();
60
61        if write_result.is_err() {
62            let _ = fs::remove_file(&temp_path);
63        }
64
65        write_result
66    }
67}
68
69fn create_temp_path(parent: &Path, final_path: &Path) -> PathBuf {
70    let file_name = final_path
71        .file_name()
72        .and_then(std::ffi::OsStr::to_str)
73        .unwrap_or("tailtriage-run.json");
74    let epoch_nanos = SystemTime::now()
75        .duration_since(UNIX_EPOCH)
76        .map_or(0, |duration| duration.as_nanos());
77    parent.join(format!(
78        ".{file_name}.tmp-{}-{epoch_nanos}",
79        std::process::id()
80    ))
81}
82
83#[cfg(windows)]
84fn create_backup_path(final_path: &Path) -> PathBuf {
85    let parent = final_path.parent().unwrap_or_else(|| Path::new("."));
86    let file_name = final_path
87        .file_name()
88        .and_then(std::ffi::OsStr::to_str)
89        .unwrap_or("tailtriage-run.json");
90    let epoch_nanos = SystemTime::now()
91        .duration_since(UNIX_EPOCH)
92        .map_or(0, |duration| duration.as_nanos());
93    parent.join(format!(
94        ".{file_name}.bak-{}-{epoch_nanos}",
95        std::process::id()
96    ))
97}
98
99fn finalize_temp_file(temp_path: &Path, final_path: &Path) -> Result<(), IoError> {
100    #[cfg(unix)]
101    {
102        fs::rename(temp_path, final_path)
103    }
104
105    #[cfg(windows)]
106    {
107        match fs::rename(temp_path, final_path) {
108            Ok(()) => Ok(()),
109            Err(first_err) if final_path.is_file() && temp_path.is_file() => {
110                // Windows does not replace an existing destination on rename.
111                // Preserve the existing destination by moving it aside first,
112                // then restore it if the second rename fails.
113                let backup_path = create_backup_path(final_path);
114                fs::rename(final_path, &backup_path)?;
115
116                match fs::rename(temp_path, final_path) {
117                    Ok(()) => {
118                        let _ = fs::remove_file(&backup_path);
119                        Ok(())
120                    }
121                    Err(second_err) => {
122                        let restore_result = fs::rename(&backup_path, final_path);
123                        match restore_result {
124                            Ok(()) => Err(IoError::new(
125                                second_err.kind(),
126                                format!(
127                                    "failed to finalize run output after preserving existing destination: first rename error: {first_err}; second rename error: {second_err}"
128                                ),
129                            )),
130                            Err(restore_err) => Err(IoError::new(
131                                second_err.kind(),
132                                format!(
133                                    "failed to finalize run output and failed to restore existing destination: first rename error: {first_err}; second rename error: {second_err}; restore error: {restore_err}"
134                                ),
135                            )),
136                        }
137                    }
138                }
139            }
140            Err(err) => Err(err),
141        }
142    }
143
144    #[cfg(not(any(unix, windows)))]
145    {
146        fs::rename(temp_path, final_path)
147    }
148}
149
150/// Errors emitted while writing run artifacts.
151#[derive(Debug)]
152pub enum SinkError {
153    /// Underlying I/O failure.
154    Io(IoError),
155    /// Serialization failure.
156    Serialize(serde_json::Error),
157    /// Strict lifecycle validation failure during shutdown.
158    Lifecycle {
159        /// Number of unfinished requests detected at shutdown.
160        unfinished_count: usize,
161    },
162}
163
164impl std::fmt::Display for SinkError {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        match self {
167            Self::Io(err) => write!(f, "I/O error while writing run output: {err}"),
168            Self::Serialize(err) => {
169                write!(f, "serialization error while writing run output: {err}")
170            }
171            Self::Lifecycle { unfinished_count } => write!(
172                f,
173                "strict lifecycle validation failed: {unfinished_count} unfinished request(s) remained at shutdown"
174            ),
175        }
176    }
177}
178
179impl std::error::Error for SinkError {
180    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
181        match self {
182            Self::Io(err) => Some(err),
183            Self::Serialize(err) => Some(err),
184            Self::Lifecycle { .. } => None,
185        }
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use super::{finalize_temp_file, LocalJsonSink, RunSink, SinkError};
192    use crate::{CaptureMode, Run, RunMetadata, UnfinishedRequests, SCHEMA_VERSION};
193    use std::path::PathBuf;
194    use std::time::{SystemTime, UNIX_EPOCH};
195
196    fn unique_path(suffix: &str) -> PathBuf {
197        let nanos = SystemTime::now()
198            .duration_since(UNIX_EPOCH)
199            .map_or(0, |duration| duration.as_nanos());
200        std::env::temp_dir().join(format!(
201            "tailtriage-core-sink-{suffix}-{}-{nanos}.json",
202            std::process::id()
203        ))
204    }
205
206    fn sample_run() -> Run {
207        Run::new(RunMetadata {
208            run_id: "run-1".to_string(),
209            service_name: "checkout".to_string(),
210            service_version: Some("1.0.0".to_string()),
211            started_at_unix_ms: 1,
212            finished_at_unix_ms: 2,
213            mode: CaptureMode::Light,
214            host: None,
215            pid: Some(123),
216            lifecycle_warnings: Vec::new(),
217            unfinished_requests: UnfinishedRequests::default(),
218        })
219    }
220
221    #[test]
222    fn local_sink_write_creates_deserializable_artifact() {
223        let output = unique_path("success");
224        let sink = LocalJsonSink::new(&output);
225        let run = sample_run();
226
227        sink.write(&run).expect("write should succeed");
228
229        let bytes = std::fs::read(&output).expect("artifact should be written");
230        let restored: Run = serde_json::from_slice(&bytes).expect("artifact should deserialize");
231        assert_eq!(restored, run);
232        assert_eq!(restored.schema_version, SCHEMA_VERSION);
233
234        let _ = std::fs::remove_file(output);
235    }
236
237    #[test]
238    fn local_sink_write_replaces_existing_destination_with_new_run() {
239        let output = unique_path("replace-existing");
240        let sink = LocalJsonSink::new(&output);
241
242        let mut first_run = sample_run();
243        first_run.metadata.run_id = "run-first".to_string();
244        sink.write(&first_run).expect("first write should succeed");
245
246        let mut second_run = sample_run();
247        second_run.metadata.run_id = "run-second".to_string();
248        second_run.requests.push(crate::RequestEvent {
249            request_id: "req-2".to_string(),
250            route: "/checkout".to_string(),
251            kind: Some("http".to_string()),
252            started_at_unix_ms: 10,
253            finished_at_unix_ms: 20,
254            latency_us: 10_000,
255            outcome: "ok".to_string(),
256        });
257        sink.write(&second_run)
258            .expect("second write should replace existing artifact");
259
260        let bytes = std::fs::read(&output).expect("artifact should be written");
261        let restored: Run = serde_json::from_slice(&bytes).expect("artifact should deserialize");
262        assert_eq!(restored, second_run, "existing file should be replaced");
263
264        let _ = std::fs::remove_file(output);
265    }
266
267    #[test]
268    fn failed_finalization_keeps_existing_destination_unchanged() {
269        let output = unique_path("finalization-failure");
270        let original_payload = b"{\"run_id\":\"existing\"}";
271        std::fs::write(&output, original_payload).expect("initial artifact should be writable");
272        let missing_temp = unique_path("missing-temp");
273
274        let err = finalize_temp_file(&missing_temp, &output)
275            .expect_err("finalization should fail when temp is missing");
276        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
277
278        let final_payload = std::fs::read(&output).expect("existing final artifact should remain");
279        assert_eq!(final_payload, original_payload);
280
281        let _ = std::fs::remove_file(output);
282    }
283
284    #[test]
285    fn local_sink_failed_finalization_cleans_up_temp_file_and_preserves_final_path() {
286        let output = std::env::temp_dir().join(format!(
287            "tailtriage-core-sink-dir-target-{}-{}",
288            std::process::id(),
289            SystemTime::now()
290                .duration_since(UNIX_EPOCH)
291                .map_or(0, |duration| duration.as_nanos())
292        ));
293        std::fs::create_dir_all(&output).expect("directory target should be created");
294        let sink = LocalJsonSink::new(&output);
295
296        let err = sink
297            .write(&sample_run())
298            .expect_err("rename to directory should fail");
299        assert!(matches!(err, SinkError::Io(_)));
300        assert!(
301            output.is_dir(),
302            "existing final directory should remain untouched"
303        );
304
305        let parent = output
306            .parent()
307            .expect("directory target should always have a parent");
308        let final_name = output
309            .file_name()
310            .and_then(std::ffi::OsStr::to_str)
311            .expect("directory target should be valid utf-8 for this test");
312        let temp_prefix = format!(".{final_name}.tmp-");
313        let leftover_temp = std::fs::read_dir(parent)
314            .expect("parent should be readable")
315            .filter_map(Result::ok)
316            .map(|entry| entry.file_name())
317            .filter_map(|name| name.to_str().map(str::to_owned))
318            .any(|name| name.starts_with(&temp_prefix));
319        assert!(
320            !leftover_temp,
321            "temporary file should be cleaned up on failed finalization"
322        );
323
324        let _ = std::fs::remove_dir_all(output);
325    }
326}