// tailtriage_core/sink.rs

1use std::fs::{self, OpenOptions};
2use std::io::{BufWriter, Error as IoError, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use crate::Run;
8
9/// A sink that persists the final run artifact produced at shutdown.
10///
11/// Implement this trait to plug in custom persistence backends.
12///
13/// # Example
14///
15/// ```no_run
16/// use tailtriage_core::{Run, RunSink, SinkError, Tailtriage};
17///
18/// struct StdoutSink;
19///
20/// impl RunSink for StdoutSink {
21///     fn write(&self, run: &Run) -> Result<(), SinkError> {
22///         let bytes = serde_json::to_vec(run).map_err(SinkError::Serialize)?;
23///         println!("{}", String::from_utf8_lossy(&bytes));
24///         Ok(())
25///     }
26/// }
27///
28/// let run = Tailtriage::builder("checkout-service")
29///     .sink(StdoutSink)
30///     .build()?;
31/// # let _ = run;
32/// # Ok::<(), Box<dyn std::error::Error>>(())
33/// ```
34pub trait RunSink {
35    /// Persists a run.
36    ///
37    /// # Errors
38    ///
39    /// Returns [`SinkError`] if the sink cannot write the run output, such as
40    /// when file I/O fails or serialization cannot complete.
41    fn write(&self, run: &Run) -> Result<(), SinkError>;
42}
43
44/// Sink that finalizes capture lifecycle without writing a run artifact.
45///
46/// [`DiscardSink`] intentionally drops the finalized [`Run`] after shutdown and
47/// does not persist any JSON file artifact.
48///
49/// Use [`MemorySink`] instead when you want to keep the finalized [`Run`] for
50/// in-process analysis.
51#[derive(Debug, Clone, Copy, Default)]
52pub struct DiscardSink;
53
54impl RunSink for DiscardSink {
55    fn write(&self, _run: &Run) -> Result<(), SinkError> {
56        Ok(())
57    }
58}
59
60/// In-memory sink that stores only the last finalized run.
61///
62/// [`MemorySink`] writes no file artifact and keeps the most recent finalized
63/// [`Run`] in memory. Later writes replace earlier stored runs.
64///
65/// Storing finalized runs clones captured data and can increase memory use for
66/// large captures.
67#[derive(Debug, Clone, Default)]
68pub struct MemorySink {
69    run: Arc<Mutex<Option<Run>>>,
70}
71
72impl MemorySink {
73    /// Creates a new in-memory sink.
74    #[must_use]
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Returns a cloned copy of the last finalized run, if present.
80    #[must_use]
81    pub fn last_run(&self) -> Option<Run> {
82        lock_recover(&self.run).clone()
83    }
84
85    /// Takes the last finalized run and clears the stored value.
86    #[must_use]
87    pub fn take_run(&self) -> Option<Run> {
88        lock_recover(&self.run).take()
89    }
90
91    /// Clears any stored finalized run.
92    pub fn clear(&self) {
93        *lock_recover(&self.run) = None;
94    }
95}
96
97impl RunSink for MemorySink {
98    fn write(&self, run: &Run) -> Result<(), SinkError> {
99        *lock_recover(&self.run) = Some(run.clone());
100        Ok(())
101    }
102}
103
/// Locks `mutex` and returns its guard, ignoring poisoning.
///
/// If a previous holder panicked while holding the lock, the poisoned guard is
/// recovered and returned instead of propagating the panic.
fn lock_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    mutex
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
110
/// Sink that persists each run as a single JSON document in a local file at
/// shutdown.
///
/// This is the default sink used by [`crate::TailtriageBuilder`].
#[derive(Debug, Clone)]
pub struct LocalJsonSink {
    path: PathBuf,
}

impl LocalJsonSink {
    /// Creates a sink that writes to `path`.
    ///
    /// The path is only copied here; nothing touches the filesystem until a
    /// run is written.
    #[must_use]
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = PathBuf::from(path.as_ref());
        Self { path }
    }

    /// Returns the destination file path this sink writes to.
    #[must_use]
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
134
135impl RunSink for LocalJsonSink {
136    fn write(&self, run: &Run) -> Result<(), SinkError> {
137        let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
138        let temp_path = create_temp_path(parent, &self.path);
139        let write_result = (|| {
140            let file = OpenOptions::new()
141                .write(true)
142                .create_new(true)
143                .open(&temp_path)
144                .map_err(SinkError::Io)?;
145            let mut writer = BufWriter::new(file);
146            serde_json::to_writer_pretty(&mut writer, run).map_err(SinkError::Serialize)?;
147            writer.flush().map_err(SinkError::Io)?;
148            let file = writer
149                .into_inner()
150                .map_err(|err| SinkError::Io(err.into_error()))?;
151            file.sync_all().map_err(SinkError::Io)?;
152            finalize_temp_file(&temp_path, &self.path).map_err(SinkError::Io)
153        })();
154
155        if write_result.is_err() {
156            let _ = fs::remove_file(&temp_path);
157        }
158
159        write_result
160    }
161}
162
/// Builds a hidden temporary path inside `parent` for staging `final_path`.
///
/// The file name has the form `.{file_name}.tmp-{pid}-{epoch_nanos}-{seq}`.
/// `file_name` is taken from `final_path`, falling back to
/// `tailtriage-run.json` when the name is missing or not valid UTF-8.
///
/// `seq` comes from a process-wide counter. It keeps two calls in the same
/// process distinct even when the clock returns the same reading, which can
/// happen with coarse clock resolution or a clock set before the Unix epoch
/// (then every reading is 0).
fn create_temp_path(parent: &Path, final_path: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // `AtomicUsize` is available on every target std supports (unlike `AtomicU64`).
    static TEMP_SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let file_name = final_path
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("tailtriage-run.json");
    let epoch_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_nanos());
    // Relaxed is sufficient: only the uniqueness of each returned value matters.
    let sequence = TEMP_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    parent.join(format!(
        ".{file_name}.tmp-{}-{epoch_nanos}-{sequence}",
        std::process::id()
    ))
}
176
/// Moves the fully written temporary file at `temp_path` onto `final_path`,
/// replacing any file already at the destination.
///
/// # Errors
///
/// Propagates the I/O error from the rename, for example `NotFound` when
/// `temp_path` does not exist. On failure the destination is not modified.
fn finalize_temp_file(temp_path: &Path, final_path: &Path) -> Result<(), IoError> {
    fs::rename(temp_path, final_path)?;
    Ok(())
}
180
181/// Errors emitted while writing run artifacts.
182#[derive(Debug)]
183pub enum SinkError {
184    /// Underlying I/O failure.
185    Io(IoError),
186    /// Serialization failure.
187    Serialize(serde_json::Error),
188    /// Strict lifecycle validation failure during shutdown.
189    Lifecycle {
190        /// Number of unfinished requests detected at shutdown.
191        unfinished_count: usize,
192    },
193}
194
195impl std::fmt::Display for SinkError {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        match self {
198            Self::Io(err) => write!(f, "I/O error while writing run output: {err}"),
199            Self::Serialize(err) => {
200                write!(f, "serialization error while writing run output: {err}")
201            }
202            Self::Lifecycle { unfinished_count } => write!(
203                f,
204                "strict lifecycle validation failed: {unfinished_count} unfinished request(s) remained at shutdown"
205            ),
206        }
207    }
208}
209
210impl std::error::Error for SinkError {
211    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
212        match self {
213            Self::Io(err) => Some(err),
214            Self::Serialize(err) => Some(err),
215            Self::Lifecycle { .. } => None,
216        }
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::{
        finalize_temp_file, lock_recover, DiscardSink, LocalJsonSink, MemorySink, RunSink,
        SinkError,
    };
    use crate::{CaptureMode, Run, RunMetadata, UnfinishedRequests, SCHEMA_VERSION};
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Nanoseconds since the Unix epoch, or 0 if the clock reads earlier than it.
    fn epoch_nanos() -> u128 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos())
    }

    /// Builds a JSON path in the system temp dir, distinguished by `suffix`,
    /// the process id, and the current time.
    fn unique_path(suffix: &str) -> PathBuf {
        let nanos = epoch_nanos();
        std::env::temp_dir().join(format!(
            "tailtriage-core-sink-{suffix}-{}-{nanos}.json",
            std::process::id()
        ))
    }

    /// Minimal finalized run used as the write payload in these tests.
    fn sample_run() -> Run {
        Run::new(RunMetadata {
            run_id: "run-1".to_string(),
            service_name: "checkout".to_string(),
            service_version: Some("1.0.0".to_string()),
            started_at_unix_ms: 1,
            finished_at_unix_ms: 2,
            finalized_at_unix_ms: Some(2),
            mode: CaptureMode::Light,
            effective_core_config: Some(crate::EffectiveCoreConfig {
                mode: CaptureMode::Light,
                capture_limits: CaptureMode::Light.core_defaults(),
                strict_lifecycle: false,
            }),
            effective_tokio_sampler_config: None,
            host: None,
            pid: Some(123),
            lifecycle_warnings: Vec::new(),
            unfinished_requests: UnfinishedRequests::default(),
            run_end_reason: None,
        })
    }

    #[test]
    fn local_sink_write_creates_deserializable_artifact() {
        let output = unique_path("success");
        let run = sample_run();

        LocalJsonSink::new(&output)
            .write(&run)
            .expect("write should succeed");

        let bytes = std::fs::read(&output).expect("artifact should be written");
        let restored = serde_json::from_slice::<Run>(&bytes).expect("artifact should deserialize");
        assert_eq!(restored, run);
        assert_eq!(restored.schema_version, SCHEMA_VERSION);

        std::fs::remove_file(&output).ok();
    }

    #[test]
    fn discard_sink_write_succeeds() {
        DiscardSink
            .write(&sample_run())
            .expect("discard should succeed");
    }

    #[test]
    fn memory_sink_replaces_previous_run() {
        let sink = MemorySink::new();
        let rounds = [
            ("run-first", "first write should succeed"),
            ("run-second", "second write should succeed"),
        ];

        for (run_id, write_msg) in rounds {
            let mut run = sample_run();
            run.metadata.run_id = run_id.to_string();
            sink.write(&run).expect(write_msg);

            let stored = sink.last_run().expect("run should be present");
            assert_eq!(stored.metadata.run_id, run_id);
        }
    }

    #[test]
    fn memory_sink_recovers_from_poisoned_mutex_operations() {
        let sink = MemorySink::new();
        let poisoner = sink.clone();
        let handle = std::thread::spawn(move || {
            let _guard = lock_recover(&poisoner.run);
            panic!("poison mutex");
        });
        // The panic is the point of the thread: it poisons the shared mutex.
        let _ = handle.join();

        assert!(sink.last_run().is_none(), "last_run should recover");
        assert!(sink.take_run().is_none(), "take_run should recover");
        sink.clear();
        assert!(sink.last_run().is_none(), "clear should recover");
        sink.write(&sample_run()).expect("write should recover");
        assert!(sink.last_run().is_some(), "write should store run");
    }

    #[test]
    fn local_sink_write_replaces_existing_destination_with_new_run() {
        let output = unique_path("replace-existing");
        let sink = LocalJsonSink::new(&output);
        let named_run = |run_id: &str| {
            let mut run = sample_run();
            run.metadata.run_id = run_id.to_string();
            run
        };

        sink.write(&named_run("run-first"))
            .expect("first write should succeed");

        let mut second_run = named_run("run-second");
        second_run.requests.push(crate::RequestEvent {
            request_id: "req-2".to_string(),
            route: "/checkout".to_string(),
            kind: Some("http".to_string()),
            started_at_unix_ms: 10,
            finished_at_unix_ms: 20,
            latency_us: 10_000,
            outcome: "ok".to_string(),
        });
        sink.write(&second_run)
            .expect("second write should replace existing artifact");

        let bytes = std::fs::read(&output).expect("artifact should be written");
        let restored: Run = serde_json::from_slice(&bytes).expect("artifact should deserialize");
        assert_eq!(restored, second_run, "existing file should be replaced");

        std::fs::remove_file(&output).ok();
    }

    #[test]
    fn failed_finalization_keeps_existing_destination_unchanged() {
        let output = unique_path("finalization-failure");
        let original_payload = b"{\"run_id\":\"existing\"}";
        std::fs::write(&output, original_payload).expect("initial artifact should be writable");
        let missing_temp = unique_path("missing-temp");

        let err = finalize_temp_file(&missing_temp, &output)
            .expect_err("finalization should fail when temp is missing");
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);

        assert_eq!(
            std::fs::read(&output).expect("existing final artifact should remain"),
            original_payload
        );

        std::fs::remove_file(&output).ok();
    }

    #[test]
    fn local_sink_failed_finalization_cleans_up_temp_file_and_preserves_final_path() {
        let output = std::env::temp_dir().join(format!(
            "tailtriage-core-sink-dir-target-{}-{}",
            std::process::id(),
            epoch_nanos()
        ));
        std::fs::create_dir_all(&output).expect("directory target should be created");

        let err = LocalJsonSink::new(&output)
            .write(&sample_run())
            .expect_err("rename to directory should fail");
        assert!(matches!(err, SinkError::Io(_)));
        assert!(
            output.is_dir(),
            "existing final directory should remain untouched"
        );

        let parent = output
            .parent()
            .expect("directory target should always have a parent");
        let final_name = output
            .file_name()
            .and_then(std::ffi::OsStr::to_str)
            .expect("directory target should be valid utf-8 for this test");
        let temp_prefix = format!(".{final_name}.tmp-");
        let leftover_temp = std::fs::read_dir(parent)
            .expect("parent should be readable")
            .flatten()
            .filter_map(|entry| entry.file_name().into_string().ok())
            .any(|name| name.starts_with(&temp_prefix));
        assert!(
            !leftover_temp,
            "temporary file should be cleaned up on failed finalization"
        );

        std::fs::remove_dir_all(&output).ok();
    }
}