1use std::fs::{self, OpenOptions};
2use std::io::{BufWriter, Error as IoError, Write};
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::Run;
7
/// Destination that a [`Run`] can be written to.
pub trait RunSink {
    /// Writes `run` to this sink.
    ///
    /// # Errors
    ///
    /// Returns a [`SinkError`] when the sink fails to store the run.
    fn write(&self, run: &Run) -> Result<(), SinkError>;
}
18
/// [`RunSink`] implementation that stores a run as pretty-printed JSON in a local file.
#[derive(Debug, Clone)]
pub struct LocalJsonSink {
    /// Final location of the JSON artifact.
    path: PathBuf,
}
24
25impl LocalJsonSink {
26 #[must_use]
28 pub fn new(path: impl AsRef<Path>) -> Self {
29 Self {
30 path: path.as_ref().to_path_buf(),
31 }
32 }
33
34 #[must_use]
36 pub fn path(&self) -> &Path {
37 &self.path
38 }
39}
40
41impl RunSink for LocalJsonSink {
42 fn write(&self, run: &Run) -> Result<(), SinkError> {
43 let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
44 let temp_path = create_temp_path(parent, &self.path);
45 let write_result = (|| {
46 let file = OpenOptions::new()
47 .write(true)
48 .create_new(true)
49 .open(&temp_path)
50 .map_err(SinkError::Io)?;
51 let mut writer = BufWriter::new(file);
52 serde_json::to_writer_pretty(&mut writer, run).map_err(SinkError::Serialize)?;
53 writer.flush().map_err(SinkError::Io)?;
54 let file = writer
55 .into_inner()
56 .map_err(|err| SinkError::Io(err.into_error()))?;
57 file.sync_all().map_err(SinkError::Io)?;
58 finalize_temp_file(&temp_path, &self.path).map_err(SinkError::Io)
59 })();
60
61 if write_result.is_err() {
62 let _ = fs::remove_file(&temp_path);
63 }
64
65 write_result
66 }
67}
68
/// Builds the path of the temporary file used while writing `final_path`.
///
/// The result lives inside `parent` and is named
/// `.{file_name}.tmp-{pid}-{epoch_nanos}-{sequence}`, where `file_name` is the
/// final component of `final_path` (or `tailtriage-run.json` when that
/// component is missing or not valid UTF-8).
///
/// `sequence` is a process-wide counter. It keeps names distinct when two calls
/// in the same process observe the same clock value, which happens with coarse
/// clocks and always happens when the clock is before the Unix epoch and the
/// timestamp falls back to `0`.
fn create_temp_path(parent: &Path, final_path: &Path) -> PathBuf {
    static NEXT_SEQUENCE: std::sync::atomic::AtomicUsize =
        std::sync::atomic::AtomicUsize::new(0);

    let file_name = final_path
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("tailtriage-run.json");
    let epoch_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_nanos());
    // Only uniqueness matters here, not ordering relative to other memory.
    let sequence = NEXT_SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    parent.join(format!(
        ".{file_name}.tmp-{}-{epoch_nanos}-{sequence}",
        std::process::id()
    ))
}
82
/// Builds the path of the backup file used to park an existing `final_path`.
///
/// The result sits in the same directory as `final_path` (or `.` when it has
/// no parent) and is named `.{file_name}.bak-{pid}-{epoch_nanos}`; `file_name`
/// falls back to `tailtriage-run.json` when the final component is missing or
/// not valid UTF-8.
#[cfg(windows)]
fn create_backup_path(final_path: &Path) -> PathBuf {
    let file_name = match final_path.file_name().and_then(std::ffi::OsStr::to_str) {
        Some(name) => name,
        None => "tailtriage-run.json",
    };
    // A clock before the Unix epoch degrades to a zero timestamp.
    let epoch_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(duration) => duration.as_nanos(),
        Err(_) => 0,
    };
    let backup_name = format!(
        ".{file_name}.bak-{}-{epoch_nanos}",
        std::process::id()
    );
    match final_path.parent() {
        Some(directory) => directory.join(backup_name),
        None => Path::new(".").join(backup_name),
    }
}
98
/// Moves the fully written `temp_path` onto `final_path`.
///
/// On non-Windows targets this is a single `fs::rename`.
///
/// On Windows the rename is tried first. If it fails while both paths are
/// regular files, the existing destination is renamed to a backup path, the
/// rename is retried, and then either the backup is deleted (success, best
/// effort) or the backup is renamed back onto `final_path` (failure).
///
/// # Errors
///
/// Returns the error of the failing rename. When the Windows fallback also
/// fails, the returned error carries the kind of the second rename error and a
/// message listing every rename error encountered, including a failed restore.
fn finalize_temp_file(temp_path: &Path, final_path: &Path) -> Result<(), IoError> {
    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path)
    }

    #[cfg(windows)]
    {
        let first_err = match fs::rename(temp_path, final_path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };

        // The fallback only applies when a regular file is in the way and the
        // temp file still exists; anything else is reported as-is.
        if !(final_path.is_file() && temp_path.is_file()) {
            return Err(first_err);
        }

        // Park the existing destination so the retry has a free target.
        let backup_path = create_backup_path(final_path);
        fs::rename(final_path, &backup_path)?;

        let second_err = match fs::rename(temp_path, final_path) {
            Ok(()) => {
                let _ = fs::remove_file(&backup_path);
                return Ok(());
            }
            Err(err) => err,
        };

        // The retry failed too: try to put the previous destination back and
        // report everything that went wrong.
        let message = match fs::rename(&backup_path, final_path) {
            Ok(()) => format!(
                "failed to finalize run output after preserving existing destination: first rename error: {first_err}; second rename error: {second_err}"
            ),
            Err(restore_err) => format!(
                "failed to finalize run output and failed to restore existing destination: first rename error: {first_err}; second rename error: {second_err}; restore error: {restore_err}"
            ),
        };
        Err(IoError::new(second_err.kind(), message))
    }
}
149
/// Errors that can occur while writing a run through a [`RunSink`].
#[derive(Debug)]
pub enum SinkError {
    /// An I/O operation failed while writing the run output.
    Io(IoError),
    /// The run could not be serialized to JSON.
    Serialize(serde_json::Error),
    /// Strict lifecycle validation failed because requests were still
    /// unfinished at shutdown.
    Lifecycle {
        /// Number of requests that remained unfinished.
        unfinished_count: usize,
    },
}
163
164impl std::fmt::Display for SinkError {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 match self {
167 Self::Io(err) => write!(f, "I/O error while writing run output: {err}"),
168 Self::Serialize(err) => {
169 write!(f, "serialization error while writing run output: {err}")
170 }
171 Self::Lifecycle { unfinished_count } => write!(
172 f,
173 "strict lifecycle validation failed: {unfinished_count} unfinished request(s) remained at shutdown"
174 ),
175 }
176 }
177}
178
179impl std::error::Error for SinkError {
180 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
181 match self {
182 Self::Io(err) => Some(err),
183 Self::Serialize(err) => Some(err),
184 Self::Lifecycle { .. } => None,
185 }
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::{finalize_temp_file, LocalJsonSink, RunSink, SinkError};
    use crate::{CaptureMode, Run, RunMetadata, UnfinishedRequests, SCHEMA_VERSION};
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Nanoseconds since the Unix epoch, or `0` if the clock is before it.
    fn epoch_nanos() -> u128 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos(),
            Err(_) => 0,
        }
    }

    /// Builds a path in the system temp directory that embeds `suffix`, the
    /// process id, and the current timestamp.
    fn unique_path(suffix: &str) -> PathBuf {
        let nanos = epoch_nanos();
        let file_name = format!(
            "tailtriage-core-sink-{suffix}-{}-{nanos}.json",
            std::process::id()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Builds a small run with fixed metadata for round-trip checks.
    fn sample_run() -> Run {
        let metadata = RunMetadata {
            run_id: "run-1".to_string(),
            service_name: "checkout".to_string(),
            service_version: Some("1.0.0".to_string()),
            started_at_unix_ms: 1,
            finished_at_unix_ms: 2,
            mode: CaptureMode::Light,
            host: None,
            pid: Some(123),
            lifecycle_warnings: Vec::new(),
            unfinished_requests: UnfinishedRequests::default(),
        };
        Run::new(metadata)
    }

    /// Reads the artifact at `path` back into a `Run`.
    fn read_run(path: &Path) -> Run {
        let bytes = std::fs::read(path).expect("artifact should be written");
        serde_json::from_slice(&bytes).expect("artifact should deserialize")
    }

    /// A successful write produces a file that deserializes to the same run.
    #[test]
    fn local_sink_write_creates_deserializable_artifact() {
        let artifact_path = unique_path("success");
        let expected = sample_run();

        LocalJsonSink::new(&artifact_path)
            .write(&expected)
            .expect("write should succeed");

        let actual = read_run(&artifact_path);
        assert_eq!(actual, expected);
        assert_eq!(actual.schema_version, SCHEMA_VERSION);

        let _ = std::fs::remove_file(artifact_path);
    }

    /// Writing twice to the same path leaves only the second run on disk.
    #[test]
    fn local_sink_write_replaces_existing_destination_with_new_run() {
        let artifact_path = unique_path("replace-existing");
        let sink = LocalJsonSink::new(&artifact_path);

        let mut earlier = sample_run();
        earlier.metadata.run_id = "run-first".to_string();
        sink.write(&earlier).expect("first write should succeed");

        let extra_request = crate::RequestEvent {
            request_id: "req-2".to_string(),
            route: "/checkout".to_string(),
            kind: Some("http".to_string()),
            started_at_unix_ms: 10,
            finished_at_unix_ms: 20,
            latency_us: 10_000,
            outcome: "ok".to_string(),
        };
        let mut later = sample_run();
        later.metadata.run_id = "run-second".to_string();
        later.requests.push(extra_request);
        sink.write(&later)
            .expect("second write should replace existing artifact");

        let actual = read_run(&artifact_path);
        assert_eq!(actual, later, "existing file should be replaced");

        let _ = std::fs::remove_file(artifact_path);
    }

    /// Finalizing from a missing temp file fails and leaves the destination as it was.
    #[test]
    fn failed_finalization_keeps_existing_destination_unchanged() {
        let destination = unique_path("finalization-failure");
        let original_payload = b"{\"run_id\":\"existing\"}";
        std::fs::write(&destination, original_payload)
            .expect("initial artifact should be writable");
        let absent_temp = unique_path("missing-temp");

        let failure = finalize_temp_file(&absent_temp, &destination)
            .expect_err("finalization should fail when temp is missing");
        assert_eq!(failure.kind(), std::io::ErrorKind::NotFound);

        let payload_after =
            std::fs::read(&destination).expect("existing final artifact should remain");
        assert_eq!(payload_after, original_payload);

        let _ = std::fs::remove_file(destination);
    }

    /// When the destination is a directory the write fails, the directory is
    /// untouched, and no temp file is left next to it.
    #[test]
    fn local_sink_failed_finalization_cleans_up_temp_file_and_preserves_final_path() {
        let nanos = epoch_nanos();
        let dir_target = std::env::temp_dir().join(format!(
            "tailtriage-core-sink-dir-target-{}-{}",
            std::process::id(),
            nanos
        ));
        std::fs::create_dir_all(&dir_target).expect("directory target should be created");

        let failure = LocalJsonSink::new(&dir_target)
            .write(&sample_run())
            .expect_err("rename to directory should fail");
        assert!(matches!(failure, SinkError::Io(_)));
        assert!(
            dir_target.is_dir(),
            "existing final directory should remain untouched"
        );

        let final_name = dir_target
            .file_name()
            .and_then(std::ffi::OsStr::to_str)
            .expect("directory target should be valid utf-8 for this test");
        let temp_prefix = format!(".{final_name}.tmp-");
        let parent = dir_target
            .parent()
            .expect("directory target should always have a parent");

        // Look for any sibling whose name carries this target's temp prefix.
        let leftover_temp = std::fs::read_dir(parent)
            .expect("parent should be readable")
            .filter_map(Result::ok)
            .any(|entry| {
                entry
                    .file_name()
                    .to_str()
                    .map_or(false, |name| name.starts_with(&temp_prefix))
            });
        assert!(
            !leftover_temp,
            "temporary file should be cleaned up on failed finalization"
        );

        let _ = std::fs::remove_dir_all(dir_target);
    }
}