1use std::fs::{self, OpenOptions};
2use std::io::{BufWriter, Error as IoError, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use crate::Run;
8
/// Destination that a [`Run`] can be written to.
pub trait RunSink {
    /// Hands `run` to the sink.
    ///
    /// # Errors
    ///
    /// Returns a [`SinkError`] when the sink fails to accept the run.
    fn write(&self, run: &Run) -> Result<(), SinkError>;
}
43
44#[derive(Debug, Clone, Copy, Default)]
52pub struct DiscardSink;
53
54impl RunSink for DiscardSink {
55 fn write(&self, _run: &Run) -> Result<(), SinkError> {
56 Ok(())
57 }
58}
59
60#[derive(Debug, Clone, Default)]
68pub struct MemorySink {
69 run: Arc<Mutex<Option<Run>>>,
70}
71
72impl MemorySink {
73 #[must_use]
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 #[must_use]
81 pub fn last_run(&self) -> Option<Run> {
82 lock_recover(&self.run).clone()
83 }
84
85 #[must_use]
87 pub fn take_run(&self) -> Option<Run> {
88 lock_recover(&self.run).take()
89 }
90
91 pub fn clear(&self) {
93 *lock_recover(&self.run) = None;
94 }
95}
96
97impl RunSink for MemorySink {
98 fn write(&self, run: &Run) -> Result<(), SinkError> {
99 *lock_recover(&self.run) = Some(run.clone());
100 Ok(())
101 }
102}
103
/// Locks `mutex` and returns the guard even when the mutex is poisoned.
///
/// On poisoning, the guard is pulled out of the poison error instead of
/// propagating it, so callers keep access to the protected value.
fn lock_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    mutex
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
110
/// Sink bound to a single destination path on the local filesystem.
#[derive(Debug, Clone)]
pub struct LocalJsonSink {
    path: PathBuf,
}

impl LocalJsonSink {
    /// Builds a sink targeting `path`; the path is only stored, not opened.
    #[must_use]
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = PathBuf::from(path.as_ref());
        Self { path }
    }

    /// Returns the destination path this sink was built with.
    #[must_use]
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
134
135impl RunSink for LocalJsonSink {
136 fn write(&self, run: &Run) -> Result<(), SinkError> {
137 let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
138 let temp_path = create_temp_path(parent, &self.path);
139 let write_result = (|| {
140 let file = OpenOptions::new()
141 .write(true)
142 .create_new(true)
143 .open(&temp_path)
144 .map_err(SinkError::Io)?;
145 let mut writer = BufWriter::new(file);
146 serde_json::to_writer_pretty(&mut writer, run).map_err(SinkError::Serialize)?;
147 writer.flush().map_err(SinkError::Io)?;
148 let file = writer
149 .into_inner()
150 .map_err(|err| SinkError::Io(err.into_error()))?;
151 file.sync_all().map_err(SinkError::Io)?;
152 finalize_temp_file(&temp_path, &self.path).map_err(SinkError::Io)
153 })();
154
155 if write_result.is_err() {
156 let _ = fs::remove_file(&temp_path);
157 }
158
159 write_result
160 }
161}
162
/// Builds the path of a hidden temporary sibling for `final_path` in `parent`.
///
/// The file name has the form
/// `.{file_name}.tmp-{pid}-{epoch_nanos}-{sequence}` where:
///
/// * `file_name` is the last component of `final_path`, or
///   `tailtriage-run.json` when that component is missing or not valid UTF-8;
/// * `epoch_nanos` is the current time in nanoseconds since the Unix epoch,
///   or `0` when the system clock reads earlier than the epoch;
/// * `sequence` is a process-wide counter, so two calls in the same process
///   produce different names even when the clock returns the same reading.
fn create_temp_path(parent: &Path, final_path: &Path) -> PathBuf {
    // The timestamp alone cannot guarantee uniqueness (coarse clocks, or the
    // `0` fallback), so every call also takes a fresh sequence number.
    static NEXT_SEQUENCE: std::sync::atomic::AtomicUsize =
        std::sync::atomic::AtomicUsize::new(0);

    let file_name = final_path
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("tailtriage-run.json");
    let epoch_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_nanos());
    // Only uniqueness matters here, not ordering relative to other memory.
    let sequence = NEXT_SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    parent.join(format!(
        ".{file_name}.tmp-{}-{epoch_nanos}-{sequence}",
        std::process::id()
    ))
}
176
/// Renames the temporary file at `temp_path` to `final_path`.
///
/// # Errors
///
/// Propagates the error reported by `fs::rename` unchanged.
fn finalize_temp_file(temp_path: &Path, final_path: &Path) -> Result<(), IoError> {
    fs::rename(temp_path, final_path)?;
    Ok(())
}
180
/// Failures reported through [`RunSink::write`].
#[derive(Debug)]
pub enum SinkError {
    /// An I/O operation failed while writing run output.
    Io(IoError),
    /// Serialization failed while writing run output.
    Serialize(serde_json::Error),
    /// Strict lifecycle validation failed because requests were still
    /// unfinished at shutdown.
    Lifecycle {
        /// Number of unfinished requests that remained at shutdown.
        unfinished_count: usize,
    },
}
194
195impl std::fmt::Display for SinkError {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 match self {
198 Self::Io(err) => write!(f, "I/O error while writing run output: {err}"),
199 Self::Serialize(err) => {
200 write!(f, "serialization error while writing run output: {err}")
201 }
202 Self::Lifecycle { unfinished_count } => write!(
203 f,
204 "strict lifecycle validation failed: {unfinished_count} unfinished request(s) remained at shutdown"
205 ),
206 }
207 }
208}
209
210impl std::error::Error for SinkError {
211 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
212 match self {
213 Self::Io(err) => Some(err),
214 Self::Serialize(err) => Some(err),
215 Self::Lifecycle { .. } => None,
216 }
217 }
218}
219
// Unit tests for the sinks in this file. Filesystem tests write under
// `std::env::temp_dir()` and remove their artifacts on the success path.
#[cfg(test)]
mod tests {
    use super::{
        finalize_temp_file, lock_recover, DiscardSink, LocalJsonSink, MemorySink, RunSink,
        SinkError,
    };
    use crate::{CaptureMode, Run, RunMetadata, UnfinishedRequests, SCHEMA_VERSION};
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a `.json` path in the system temp directory whose name combines
    /// `suffix`, the process id and the current epoch nanoseconds.
    fn unique_path(suffix: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |duration| duration.as_nanos());
        std::env::temp_dir().join(format!(
            "tailtriage-core-sink-{suffix}-{}-{nanos}.json",
            std::process::id()
        ))
    }

    /// Builds a `Run` from fixed metadata so tests can compare runs by value.
    fn sample_run() -> Run {
        Run::new(RunMetadata {
            run_id: "run-1".to_string(),
            service_name: "checkout".to_string(),
            service_version: Some("1.0.0".to_string()),
            started_at_unix_ms: 1,
            finished_at_unix_ms: 2,
            finalized_at_unix_ms: Some(2),
            mode: CaptureMode::Light,
            effective_core_config: Some(crate::EffectiveCoreConfig {
                mode: CaptureMode::Light,
                capture_limits: CaptureMode::Light.core_defaults(),
                strict_lifecycle: false,
            }),
            effective_tokio_sampler_config: None,
            host: None,
            pid: Some(123),
            lifecycle_warnings: Vec::new(),
            unfinished_requests: UnfinishedRequests::default(),
            run_end_reason: None,
        })
    }

    // Round trip: what the local sink writes must deserialize back to the
    // same run and carry the current schema version.
    #[test]
    fn local_sink_write_creates_deserializable_artifact() {
        let output = unique_path("success");
        let sink = LocalJsonSink::new(&output);
        let run = sample_run();

        sink.write(&run).expect("write should succeed");

        let bytes = std::fs::read(&output).expect("artifact should be written");
        let restored: Run = serde_json::from_slice(&bytes).expect("artifact should deserialize");
        assert_eq!(restored, run);
        assert_eq!(restored.schema_version, SCHEMA_VERSION);

        let _ = std::fs::remove_file(output);
    }

    #[test]
    fn discard_sink_write_succeeds() {
        let sink = DiscardSink;
        sink.write(&sample_run()).expect("discard should succeed");
    }

    // A second write must overwrite the run stored by the first one.
    #[test]
    fn memory_sink_replaces_previous_run() {
        let sink = MemorySink::new();
        let mut first = sample_run();
        first.metadata.run_id = "run-first".to_string();
        sink.write(&first).expect("first write should succeed");
        assert_eq!(
            sink.last_run()
                .expect("run should be present")
                .metadata
                .run_id,
            "run-first"
        );

        let mut second = sample_run();
        second.metadata.run_id = "run-second".to_string();
        sink.write(&second).expect("second write should succeed");
        assert_eq!(
            sink.last_run()
                .expect("run should be present")
                .metadata
                .run_id,
            "run-second"
        );
    }

    // Every `MemorySink` operation must keep working after the mutex has been
    // poisoned.
    #[test]
    fn memory_sink_recovers_from_poisoned_mutex_operations() {
        let sink = MemorySink::new();
        {
            // Poison the shared mutex by panicking in another thread while the
            // guard is held; the join result (the panic) is ignored on purpose.
            let sink_clone = sink.clone();
            let _ = std::thread::spawn(move || {
                let _guard = lock_recover(&sink_clone.run);
                panic!("poison mutex");
            })
            .join();
        }

        assert!(sink.last_run().is_none(), "last_run should recover");
        assert!(sink.take_run().is_none(), "take_run should recover");
        sink.clear();
        assert!(sink.last_run().is_none(), "clear should recover");
        sink.write(&sample_run()).expect("write should recover");
        assert!(sink.last_run().is_some(), "write should store run");
    }

    // Writing twice to the same path must leave only the second run on disk.
    #[test]
    fn local_sink_write_replaces_existing_destination_with_new_run() {
        let output = unique_path("replace-existing");
        let sink = LocalJsonSink::new(&output);

        let mut first_run = sample_run();
        first_run.metadata.run_id = "run-first".to_string();
        sink.write(&first_run).expect("first write should succeed");

        // Give the second run an extra request so it differs from the first
        // in more than its id.
        let mut second_run = sample_run();
        second_run.metadata.run_id = "run-second".to_string();
        second_run.requests.push(crate::RequestEvent {
            request_id: "req-2".to_string(),
            route: "/checkout".to_string(),
            kind: Some("http".to_string()),
            started_at_unix_ms: 10,
            finished_at_unix_ms: 20,
            latency_us: 10_000,
            outcome: "ok".to_string(),
        });
        sink.write(&second_run)
            .expect("second write should replace existing artifact");

        let bytes = std::fs::read(&output).expect("artifact should be written");
        let restored: Run = serde_json::from_slice(&bytes).expect("artifact should deserialize");
        assert_eq!(restored, second_run, "existing file should be replaced");

        let _ = std::fs::remove_file(output);
    }

    // Renaming from a temp path that does not exist must fail with `NotFound`
    // and leave the existing destination bytes untouched.
    #[test]
    fn failed_finalization_keeps_existing_destination_unchanged() {
        let output = unique_path("finalization-failure");
        let original_payload = b"{\"run_id\":\"existing\"}";
        std::fs::write(&output, original_payload).expect("initial artifact should be writable");
        // This path is generated but never created on disk.
        let missing_temp = unique_path("missing-temp");

        let err = finalize_temp_file(&missing_temp, &output)
            .expect_err("finalization should fail when temp is missing");
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);

        let final_payload = std::fs::read(&output).expect("existing final artifact should remain");
        assert_eq!(final_payload, original_payload);

        let _ = std::fs::remove_file(output);
    }

    // Forces the final rename to fail by making the destination a directory,
    // then checks that the directory survives and no temp file is left behind.
    #[test]
    fn local_sink_failed_finalization_cleans_up_temp_file_and_preserves_final_path() {
        let output = std::env::temp_dir().join(format!(
            "tailtriage-core-sink-dir-target-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_or(0, |duration| duration.as_nanos())
        ));
        std::fs::create_dir_all(&output).expect("directory target should be created");
        let sink = LocalJsonSink::new(&output);

        let err = sink
            .write(&sample_run())
            .expect_err("rename to directory should fail");
        assert!(matches!(err, SinkError::Io(_)));
        assert!(
            output.is_dir(),
            "existing final directory should remain untouched"
        );

        let parent = output
            .parent()
            .expect("directory target should always have a parent");
        let final_name = output
            .file_name()
            .and_then(std::ffi::OsStr::to_str)
            .expect("directory target should be valid utf-8 for this test");
        // Temp files created by the sink start with `.{final_name}.tmp-`, so
        // scan the parent directory for any entry carrying that prefix.
        let temp_prefix = format!(".{final_name}.tmp-");
        let leftover_temp = std::fs::read_dir(parent)
            .expect("parent should be readable")
            .filter_map(Result::ok)
            .map(|entry| entry.file_name())
            .filter_map(|name| name.to_str().map(str::to_owned))
            .any(|name| name.starts_with(&temp_prefix));
        assert!(
            !leftover_temp,
            "temporary file should be cleaned up on failed finalization"
        );

        let _ = std::fs::remove_dir_all(output);
    }
}