// json_archive/archive_reader.rs

1// json-archive is a tool for tracking JSON file changes over time
2// Copyright (C) 2025  Peoples Grocers LLC
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16//
17// To purchase a license under different terms contact admin@peoplesgrocers.com
18// To request changes, report bugs, or give user feedback contact
19// marxism@peoplesgrocers.com
20//
21
22use serde_json::Value;
23use std::collections::HashSet;
24use std::fs::File;
25use std::io::{BufRead, BufReader, Read};
26use std::path::Path;
27
28use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector, DiagnosticLevel};
29use crate::event_deserialize::EventDeserializer;
30use crate::events::{Event, Header};
31use crate::pointer::JsonPointer;
32use crate::detection::{CompressionFormat, detect_compression_format};
33
34#[cfg(feature = "compression")]
35use flate2::read::{DeflateDecoder, GzDecoder, ZlibDecoder};
36#[cfg(feature = "compression")]
37use brotli::Decompressor;
38#[cfg(feature = "compression")]
39use zstd::stream::read::Decoder as ZstdDecoder;
40
/// How strictly an archive is validated while it is being read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Replay every event and report all consistency problems, including
    /// references to undeclared observation IDs and snapshot/state mismatches.
    FullValidation,
    /// Relaxed mode for seeking to the end before appending: skips the
    /// observation-ID existence checks.
    AppendSeek,
}
46
/// Reads a JSON Archive file: parses the header line, then replays the event
/// stream to reconstruct the archived value.
pub struct ArchiveReader {
    // Validation strictness applied while replaying events.
    mode: ReadMode,
    // Display form of the archive path, used for diagnostic locations.
    filename: String,
}
51
/// Outcome of fully reading an archive via `ArchiveReader::read`.
#[derive(Debug)]
pub struct ReadResult {
    /// Header associated with the archive (a placeholder built from
    /// `Value::Null` when reading failed before a header was available).
    pub header: Header,
    /// State after replaying all events on top of the initial value.
    pub final_state: Value,
    /// Everything noticed while reading: fatal errors and warnings.
    pub diagnostics: DiagnosticCollector,
    /// Number of observe events encountered.
    pub observation_count: usize,
}
59
/// Streams events from an archive line by line, collecting diagnostics as it
/// goes instead of stopping at the first problem.
pub struct EventIterator {
    // Buffered (possibly decompressing) reader, positioned after the header line.
    reader: Box<dyn BufRead>,
    /// Diagnostics gathered so far (header parsing plus per-line event issues).
    pub diagnostics: DiagnosticCollector,
    /// Header parsed from the archive's first line (a `Value::Null` placeholder
    /// when the header could not be parsed).
    pub header: Header,
    // Archive path as shown in diagnostic locations.
    filename: String,
    // 1-based number of the line most recently read; starts at 1 because the
    // header line was already consumed.
    line_number: usize,
}
67
impl Iterator for EventIterator {
    type Item = Event;

    /// Returns the next successfully parsed event, or `None` at end of input.
    ///
    /// Blank lines, `#` comments, and unparseable lines are skipped; problems
    /// are recorded in `self.diagnostics` rather than ending iteration, so
    /// callers get a best-effort event stream plus a full report.
    fn next(&mut self) -> Option<Self::Item> {
        // One reusable buffer for the whole scan; cleared each iteration.
        let mut line = String::new();

        loop {
            line.clear();
            // Incremented before reading so diagnostics point at the line we
            // are about to consume (the header occupied line 1).
            self.line_number += 1;

            match self.reader.read_line(&mut line) {
                Ok(0) => return None, // EOF
                Ok(_) => {
                    let trimmed = line.trim();

                    // Skip comments and blank lines
                    if trimmed.starts_with('#') || trimmed.is_empty() {
                        continue;
                    }

                    // Try to parse as event
                    let event_deserializer = match serde_json::from_str::<EventDeserializer>(&line) {
                        Ok(d) => d,
                        Err(e) => {
                            self.diagnostics.add(
                                Diagnostic::fatal(
                                    DiagnosticCode::InvalidEventJson,
                                    format!("I couldn't parse this line as JSON: {}", e),
                                )
                                .with_location(self.filename.clone(), self.line_number)
                                .with_snippet(format!("{} | {}", self.line_number, line.trim()))
                                .with_advice(
                                    "Each line after the header must be either:\n\
                                     - A comment starting with #\n\
                                     - A valid JSON array representing an event\n\n\
                                     Check for missing commas, quotes, or brackets."
                                        .to_string(),
                                ),
                            );
                            // Malformed line: record it and keep scanning.
                            continue;
                        }
                    };

                    // Add any diagnostics from deserialization
                    for diagnostic in event_deserializer.diagnostics {
                        self.diagnostics.add(
                            diagnostic
                                .with_location(self.filename.clone(), self.line_number)
                                .with_snippet(format!("{} | {}", self.line_number, line.trim()))
                        );
                    }

                    // Return event if we have one
                    if let Some(event) = event_deserializer.event {
                        return Some(event);
                    }

                    // If no event but had diagnostics, continue to next line
                    continue;
                }
                Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
                    // read_line surfaces invalid UTF-8 as an InvalidData error;
                    // that is unrecoverable for a line-oriented reader.
                    self.diagnostics.add(
                        Diagnostic::fatal(
                            DiagnosticCode::InvalidUtf8,
                            format!("I found invalid UTF-8 bytes at line {}.", self.line_number)
                        )
                        .with_location(self.filename.clone(), self.line_number)
                        .with_advice(
                            "The JSON Archive format requires UTF-8 encoding. Make sure the file \
                             was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
                                .to_string()
                        )
                    );
                    return None;
                }
                // Any other I/O error ends iteration without a diagnostic.
                Err(_) => return None,
            }
        }
    }
}
148
149impl ArchiveReader {
150    pub fn new<P: AsRef<Path>>(path: P, mode: ReadMode) -> std::io::Result<Self> {
151        let filename = path.as_ref().display().to_string();
152        Ok(Self { mode, filename })
153    }
154
155    pub fn events<P: AsRef<Path>>(&self, path: P) -> std::io::Result<(Value, EventIterator)> {
156        let path = path.as_ref();
157        let mut file = File::open(path)?;
158
159        // Detect compression format
160        let mut magic_bytes = [0u8; 4];
161        let bytes_read = file.read(&mut magic_bytes)?;
162        let compression_format = detect_compression_format(path, &magic_bytes[..bytes_read]);
163
164        // Re-open file to reset position
165        file = File::open(path)?;
166
167        let mut diagnostics = DiagnosticCollector::new();
168
169        // Check if compression is detected but not supported
170        #[cfg(not(feature = "compression"))]
171        if compression_format != CompressionFormat::None {
172            let format_name = match compression_format {
173                CompressionFormat::Gzip => "gzip",
174                CompressionFormat::Deflate => "deflate",
175                CompressionFormat::Zlib => "zlib",
176                CompressionFormat::Brotli => "brotli",
177                CompressionFormat::Zstd => "zstd",
178                CompressionFormat::None => unreachable!(),
179            };
180
181            diagnostics.add(
182                Diagnostic::fatal(
183                    DiagnosticCode::UnsupportedVersion,
184                    format!("I detected a {}-compressed archive, but this build doesn't support compression.", format_name)
185                )
186                .with_location(self.filename.clone(), 1)
187                .with_advice(
188                    "This binary was built without compression support to reduce binary size and dependencies.\n\
189                     You have two options:\n\
190                     1. Install the version with compression support: cargo install json-archive --features compression\n\
191                     2. Manually decompress the file first, then use this tool on the uncompressed archive"
192                        .to_string()
193                )
194            );
195
196            // Return dummy values with fatal diagnostic
197            let iterator = EventIterator {
198                reader: Box::new(BufReader::new(std::io::empty())),
199                diagnostics,
200                header: Header::new(Value::Null, None),
201                filename: self.filename.clone(),
202                line_number: 1,
203            };
204            return Ok((Value::Null, iterator));
205        }
206
207        // Create appropriate reader based on compression format
208        #[cfg(feature = "compression")]
209        let reader: Box<dyn BufRead> = match compression_format {
210            CompressionFormat::Gzip => Box::new(BufReader::new(GzDecoder::new(file))),
211            CompressionFormat::Deflate => Box::new(BufReader::new(DeflateDecoder::new(file))),
212            CompressionFormat::Zlib => Box::new(BufReader::new(ZlibDecoder::new(file))),
213            CompressionFormat::Brotli => Box::new(BufReader::new(Decompressor::new(file, 4096))),
214            CompressionFormat::Zstd => Box::new(BufReader::new(ZstdDecoder::new(file)?)),
215            CompressionFormat::None => Box::new(BufReader::new(file)),
216        };
217
218        #[cfg(not(feature = "compression"))]
219        let reader: Box<dyn BufRead> = Box::new(BufReader::new(file));
220
221        let mut reader = reader;
222        let mut header_line = String::new();
223
224        let _bytes_read = match reader.read_line(&mut header_line) {
225            Ok(0) => {
226                // Empty file
227                diagnostics.add(
228                    Diagnostic::fatal(
229                        DiagnosticCode::EmptyFile,
230                        "I found an empty file, but I need at least a header line.".to_string(),
231                    )
232                    .with_location(self.filename.clone(), 1)
233                    .with_advice(
234                        "See the file format specification for header structure."
235                            .to_string(),
236                    ),
237                );
238                let iterator = EventIterator {
239                    reader,
240                    diagnostics,
241                    header: Header::new(Value::Null, None),
242                    filename: self.filename.clone(),
243                    line_number: 1,
244                };
245                return Ok((Value::Null, iterator));
246            }
247            Ok(n) => n,
248            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
249                // UTF-8 error
250                diagnostics.add(
251                    Diagnostic::fatal(
252                        DiagnosticCode::InvalidUtf8,
253                        "I found invalid UTF-8 bytes at line 1.".to_string()
254                    )
255                    .with_location(self.filename.clone(), 1)
256                    .with_advice(
257                        "The JSON Archive format requires UTF-8 encoding. Make sure the file \
258                         was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
259                            .to_string()
260                    )
261                );
262                let iterator = EventIterator {
263                    reader,
264                    diagnostics,
265                    header: Header::new(Value::Null, None),
266                    filename: self.filename.clone(),
267                    line_number: 1,
268                };
269                return Ok((Value::Null, iterator));
270            }
271            Err(e) => return Err(e),
272        };
273
274        let header = match self.parse_header(&header_line, 1, &mut diagnostics) {
275            Some(h) => h,
276            None => {
277                let iterator = EventIterator {
278                    reader,
279                    diagnostics,
280                    header: Header::new(Value::Null, None),
281                    filename: self.filename.clone(),
282                    line_number: 1,
283                };
284                return Ok((Value::Null, iterator));
285            }
286        };
287
288        let iterator = EventIterator {
289            reader,
290            diagnostics,
291            header: header.clone(),
292            filename: self.filename.clone(),
293            line_number: 1,
294        };
295
296        Ok((header.initial, iterator))
297    }
298
299    pub fn read<P: AsRef<Path>>(&self, path: P) -> std::io::Result<ReadResult> {
300        let (initial_value, mut event_iter) = self.events(&path)?;
301
302        // Check for early fatal diagnostics (like compression not supported)
303        if event_iter.diagnostics.has_fatal() {
304            return Ok(ReadResult {
305                header: Header::new(Value::Null, None),
306                final_state: Value::Null,
307                diagnostics: event_iter.diagnostics,
308                observation_count: 0,
309            });
310        }
311
312        let header = Header::new(initial_value.clone(), None);
313        let mut state = initial_value;
314        let mut seen_observations: HashSet<String> = HashSet::new();
315        let mut current_observation: Option<(String, usize, usize)> = None;
316        let mut events_in_observation = 0;
317        let mut observation_count = 0;
318
319        // Process events from iterator
320        while let Some(event) = event_iter.next() {
321            let line_number = event_iter.line_number;
322
323            match event {
324                Event::Observe { observation_id, timestamp: _, change_count } => {
325                    if let Some((_obs_id, obs_line, expected_count)) = &current_observation {
326                        if events_in_observation != *expected_count {
327                            event_iter.diagnostics.add(
328                                Diagnostic::new(
329                                    DiagnosticLevel::Warning,
330                                    DiagnosticCode::ChangeCountMismatch,
331                                    format!(
332                                        "The observe event at line {} declared {} changes, but I found {}.",
333                                        obs_line, expected_count, events_in_observation
334                                    )
335                                )
336                                .with_location(self.filename.clone(), *obs_line)
337                                .with_advice(
338                                    "Make sure the change_count in the observe event matches the number of \
339                                     add/change/remove/move events that follow it."
340                                        .to_string()
341                                )
342                            );
343                        }
344                    }
345
346                    if seen_observations.contains(&observation_id) {
347                        event_iter.diagnostics.add(
348                            Diagnostic::new(
349                                DiagnosticLevel::Warning,
350                                DiagnosticCode::DuplicateObservationId,
351                                format!("I found a duplicate observation ID: '{}'", observation_id),
352                            )
353                            .with_location(self.filename.clone(), line_number)
354                            .with_advice(
355                                "Each observation ID should be unique within the archive. \
356                                 Consider using UUIDs or timestamps to ensure uniqueness."
357                                    .to_string(),
358                            ),
359                        );
360                    }
361
362                    seen_observations.insert(observation_id.clone());
363                    current_observation = Some((observation_id, line_number, change_count));
364                    events_in_observation = 0;
365                    observation_count += 1;
366                }
367
368                Event::Add { path, value, observation_id } => {
369                    events_in_observation += 1;
370
371                    if self.mode == ReadMode::FullValidation
372                        && !seen_observations.contains(&observation_id)
373                    {
374                        event_iter.diagnostics.add(
375                            Diagnostic::fatal(
376                                DiagnosticCode::NonExistentObservationId,
377                                format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
378                            )
379                            .with_location(self.filename.clone(), line_number)
380                            .with_advice(
381                                "Each add/change/remove/move event must reference an observation ID from a preceding observe event."
382                                    .to_string()
383                            )
384                        );
385                        continue;
386                    }
387
388                    if let Err(diag) = apply_add(&mut state, &path, value) {
389                        event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
390                        continue;
391                    }
392                }
393
394                Event::Change { path, new_value, observation_id } => {
395                    events_in_observation += 1;
396
397                    if self.mode == ReadMode::FullValidation
398                        && !seen_observations.contains(&observation_id)
399                    {
400                        event_iter.diagnostics.add(
401                            Diagnostic::fatal(
402                                DiagnosticCode::NonExistentObservationId,
403                                format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
404                            )
405                            .with_location(self.filename.clone(), line_number)
406                        );
407                        continue;
408                    }
409
410                    if let Err(diag) = apply_change(&mut state, &path, new_value) {
411                        event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
412                        continue;
413                    }
414                }
415
416                Event::Remove { path, observation_id } => {
417                    events_in_observation += 1;
418
419                    if self.mode == ReadMode::FullValidation
420                        && !seen_observations.contains(&observation_id)
421                    {
422                        event_iter.diagnostics.add(
423                            Diagnostic::fatal(
424                                DiagnosticCode::NonExistentObservationId,
425                                format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
426                            )
427                            .with_location(self.filename.clone(), line_number)
428                        );
429                        continue;
430                    }
431
432                    if let Err(diag) = apply_remove(&mut state, &path) {
433                        event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
434                        continue;
435                    }
436                }
437
438                Event::Move { path, moves, observation_id } => {
439                    events_in_observation += 1;
440
441                    if self.mode == ReadMode::FullValidation
442                        && !seen_observations.contains(&observation_id)
443                    {
444                        event_iter.diagnostics.add(
445                            Diagnostic::fatal(
446                                DiagnosticCode::NonExistentObservationId,
447                                format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
448                            )
449                            .with_location(self.filename.clone(), line_number)
450                        );
451                        continue;
452                    }
453
454                    if let Err(diag) = apply_move(&mut state, &path, moves) {
455                        event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
456                        continue;
457                    }
458                }
459
460                Event::Snapshot { observation_id: _, timestamp: _, object } => {
461                    if self.mode == ReadMode::FullValidation && state != object {
462                        event_iter.diagnostics.add(
463                            Diagnostic::fatal(
464                                DiagnosticCode::SnapshotStateMismatch,
465                                "I found a snapshot whose state doesn't match the replayed state up to this point.".to_string()
466                            )
467                            .with_location(self.filename.clone(), line_number)
468                            .with_advice(
469                                "This could indicate corruption or that events were applied incorrectly. \
470                                 The snapshot state should exactly match the result of replaying all events \
471                                 from the initial state."
472                                    .to_string()
473                            )
474                        );
475                    }
476
477                    state = object;
478                }
479            }
480        }
481
482        if let Some((_obs_id, obs_line, expected_count)) = &current_observation {
483            if events_in_observation != *expected_count {
484                event_iter.diagnostics.add(
485                    Diagnostic::new(
486                        DiagnosticLevel::Warning,
487                        DiagnosticCode::ChangeCountMismatch,
488                        format!(
489                            "The observe event at line {} declared {} changes, but I found {}.",
490                            obs_line, expected_count, events_in_observation
491                        ),
492                    )
493                    .with_location(self.filename.clone(), *obs_line),
494                );
495            }
496        }
497
498        Ok(ReadResult {
499            header,
500            final_state: state,
501            diagnostics: event_iter.diagnostics,
502            observation_count,
503        })
504    }
505
506    fn parse_header(
507        &self,
508        line: &str,
509        line_number: usize,
510        diagnostics: &mut DiagnosticCollector,
511    ) -> Option<Header> {
512        let value: Value = match serde_json::from_str(line) {
513            Ok(v) => v,
514            Err(e) => {
515                diagnostics.add(
516                    Diagnostic::fatal(
517                        DiagnosticCode::MissingHeader,
518                        format!("I couldn't parse the header as JSON: {}", e),
519                    )
520                    .with_location(self.filename.clone(), line_number)
521                    .with_snippet(format!("{} | {}", line_number, line))
522                    .with_advice(
523                        "The first line must be a JSON object containing the archive header.\n\
524                         Required fields: type, version, created, initial"
525                            .to_string(),
526                    ),
527                );
528                return None;
529            }
530        };
531
532        match serde_json::from_value::<Header>(value.clone()) {
533            Ok(header) => {
534                if header.version != 1 {
535                    diagnostics.add(
536                        Diagnostic::fatal(
537                            DiagnosticCode::UnsupportedVersion,
538                            format!("I found version {}, but I only support version 1.", header.version)
539                        )
540                        .with_location(self.filename.clone(), line_number)
541                        .with_advice(
542                            "This archive was created with a newer or older version of the format. \
543                             You may need to upgrade your tools or convert the archive."
544                                .to_string()
545                        )
546                    );
547                    return None;
548                }
549
550                Some(header)
551            }
552            Err(e) => {
553                diagnostics.add(
554                    Diagnostic::fatal(
555                        DiagnosticCode::MissingHeaderField,
556                        format!("I couldn't parse the header: {}", e),
557                    )
558                    .with_location(self.filename.clone(), line_number)
559                    .with_snippet(format!("{} | {}", line_number, line))
560                    .with_advice(
561                        "The header must contain:\n\
562                         - type: \"@peoplesgrocers/json-archive\"\n\
563                         - version: 1\n\
564                         - created: an ISO-8601 timestamp\n\
565                         - initial: the initial state object"
566                            .to_string(),
567                    ),
568                );
569                None
570            }
571        }
572    }
573
574}
575
576pub fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnostic> {
577    let pointer = JsonPointer::new(path).map_err(|diag| {
578        diag.with_advice(
579            "JSON Pointer paths must start with '/' and use '/' to separate segments.\n\
580             Special characters: use ~0 for ~ and ~1 for /"
581                .to_string()
582        )
583    })?;
584
585    pointer.set(state, value).map_err(|diag| {
586        diag.with_advice(
587            "For add operations, the parent path must exist. \
588             For example, to add /a/b/c, the paths /a and /a/b must already exist."
589                .to_string()
590        )
591    })
592}
593
594pub fn apply_change(state: &mut Value, path: &str, new_value: Value) -> Result<(), Diagnostic> {
595    let pointer = JsonPointer::new(path)?;
596    pointer.set(state, new_value)?;
597    Ok(())
598}
599
600pub fn apply_remove(state: &mut Value, path: &str) -> Result<(), Diagnostic> {
601    let pointer = JsonPointer::new(path)?;
602    pointer.remove(state)?;
603    Ok(())
604}
605
606pub fn apply_move(
607    state: &mut Value,
608    path: &str,
609    moves: Vec<(usize, usize)>,
610) -> Result<(), Diagnostic> {
611    let pointer = JsonPointer::new(path)?;
612
613    let array = pointer.get_mut(state)?;
614
615    if !array.is_array() {
616        return Err(
617            Diagnostic::fatal(
618                DiagnosticCode::MoveOnNonArray,
619                format!(
620                    "I can't apply move operations to '{}' because it's not an array.",
621                    path
622                ),
623            )
624            .with_advice(
625                "Move operations can only reorder elements within an array. \
626                 The path must point to an array value."
627                    .to_string(),
628            ),
629        );
630    }
631
632    let arr = array.as_array_mut().unwrap();
633
634    // Validate all moves upfront before mutating
635    for (from_idx, to_idx) in &moves {
636        if *from_idx >= arr.len() {
637            return Err(Diagnostic::fatal(
638                DiagnosticCode::MoveIndexOutOfBounds,
639                format!(
640                    "The 'from' index {} is out of bounds (array length is {}).",
641                    from_idx,
642                    arr.len()
643                ),
644            ));
645        }
646
647        if *to_idx > arr.len() {
648            return Err(Diagnostic::fatal(
649                DiagnosticCode::MoveIndexOutOfBounds,
650                format!(
651                    "The 'to' index {} is out of bounds (array length is {}).",
652                    to_idx,
653                    arr.len()
654                ),
655            ));
656        }
657    }
658
659    // Apply moves now that we know they're all valid
660    for (from_idx, to_idx) in moves {
661        let element = arr.remove(from_idx);
662        let insert_idx = if to_idx > from_idx { to_idx - 1 } else { to_idx };
663        arr.insert(insert_idx, element);
664    }
665
666    Ok(())
667}
668
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Happy path: header + one observation + one change replays to the
    // expected final state with no fatal diagnostics.
    #[test]
    fn test_read_valid_archive() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), Some("test".to_string()));
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#
        )?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert_eq!(result.final_state, json!({"count": 1}));
        assert_eq!(result.observation_count, 1);
        assert!(!result.diagnostics.has_fatal());

        Ok(())
    }

    // A zero-byte file must produce exactly one fatal diagnostic (EmptyFile).
    #[test]
    fn test_empty_file() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert!(result.diagnostics.has_fatal());
        assert_eq!(result.diagnostics.len(), 1);

        Ok(())
    }

    // FullValidation mode treats an event that references an undeclared
    // observation ID as fatal.
    #[test]
    fn test_non_existent_observation_id() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-999"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert!(result.diagnostics.has_fatal());

        Ok(())
    }

    // AppendSeek mode skips the observation-ID existence check and still
    // applies the change.
    #[test]
    fn test_append_mode_ignores_observation_id() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-999"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::AppendSeek)?;
        let result = reader.read(temp_file.path())?;

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));

        Ok(())
    }

    // An observe event declaring 2 changes followed by only 1 change should
    // yield exactly one warning (ChangeCountMismatch), not a fatal error.
    #[test]
    fn test_change_count_mismatch() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 2]"#
        )?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        let warnings: Vec<_> = result
            .diagnostics
            .diagnostics()
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Warning)
            .collect();

        assert_eq!(warnings.len(), 1);

        Ok(())
    }

    // A change event overwrites whatever value is at its JSON Pointer path.
    #[test]
    fn test_simple_change() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 5}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#
        )?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));

        Ok(())
    }
}
791}