1use chrono::{DateTime, Utc};
32use serde_json::Value;
33use std::io::Write;
34use std::path::{Path, PathBuf};
35use uuid::Uuid;
36
37use crate::atomic_file::atomic_replace_file;
38use crate::detection::CompressionFormat;
39use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector};
40use crate::diff;
41use crate::events::{Event, Observation};
42
/// What a write context does with its output once the writer has been flushed.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare strategies directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FinishStrategy {
    /// Flush the writer and take no further action.
    FlushOnly,

    /// After flushing, call `atomic_replace_file(&output_path, &temp_path)`.
    AtomicReplace {
        /// Second argument handed to `atomic_replace_file`.
        /// NOTE(review): presumably the temporary file the data was written to — confirm
        /// against the `atomic_file` module.
        temp_path: PathBuf,
        /// First argument handed to `atomic_replace_file`.
        /// NOTE(review): presumably the final destination being replaced — confirm.
        output_path: PathBuf,
    },
}
58
/// State carried while diffing input files and writing the resulting observations
/// (and periodic snapshots) to `writer`.
pub struct WriteContext<W: Write> {
    /// Destination that every emitted line is written to.
    pub writer: W,

    /// JSON value the next input file is diffed against; replaced by the file's parsed
    /// contents after an observation for that file has been written.
    pub current_state: Value,

    /// Running observation counter; incremented once per observation written and
    /// consulted to decide whether a snapshot is due.
    pub observation_count: usize,

    /// `Some(n)`: write a snapshot whenever `observation_count` is a positive multiple
    /// of `n`. `None`: never write snapshots.
    pub snapshot_interval: Option<usize>,

    /// Action performed by `finish` after the writer has been flushed.
    pub finish_strategy: FinishStrategy,

    /// Collector that `finish` hands back to the caller.
    pub diagnostics: DiagnosticCollector,
}
83
84impl<W: Write> WriteContext<W> {
85 pub fn new(
87 writer: W,
88 current_state: Value,
89 observation_count: usize,
90 snapshot_interval: Option<usize>,
91 finish_strategy: FinishStrategy,
92 ) -> Self {
93 Self {
94 writer,
95 current_state,
96 observation_count,
97 snapshot_interval,
98 finish_strategy,
99 diagnostics: DiagnosticCollector::new(),
100 }
101 }
102
103 pub fn with_diagnostics(
105 writer: W,
106 current_state: Value,
107 observation_count: usize,
108 snapshot_interval: Option<usize>,
109 finish_strategy: FinishStrategy,
110 diagnostics: DiagnosticCollector,
111 ) -> Self {
112 Self {
113 writer,
114 current_state,
115 observation_count,
116 snapshot_interval,
117 finish_strategy,
118 diagnostics,
119 }
120 }
121
122 pub fn write_observations<P: AsRef<Path>>(
133 &mut self,
134 files: &[P],
135 ) -> Result<usize, Vec<Diagnostic>> {
136 let mut observations_written = 0;
137
138 for file_path in files.iter() {
139 let file_path = file_path.as_ref();
140
141 if let Err(e) = writeln!(self.writer, "# Processing file: {}", file_path.display()) {
143 return Err(vec![Diagnostic::fatal(
144 DiagnosticCode::PathNotFound,
145 format!("I couldn't write to the output: {}", e),
146 )]);
147 }
148
149 let file_mtime = get_file_mtime(file_path)?;
151
152 let content = std::fs::read_to_string(file_path).map_err(|e| {
154 vec![Diagnostic::fatal(
155 DiagnosticCode::PathNotFound,
156 format!("I couldn't read the input file '{}': {}", file_path.display(), e),
157 )]
158 })?;
159
160 let new_state: Value = serde_json::from_str(&content).map_err(|e| {
161 vec![Diagnostic::fatal(
162 DiagnosticCode::InvalidEventJson,
163 format!("I couldn't parse '{}' as JSON: {}", file_path.display(), e),
164 )
165 .with_advice("Make sure the file contains valid JSON.".to_string())]
166 })?;
167
168 let observation_id = format!("obs-{}", Uuid::new_v4());
170 let diff_events = diff::diff(&self.current_state, &new_state, "", &observation_id);
171
172 if diff_events.is_empty() {
174 continue;
175 }
176
177 let mut observation = Observation::new(observation_id, file_mtime);
179 for event in diff_events {
180 observation.add_event(event);
181 }
182
183 self.write_observation(observation)?;
184 observations_written += 1;
185 self.observation_count += 1;
186
187 if self.should_write_snapshot() {
189 self.write_snapshot(&new_state, file_mtime)?;
190 }
191
192 self.current_state = new_state;
194 }
195
196 Ok(observations_written)
197 }
198
199 fn write_observation(&mut self, observation: Observation) -> Result<(), Vec<Diagnostic>> {
201 for event in observation.to_events() {
202 let event_json = serde_json::to_string(&event).map_err(|e| {
203 vec![Diagnostic::fatal(
204 DiagnosticCode::InvalidEventJson,
205 format!("I couldn't serialize an event to JSON: {}", e),
206 )]
207 })?;
208
209 writeln!(self.writer, "{}", event_json).map_err(|e| {
210 vec![Diagnostic::fatal(
211 DiagnosticCode::PathNotFound,
212 format!("I couldn't write to the output: {}", e),
213 )]
214 })?;
215 }
216
217 Ok(())
218 }
219
220 fn should_write_snapshot(&self) -> bool {
222 if let Some(interval) = self.snapshot_interval {
223 self.observation_count > 0 && self.observation_count % interval == 0
224 } else {
225 false
226 }
227 }
228
229 fn write_snapshot(&mut self, state: &Value, timestamp: DateTime<Utc>) -> Result<(), Vec<Diagnostic>> {
231 let snapshot_id = format!("snapshot-{}", Uuid::new_v4());
232 let snapshot = Event::Snapshot {
233 observation_id: snapshot_id,
234 timestamp,
235 object: state.clone(),
236 };
237
238 let snapshot_json = serde_json::to_string(&snapshot).map_err(|e| {
239 vec![Diagnostic::fatal(
240 DiagnosticCode::InvalidEventJson,
241 format!("I couldn't serialize the snapshot to JSON: {}", e),
242 )]
243 })?;
244
245 writeln!(self.writer, "{}", snapshot_json).map_err(|e| {
246 vec![Diagnostic::fatal(
247 DiagnosticCode::PathNotFound,
248 format!("I couldn't write to the output: {}", e),
249 )]
250 })?;
251
252 Ok(())
253 }
254
255 pub fn finish(mut self) -> Result<DiagnosticCollector, Vec<Diagnostic>> {
260 self.writer.flush().map_err(|e| {
262 vec![Diagnostic::fatal(
263 DiagnosticCode::PathNotFound,
264 format!("I couldn't flush the output file: {}", e),
265 )]
266 })?;
267
268 match self.finish_strategy {
270 FinishStrategy::FlushOnly => {
271 }
273 FinishStrategy::AtomicReplace { temp_path, output_path } => {
274 atomic_replace_file(&output_path, &temp_path)?;
275 }
276 }
277
278 Ok(self.diagnostics)
279 }
280}
281
282fn get_file_mtime<P: AsRef<Path>>(path: P) -> Result<DateTime<Utc>, Vec<Diagnostic>> {
284 let path = path.as_ref();
285 let metadata = std::fs::metadata(path).map_err(|e| {
286 vec![Diagnostic::fatal(
287 DiagnosticCode::PathNotFound,
288 format!("I couldn't get metadata for '{}': {}", path.display(), e),
289 )]
290 })?;
291
292 let modified = metadata.modified().map_err(|e| {
293 vec![Diagnostic::fatal(
294 DiagnosticCode::PathNotFound,
295 format!("I couldn't get modification time for '{}': {}", path.display(), e),
296 )]
297 })?;
298
299 Ok(modified.into())
300}
301
#[cfg(feature = "compression")]
/// A file writer wrapped in one of the supported compression encoders.
///
/// Each variant owns the encoder, which in turn owns the underlying `std::fs::File`.
pub enum CompressedWriter {
    /// Gzip output via `flate2::write::GzEncoder`.
    Gzip(flate2::write::GzEncoder<std::fs::File>),
    /// Zlib output via `flate2::write::ZlibEncoder`.
    Zlib(flate2::write::ZlibEncoder<std::fs::File>),
    /// Zstandard output via `zstd::stream::write::Encoder`.
    Zstd(zstd::stream::write::Encoder<'static, std::fs::File>),
    /// Brotli output via `brotli::CompressorWriter`.
    Brotli(brotli::CompressorWriter<std::fs::File>),
}
313
#[cfg(feature = "compression")]
/// Forwards `write` and `flush` to whichever encoder the variant holds.
impl Write for CompressedWriter {
    /// Passes `buf` to the wrapped encoder's `write`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let encoder: &mut dyn Write = match self {
            Self::Gzip(inner) => inner,
            Self::Zlib(inner) => inner,
            Self::Zstd(inner) => inner,
            Self::Brotli(inner) => inner,
        };
        encoder.write(buf)
    }

    /// Calls the wrapped encoder's `flush`.
    fn flush(&mut self) -> std::io::Result<()> {
        let encoder: &mut dyn Write = match self {
            Self::Gzip(inner) => inner,
            Self::Zlib(inner) => inner,
            Self::Zstd(inner) => inner,
            Self::Brotli(inner) => inner,
        };
        encoder.flush()
    }
}
334
335#[cfg(feature = "compression")]
336impl CompressedWriter {
337 pub fn new(format: CompressionFormat, file: std::fs::File) -> Result<Self, Diagnostic> {
339 use flate2::Compression;
340
341 match format {
342 CompressionFormat::Gzip => {
343 Ok(CompressedWriter::Gzip(flate2::write::GzEncoder::new(file, Compression::default())))
344 }
345 CompressionFormat::Zlib => {
346 Ok(CompressedWriter::Zlib(flate2::write::ZlibEncoder::new(file, Compression::default())))
347 }
348 CompressionFormat::Zstd => {
349 let encoder = zstd::stream::write::Encoder::new(file, 0).map_err(|e| {
350 Diagnostic::fatal(
351 DiagnosticCode::PathNotFound,
352 format!("I couldn't create zstd encoder: {}", e),
353 )
354 })?;
355 Ok(CompressedWriter::Zstd(encoder))
356 }
357 CompressionFormat::Brotli => {
358 Ok(CompressedWriter::Brotli(brotli::CompressorWriter::new(file, 4096, 11, 22)))
359 }
360 CompressionFormat::Deflate => {
361 Err(Diagnostic::fatal(
363 DiagnosticCode::UnsupportedVersion,
364 "Standalone deflate compression is not supported for writing.".to_string(),
365 ))
366 }
367 CompressionFormat::None => {
368 Err(Diagnostic::fatal(
369 DiagnosticCode::UnsupportedVersion,
370 "CompressedWriter::new called with CompressionFormat::None".to_string(),
371 ))
372 }
373 }
374 }
375
376 pub fn finish(self) -> Result<(), Diagnostic> {
381 match self {
382 CompressedWriter::Gzip(w) => {
383 w.finish().map_err(|e| {
384 Diagnostic::fatal(
385 DiagnosticCode::PathNotFound,
386 format!("I couldn't finish gzip compression: {}", e),
387 )
388 })?;
389 }
390 CompressedWriter::Zlib(w) => {
391 w.finish().map_err(|e| {
392 Diagnostic::fatal(
393 DiagnosticCode::PathNotFound,
394 format!("I couldn't finish zlib compression: {}", e),
395 )
396 })?;
397 }
398 CompressedWriter::Zstd(w) => {
399 w.finish().map_err(|e| {
400 Diagnostic::fatal(
401 DiagnosticCode::PathNotFound,
402 format!("I couldn't finish zstd compression: {}", e),
403 )
404 })?;
405 }
406 CompressedWriter::Brotli(mut w) => {
407 w.flush().map_err(|e| {
409 Diagnostic::fatal(
410 DiagnosticCode::PathNotFound,
411 format!("I couldn't flush brotli compression: {}", e),
412 )
413 })?;
414 }
415 }
416 Ok(())
417 }
418}
419
#[cfg(feature = "compression")]
/// A [`WriteContext`] specialised to a [`CompressedWriter`], with a `finish` that
/// completes the compressed stream before applying the finish strategy.
pub struct CompressedWriteContext {
    /// The wrapped context that holds the writer, state, counters and diagnostics.
    inner: WriteContext<CompressedWriter>,
}
430
431#[cfg(feature = "compression")]
432impl CompressedWriteContext {
433 pub fn new(
435 writer: CompressedWriter,
436 current_state: Value,
437 observation_count: usize,
438 snapshot_interval: Option<usize>,
439 finish_strategy: FinishStrategy,
440 diagnostics: DiagnosticCollector,
441 ) -> Self {
442 Self {
443 inner: WriteContext::with_diagnostics(
444 writer,
445 current_state,
446 observation_count,
447 snapshot_interval,
448 finish_strategy,
449 diagnostics,
450 ),
451 }
452 }
453
454 pub fn write_observations<P: AsRef<Path>>(
456 &mut self,
457 files: &[P],
458 ) -> Result<usize, Vec<Diagnostic>> {
459 self.inner.write_observations(files)
460 }
461
462 pub fn write_raw(&mut self, bytes: &[u8]) -> Result<(), Vec<Diagnostic>> {
464 self.inner.writer.write_all(bytes).map_err(|e| {
465 vec![Diagnostic::fatal(
466 DiagnosticCode::PathNotFound,
467 format!("I couldn't write to the output: {}", e),
468 )]
469 })
470 }
471
472 pub fn finish(self) -> Result<DiagnosticCollector, Vec<Diagnostic>> {
477 let finish_strategy = self.inner.finish_strategy.clone();
478 let diagnostics = self.inner.diagnostics;
479
480 self.inner.writer.finish().map_err(|d| vec![d])?;
482
483 match finish_strategy {
485 FinishStrategy::FlushOnly => {
486 }
488 FinishStrategy::AtomicReplace { temp_path, output_path } => {
489 atomic_replace_file(&output_path, &temp_path)?;
490 }
491 }
492
493 Ok(diagnostics)
494 }
495}
496
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Creates a named temp file containing exactly `contents`, flushed to disk.
    fn temp_file_with(contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::NamedTempFile::new().unwrap();
        std::io::Write::write_all(&mut file, contents).unwrap();
        file.flush().unwrap();
        file
    }

    /// Runs `write_observations` on one temp file holding `contents`, starting from
    /// `initial_state` with no snapshot interval. Returns the observation count and
    /// everything that was written to the output.
    fn observe_single_file(initial_state: Value, contents: &[u8]) -> (usize, String) {
        let mut output = Vec::new();

        let written = {
            let mut ctx = WriteContext::new(
                &mut output,
                initial_state,
                0,
                None,
                FinishStrategy::FlushOnly,
            );

            let input = temp_file_with(contents);
            let count = ctx.write_observations(&[input.path()]).unwrap();
            count
        };

        (written, String::from_utf8(output).unwrap())
    }

    /// Evaluates `should_write_snapshot` for a context with the given counter/interval.
    fn snapshot_due(observation_count: usize, snapshot_interval: Option<usize>) -> bool {
        let ctx: WriteContext<Vec<u8>> = WriteContext::new(
            Vec::new(),
            json!({}),
            observation_count,
            snapshot_interval,
            FinishStrategy::FlushOnly,
        );
        ctx.should_write_snapshot()
    }

    #[test]
    fn test_write_context_single_observation() {
        let (count, output_str) = observe_single_file(json!({"count": 0}), br#"{"count": 1}"#);

        assert_eq!(count, 1);
        assert!(output_str.contains("# Processing file:"));
        assert!(output_str.contains("observe"));
        assert!(output_str.contains("change"));
        assert!(output_str.contains("/count"));
    }

    #[test]
    fn test_write_context_no_changes() {
        let (count, output_str) = observe_single_file(json!({"count": 0}), br#"{"count": 0}"#);

        assert_eq!(count, 0);
        assert!(output_str.contains("# Processing file:"));
        // An unchanged file must not produce an observation.
        assert!(!output_str.contains("observe"));
    }

    #[test]
    fn test_should_write_snapshot() {
        // No interval configured: never due.
        assert!(!snapshot_due(5, None));

        // Count is a multiple of the interval: due.
        assert!(snapshot_due(4, Some(2)));

        // Count is not a multiple of the interval: not due.
        assert!(!snapshot_due(3, Some(2)));
    }
}