json_archive/
archive_context.rs

1// json-archive is a tool for tracking JSON file changes over time
2// Copyright (C) 2025  Peoples Grocers LLC
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16//
17// To purchase a license under different terms contact admin@peoplesgrocers.com
18// To request changes, report bugs, or give user feedback contact
19// marxism@peoplesgrocers.com
20//
21
22//! Archive write context and shared observation writing logic.
23//!
24//! This module provides:
25//! - `WriteContext`: A struct that holds the state needed to write observations
26//! - `write_observations`: The shared logic for diffing JSON files and writing events
27//!
28//! The key insight is that both create and append operations share the same
29//! core logic once they've set up their initial state and writer.
30
31use chrono::{DateTime, Utc};
32use serde_json::Value;
33use std::io::Write;
34use std::path::{Path, PathBuf};
35use uuid::Uuid;
36
37use crate::atomic_file::atomic_replace_file;
38use crate::detection::CompressionFormat;
39use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector};
40use crate::diff;
41use crate::events::{Event, Observation};
42
/// Strategy for finishing the write operation.
#[derive(Debug, Clone)]
pub enum FinishStrategy {
    /// Just flush the writer. Used for:
    /// - Creating new archives
    /// - Appending to uncompressed archives (same file)
    FlushOnly,

    /// Atomic replace: swap temp file with original. Used for:
    /// - Appending to compressed archives (rewrite strategy)
    AtomicReplace {
        /// Temporary file that received the rewritten archive content.
        temp_path: PathBuf,
        /// Final archive path that the temp file will replace.
        output_path: PathBuf,
    },
}
58
/// Context for writing observations to an archive.
///
/// This struct is the result of the "setup phase" for both create and append
/// operations. Once you have a WriteContext, you can use `write_observations`
/// to add new states, then call `finish` to complete the operation.
pub struct WriteContext<W: Write> {
    /// The writer to output JSON lines to.
    pub writer: W,

    /// Current state of the archive (used for diffing).
    /// Each processed file is diffed against this, then replaces it.
    pub current_state: Value,

    /// Number of observations already in the archive.
    /// Incremented as new observations are written; drives snapshot timing.
    pub observation_count: usize,

    /// Optional interval for writing snapshots.
    /// A snapshot is emitted whenever `observation_count` is a positive
    /// multiple of this value.
    pub snapshot_interval: Option<usize>,

    /// How to finish the write operation.
    pub finish_strategy: FinishStrategy,

    /// Diagnostics collected during setup (e.g., warnings from reading existing archive).
    pub diagnostics: DiagnosticCollector,
}
83
84impl<W: Write> WriteContext<W> {
85    /// Create a new write context.
86    pub fn new(
87        writer: W,
88        current_state: Value,
89        observation_count: usize,
90        snapshot_interval: Option<usize>,
91        finish_strategy: FinishStrategy,
92    ) -> Self {
93        Self {
94            writer,
95            current_state,
96            observation_count,
97            snapshot_interval,
98            finish_strategy,
99            diagnostics: DiagnosticCollector::new(),
100        }
101    }
102
103    /// Create a write context with existing diagnostics.
104    pub fn with_diagnostics(
105        writer: W,
106        current_state: Value,
107        observation_count: usize,
108        snapshot_interval: Option<usize>,
109        finish_strategy: FinishStrategy,
110        diagnostics: DiagnosticCollector,
111    ) -> Self {
112        Self {
113            writer,
114            current_state,
115            observation_count,
116            snapshot_interval,
117            finish_strategy,
118            diagnostics,
119        }
120    }
121
122    /// Write observations for a list of JSON files.
123    ///
124    /// For each file:
125    /// 1. Reads and parses the JSON
126    /// 2. Diffs against current state
127    /// 3. Writes observation events
128    /// 4. Optionally writes a snapshot if interval is reached
129    /// 5. Updates current state
130    ///
131    /// Returns the number of observations written.
132    pub fn write_observations<P: AsRef<Path>>(
133        &mut self,
134        files: &[P],
135    ) -> Result<usize, Vec<Diagnostic>> {
136        let mut observations_written = 0;
137
138        for file_path in files.iter() {
139            let file_path = file_path.as_ref();
140
141            // Write comment marking which file we're processing
142            if let Err(e) = writeln!(self.writer, "# Processing file: {}", file_path.display()) {
143                return Err(vec![Diagnostic::fatal(
144                    DiagnosticCode::PathNotFound,
145                    format!("I couldn't write to the output: {}", e),
146                )]);
147            }
148
149            // Get file modification time for the observation timestamp
150            let file_mtime = get_file_mtime(file_path)?;
151
152            // Read and parse new state
153            let content = std::fs::read_to_string(file_path).map_err(|e| {
154                vec![Diagnostic::fatal(
155                    DiagnosticCode::PathNotFound,
156                    format!("I couldn't read the input file '{}': {}", file_path.display(), e),
157                )]
158            })?;
159
160            let new_state: Value = serde_json::from_str(&content).map_err(|e| {
161                vec![Diagnostic::fatal(
162                    DiagnosticCode::InvalidEventJson,
163                    format!("I couldn't parse '{}' as JSON: {}", file_path.display(), e),
164                )
165                .with_advice("Make sure the file contains valid JSON.".to_string())]
166            })?;
167
168            // Generate diff and create observation
169            let observation_id = format!("obs-{}", Uuid::new_v4());
170            let diff_events = diff::diff(&self.current_state, &new_state, "", &observation_id);
171
172            // Skip if no changes
173            if diff_events.is_empty() {
174                continue;
175            }
176
177            // Create and write observation
178            let mut observation = Observation::new(observation_id, file_mtime);
179            for event in diff_events {
180                observation.add_event(event);
181            }
182
183            self.write_observation(observation)?;
184            observations_written += 1;
185            self.observation_count += 1;
186
187            // Check if we should write a snapshot
188            if self.should_write_snapshot() {
189                self.write_snapshot(&new_state, file_mtime)?;
190            }
191
192            // Update current state for next iteration
193            self.current_state = new_state;
194        }
195
196        Ok(observations_written)
197    }
198
199    /// Write a single observation's events to the output.
200    fn write_observation(&mut self, observation: Observation) -> Result<(), Vec<Diagnostic>> {
201        for event in observation.to_events() {
202            let event_json = serde_json::to_string(&event).map_err(|e| {
203                vec![Diagnostic::fatal(
204                    DiagnosticCode::InvalidEventJson,
205                    format!("I couldn't serialize an event to JSON: {}", e),
206                )]
207            })?;
208
209            writeln!(self.writer, "{}", event_json).map_err(|e| {
210                vec![Diagnostic::fatal(
211                    DiagnosticCode::PathNotFound,
212                    format!("I couldn't write to the output: {}", e),
213                )]
214            })?;
215        }
216
217        Ok(())
218    }
219
220    /// Check if we should write a snapshot based on observation count.
221    fn should_write_snapshot(&self) -> bool {
222        if let Some(interval) = self.snapshot_interval {
223            self.observation_count > 0 && self.observation_count % interval == 0
224        } else {
225            false
226        }
227    }
228
229    /// Write a snapshot event.
230    fn write_snapshot(&mut self, state: &Value, timestamp: DateTime<Utc>) -> Result<(), Vec<Diagnostic>> {
231        let snapshot_id = format!("snapshot-{}", Uuid::new_v4());
232        let snapshot = Event::Snapshot {
233            observation_id: snapshot_id,
234            timestamp,
235            object: state.clone(),
236        };
237
238        let snapshot_json = serde_json::to_string(&snapshot).map_err(|e| {
239            vec![Diagnostic::fatal(
240                DiagnosticCode::InvalidEventJson,
241                format!("I couldn't serialize the snapshot to JSON: {}", e),
242            )]
243        })?;
244
245        writeln!(self.writer, "{}", snapshot_json).map_err(|e| {
246            vec![Diagnostic::fatal(
247                DiagnosticCode::PathNotFound,
248                format!("I couldn't write to the output: {}", e),
249            )]
250        })?;
251
252        Ok(())
253    }
254
255    /// Finish the write operation.
256    ///
257    /// This flushes the writer and, for compressed append operations,
258    /// performs the atomic file replacement.
259    pub fn finish(mut self) -> Result<DiagnosticCollector, Vec<Diagnostic>> {
260        // Flush the writer
261        self.writer.flush().map_err(|e| {
262            vec![Diagnostic::fatal(
263                DiagnosticCode::PathNotFound,
264                format!("I couldn't flush the output file: {}", e),
265            )]
266        })?;
267
268        // Handle atomic replacement if needed
269        match self.finish_strategy {
270            FinishStrategy::FlushOnly => {
271                // Nothing more to do
272            }
273            FinishStrategy::AtomicReplace { temp_path, output_path } => {
274                atomic_replace_file(&output_path, &temp_path)?;
275            }
276        }
277
278        Ok(self.diagnostics)
279    }
280}
281
282/// Get the file modification time as a DateTime<Utc>.
283fn get_file_mtime<P: AsRef<Path>>(path: P) -> Result<DateTime<Utc>, Vec<Diagnostic>> {
284    let path = path.as_ref();
285    let metadata = std::fs::metadata(path).map_err(|e| {
286        vec![Diagnostic::fatal(
287            DiagnosticCode::PathNotFound,
288            format!("I couldn't get metadata for '{}': {}", path.display(), e),
289        )]
290    })?;
291
292    let modified = metadata.modified().map_err(|e| {
293        vec![Diagnostic::fatal(
294            DiagnosticCode::PathNotFound,
295            format!("I couldn't get modification time for '{}': {}", path.display(), e),
296        )]
297    })?;
298
299    Ok(modified.into())
300}
301
/// Encoder wrapper that provides a uniform interface for different compression formats.
///
/// This enum wraps the various compression encoders so we can treat them uniformly
/// in the append-to-compressed-archive flow.
#[cfg(feature = "compression")]
pub enum CompressedWriter {
    /// Gzip stream via `flate2`.
    Gzip(flate2::write::GzEncoder<std::fs::File>),
    /// Zlib stream via `flate2`.
    Zlib(flate2::write::ZlibEncoder<std::fs::File>),
    /// Zstandard stream via `zstd`.
    Zstd(zstd::stream::write::Encoder<'static, std::fs::File>),
    /// Brotli stream via `brotli`.
    Brotli(brotli::CompressorWriter<std::fs::File>),
}
313
#[cfg(feature = "compression")]
impl Write for CompressedWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Every variant's encoder is itself a `Write`; borrow it as a
        // trait object once instead of duplicating the call in each arm.
        let encoder: &mut dyn Write = match self {
            CompressedWriter::Gzip(w) => w,
            CompressedWriter::Zlib(w) => w,
            CompressedWriter::Zstd(w) => w,
            CompressedWriter::Brotli(w) => w,
        };
        encoder.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let encoder: &mut dyn Write = match self {
            CompressedWriter::Gzip(w) => w,
            CompressedWriter::Zlib(w) => w,
            CompressedWriter::Zstd(w) => w,
            CompressedWriter::Brotli(w) => w,
        };
        encoder.flush()
    }
}
334
#[cfg(feature = "compression")]
impl CompressedWriter {
    /// Create a new compressed writer for the given format and file.
    ///
    /// `Deflate` and `None` are rejected: standalone deflate is not a file
    /// format we write, and an uncompressed file needs no wrapper.
    pub fn new(format: CompressionFormat, file: std::fs::File) -> Result<Self, Diagnostic> {
        use flate2::Compression;

        match format {
            CompressionFormat::Gzip => Ok(Self::Gzip(flate2::write::GzEncoder::new(
                file,
                Compression::default(),
            ))),
            CompressionFormat::Zlib => Ok(Self::Zlib(flate2::write::ZlibEncoder::new(
                file,
                Compression::default(),
            ))),
            CompressionFormat::Zstd => {
                // Level 0 asks zstd for its default compression level.
                let encoder = zstd::stream::write::Encoder::new(file, 0).map_err(|e| {
                    Diagnostic::fatal(
                        DiagnosticCode::PathNotFound,
                        format!("I couldn't create zstd encoder: {}", e),
                    )
                })?;
                Ok(Self::Zstd(encoder))
            }
            // buffer=4096, quality=11, lg_window=22
            CompressionFormat::Brotli => {
                Ok(Self::Brotli(brotli::CompressorWriter::new(file, 4096, 11, 22)))
            }
            CompressionFormat::Deflate => {
                // Deflate is typically used within gzip/zlib, not standalone for files
                Err(Diagnostic::fatal(
                    DiagnosticCode::UnsupportedVersion,
                    "Standalone deflate compression is not supported for writing.".to_string(),
                ))
            }
            CompressionFormat::None => Err(Diagnostic::fatal(
                DiagnosticCode::UnsupportedVersion,
                "CompressedWriter::new called with CompressionFormat::None".to_string(),
            )),
        }
    }

    /// Finish compression and return any errors.
    ///
    /// This must be called before the file is closed to ensure all
    /// compressed data is flushed.
    pub fn finish(self) -> Result<(), Diagnostic> {
        // All failure paths share the same diagnostic shape; only the
        // action text differs.
        let failed = |action: &str, e: std::io::Error| {
            Diagnostic::fatal(
                DiagnosticCode::PathNotFound,
                format!("I couldn't {}: {}", action, e),
            )
        };

        match self {
            Self::Gzip(w) => {
                w.finish().map_err(|e| failed("finish gzip compression", e))?;
            }
            Self::Zlib(w) => {
                w.finish().map_err(|e| failed("finish zlib compression", e))?;
            }
            Self::Zstd(w) => {
                w.finish().map_err(|e| failed("finish zstd compression", e))?;
            }
            Self::Brotli(mut w) => {
                // Brotli doesn't have a finish() method, flush is sufficient
                // (the encoder finalizes the stream when dropped).
                w.flush().map_err(|e| failed("flush brotli compression", e))?;
            }
        }
        Ok(())
    }
}
419
/// A write context specifically for compressed output.
///
/// This wraps WriteContext to handle the finish() call properly for
/// compressed writers, which need to call finish() on the encoder
/// before the atomic file swap.
#[cfg(feature = "compression")]
pub struct CompressedWriteContext {
    /// The inner write context.
    /// Not exposed directly; `finish` must go through this wrapper so the
    /// encoder is finalized before any atomic replace.
    inner: WriteContext<CompressedWriter>,
}
430
#[cfg(feature = "compression")]
impl CompressedWriteContext {
    /// Create a new compressed write context.
    ///
    /// Thin wrapper over [`WriteContext::with_diagnostics`] that fixes the
    /// writer type to [`CompressedWriter`].
    pub fn new(
        writer: CompressedWriter,
        current_state: Value,
        observation_count: usize,
        snapshot_interval: Option<usize>,
        finish_strategy: FinishStrategy,
        diagnostics: DiagnosticCollector,
    ) -> Self {
        Self {
            inner: WriteContext::with_diagnostics(
                writer,
                current_state,
                observation_count,
                snapshot_interval,
                finish_strategy,
                diagnostics,
            ),
        }
    }

    /// Write observations for a list of JSON files.
    ///
    /// See [`WriteContext::write_observations`] for the per-file behavior.
    pub fn write_observations<P: AsRef<Path>>(
        &mut self,
        files: &[P],
    ) -> Result<usize, Vec<Diagnostic>> {
        self.inner.write_observations(files)
    }

    /// Write raw bytes to the output (used for copying existing archive content).
    pub fn write_raw(&mut self, bytes: &[u8]) -> Result<(), Vec<Diagnostic>> {
        self.inner.writer.write_all(bytes).map_err(|e| {
            vec![Diagnostic::fatal(
                DiagnosticCode::PathNotFound,
                format!("I couldn't write to the output: {}", e),
            )]
        })
    }

    /// Finish the write operation.
    ///
    /// This finishes the compression encoder, then performs any atomic
    /// file operations needed.
    pub fn finish(self) -> Result<DiagnosticCollector, Vec<Diagnostic>> {
        // Destructure the owned inner context instead of cloning
        // `finish_strategy` — the previous partial-move workaround
        // allocated two PathBufs for nothing.
        let WriteContext {
            writer,
            finish_strategy,
            diagnostics,
            ..
        } = self.inner;

        // Finish compression first so the stream is complete on disk
        // before any rename happens.
        writer.finish().map_err(|d| vec![d])?;

        // Then handle atomic replacement if needed
        match finish_strategy {
            FinishStrategy::FlushOnly => {
                // Nothing more to do
            }
            FinishStrategy::AtomicReplace { temp_path, output_path } => {
                atomic_replace_file(&output_path, &temp_path)?;
            }
        }

        Ok(diagnostics)
    }
}
496
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A changed file should produce exactly one observation whose events
    /// land in the output, preceded by the processing comment.
    #[test]
    fn test_write_context_single_observation() {
        let mut buffer = Vec::new();

        {
            let mut ctx = WriteContext::new(
                &mut buffer,
                json!({"count": 0}),
                0,
                None,
                FinishStrategy::FlushOnly,
            );

            // A temp file holding a state that differs from the initial one.
            let mut input = tempfile::NamedTempFile::new().unwrap();
            input.write_all(br#"{"count": 1}"#).unwrap();
            input.flush().unwrap();

            let written = ctx.write_observations(&[input.path()]).unwrap();
            assert_eq!(written, 1);
        }

        let text = String::from_utf8(buffer).unwrap();
        assert!(text.contains("# Processing file:"));
        assert!(text.contains("observe"));
        assert!(text.contains("change"));
        assert!(text.contains("/count"));
    }

    /// A file identical to the current state yields the comment line only —
    /// no observation, no events.
    #[test]
    fn test_write_context_no_changes() {
        let mut buffer = Vec::new();

        {
            let mut ctx = WriteContext::new(
                &mut buffer,
                json!({"count": 0}),
                0,
                None,
                FinishStrategy::FlushOnly,
            );

            // Same content as the initial state, so the diff is empty.
            let mut input = tempfile::NamedTempFile::new().unwrap();
            input.write_all(br#"{"count": 0}"#).unwrap();
            input.flush().unwrap();

            let written = ctx.write_observations(&[input.path()]).unwrap();
            assert_eq!(written, 0);
        }

        let text = String::from_utf8(buffer).unwrap();
        assert!(text.contains("# Processing file:"));
        assert!(!text.contains("observe"));
    }

    /// Snapshot timing: only a positive multiple of a configured interval
    /// should trigger a snapshot.
    #[test]
    fn test_should_write_snapshot() {
        let make_ctx = |count: usize, interval: Option<usize>| -> WriteContext<Vec<u8>> {
            WriteContext::new(Vec::new(), json!({}), count, interval, FinishStrategy::FlushOnly)
        };

        // No interval configured: never snapshot.
        assert!(!make_ctx(5, None).should_write_snapshot());

        // Interval 2, count 4: multiple of the interval.
        assert!(make_ctx(4, Some(2)).should_write_snapshot());

        // Interval 2, count 3: not a multiple.
        assert!(!make_ctx(3, Some(2)).should_write_snapshot());
    }
}