// json_archive/archive_reader.rs

// json-archive is a tool for tracking JSON file changes over time
// Copyright (C) 2025  Peoples Grocers LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//
// To purchase a license under different terms contact admin@peoplesgrocers.com
// To request changes, report bugs, or give user feedback contact
// marxism@peoplesgrocers.com
//
22use serde_json::Value;
23use std::collections::HashSet;
24use std::io::BufRead;
25
26use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector, DiagnosticLevel};
27use crate::event_deserialize::EventDeserializer;
28use crate::events::{Event, Header};
29use crate::pointer::JsonPointer;
30
/// How strictly `read_archive` checks the event stream while replaying it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Validate observation-ID references and compare snapshots against the
    /// replayed state, emitting diagnostics on mismatch.
    FullValidation,
    /// Apply events without those cross-checks (events still replay and
    /// malformed lines are still reported by the iterator itself).
    AppendSeek,
}
36
/// The outcome of replaying an entire archive with `read_archive`.
#[derive(Debug)]
pub struct ReadResult {
    /// The archive header.
    pub header: Header,
    /// The state produced by applying every event to the initial value.
    pub final_state: Value,
    /// All warnings and fatal diagnostics collected during the read.
    pub diagnostics: DiagnosticCollector,
    /// Number of observe events encountered in the stream.
    pub observation_count: usize,
}
44
/// Streams events line-by-line from an archive body.
///
/// Recoverable problems (unparseable lines) are recorded in `diagnostics`
/// and skipped rather than ending iteration.
pub struct EventIterator<R: BufRead> {
    reader: R,
    /// Diagnostics accumulated while reading event lines.
    pub diagnostics: DiagnosticCollector,
    /// The header parsed by `read_events` before iteration started.
    pub header: Header,
    // Used only to attach file/line locations to diagnostics.
    filename: String,
    // Line number of the most recently read line; starts at the header line.
    line_number: usize,
}
52
impl<R: BufRead> Iterator for EventIterator<R> {
    type Item = Event;

    /// Return the next parseable event, or `None` at end-of-input.
    ///
    /// Comment (`#`) and blank lines are skipped silently. Lines that fail
    /// to parse are recorded as diagnostics and skipped — one bad line does
    /// not hide the rest of the file. Iteration ends on EOF or I/O error.
    fn next(&mut self) -> Option<Self::Item> {
        let mut line = String::new();

        loop {
            line.clear();
            self.line_number += 1;

            match self.reader.read_line(&mut line) {
                Ok(0) => return None, // EOF
                Ok(_) => {
                    let trimmed = line.trim();

                    // Skip comments and blank lines
                    if trimmed.starts_with('#') || trimmed.is_empty() {
                        continue;
                    }

                    // Try to parse as event
                    let event_deserializer = match serde_json::from_str::<EventDeserializer>(&line)
                    {
                        Ok(d) => d,
                        Err(e) => {
                            // Record the failure and keep scanning so every
                            // bad line gets reported, not just the first one.
                            self.diagnostics.add(
                                Diagnostic::fatal(
                                    DiagnosticCode::InvalidEventJson,
                                    format!("I couldn't parse this line as JSON: {}", e),
                                )
                                .with_location(self.filename.clone(), self.line_number)
                                .with_snippet(format!("{} | {}", self.line_number, line.trim()))
                                .with_advice(
                                    "Each line after the header must be either:\n\
                                     - A comment starting with #\n\
                                     - A valid JSON array representing an event\n\n\
                                     Check for missing commas, quotes, or brackets."
                                        .to_string(),
                                ),
                            );
                            continue;
                        }
                    };

                    // Add any diagnostics from deserialization
                    for diagnostic in event_deserializer.diagnostics {
                        self.diagnostics.add(
                            diagnostic
                                .with_location(self.filename.clone(), self.line_number)
                                .with_snippet(format!("{} | {}", self.line_number, line.trim())),
                        );
                    }

                    // Return event if we have one
                    if let Some(event) = event_deserializer.event {
                        return Some(event);
                    }

                    // If no event but had diagnostics, continue to next line
                    continue;
                }
                // read_line reports invalid UTF-8 as an InvalidData I/O error;
                // record it as a fatal diagnostic and stop iterating.
                Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
                    self.diagnostics.add(
                        Diagnostic::fatal(
                            DiagnosticCode::InvalidUtf8,
                            format!("I found invalid UTF-8 bytes at line {}.", self.line_number)
                        )
                        .with_location(self.filename.clone(), self.line_number)
                        .with_advice(
                            "The JSON Archive format requires UTF-8 encoding. Make sure the file \
                             was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
                                .to_string()
                        )
                    );
                    return None;
                }
                // Any other I/O error ends iteration without a diagnostic.
                Err(_) => return None,
            }
        }
    }
}
134
135/// Parse header and create event iterator from any BufRead source.
136pub fn read_events<R: BufRead>(
137    mut reader: R,
138    filename: &str,
139) -> Result<(Value, EventIterator<R>), Diagnostic> {
140    let mut header_line = String::new();
141    let mut line_number = 0;
142
143    // Skip comment lines until we find the header
144    loop {
145        header_line.clear();
146        line_number += 1;
147
148        match reader.read_line(&mut header_line) {
149            Ok(0) => {
150                // Empty file or only comments
151                return Err(Diagnostic::fatal(
152                    DiagnosticCode::EmptyFile,
153                    "I found an empty file (or only comments), but I need at least a header line."
154                        .to_string(),
155                )
156                .with_location(filename.to_string(), line_number)
157                .with_advice(
158                    "See the file format specification for header structure.".to_string(),
159                ));
160            }
161            Ok(_) => {}
162            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
163                // UTF-8 error
164                return Err(
165                    Diagnostic::fatal(
166                        DiagnosticCode::InvalidUtf8,
167                        format!("I found invalid UTF-8 bytes at line {}.", line_number)
168                    )
169                    .with_location(filename.to_string(), line_number)
170                    .with_advice(
171                        "The JSON Archive format requires UTF-8 encoding. Make sure the file \
172                         was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
173                            .to_string()
174                    )
175                );
176            }
177            Err(e) => {
178                return Err(Diagnostic::fatal(
179                    DiagnosticCode::PathNotFound,
180                    format!("I couldn't read from the archive: {}", e),
181                )
182                .with_location(filename.to_string(), line_number));
183            }
184        };
185
186        // Skip comment lines (lines starting with #)
187        let trimmed = header_line.trim_start();
188        if !trimmed.starts_with('#') {
189            break;
190        }
191    }
192
193    let header = parse_header(filename, &header_line, line_number)?;
194
195    let iterator = EventIterator {
196        reader,
197        diagnostics: DiagnosticCollector::new(),
198        header: header.clone(),
199        filename: filename.to_string(),
200        line_number,
201    };
202
203    Ok((header.initial, iterator))
204}
205
206/// Read all events and return final state.
207pub fn read_archive<R: BufRead>(
208    reader: R,
209    filename: &str,
210    mode: ReadMode,
211) -> Result<ReadResult, Diagnostic> {
212    let (initial_value, mut event_iter) = read_events(reader, filename)?;
213
214    let header = Header::new(initial_value.clone(), None);
215    let mut state = initial_value;
216    let mut seen_observations: HashSet<String> = HashSet::new();
217    let mut current_observation: Option<(String, usize, usize)> = None;
218    let mut events_in_observation = 0;
219    let mut observation_count = 0;
220
221    // Process events from iterator
222    while let Some(event) = event_iter.next() {
223        let line_number = event_iter.line_number;
224
225        match event {
226            Event::Observe {
227                observation_id,
228                timestamp: _,
229                change_count,
230            } => {
231                if let Some((_obs_id, obs_line, expected_count)) = &current_observation {
232                    if events_in_observation != *expected_count {
233                        event_iter.diagnostics.add(
234                            Diagnostic::new(
235                                DiagnosticLevel::Warning,
236                                DiagnosticCode::ChangeCountMismatch,
237                                format!(
238                                    "The observe event at line {} declared {} changes, but I found {}.",
239                                    obs_line, expected_count, events_in_observation
240                                )
241                            )
242                            .with_location(filename.to_string(), *obs_line)
243                            .with_advice(
244                                "Make sure the change_count in the observe event matches the number of \
245                                 add/change/remove/move events that follow it."
246                                    .to_string()
247                            )
248                        );
249                    }
250                }
251
252                if seen_observations.contains(&observation_id) {
253                    event_iter.diagnostics.add(
254                        Diagnostic::new(
255                            DiagnosticLevel::Warning,
256                            DiagnosticCode::DuplicateObservationId,
257                            format!("I found a duplicate observation ID: '{}'", observation_id),
258                        )
259                        .with_location(filename.to_string(), line_number)
260                        .with_advice(
261                            "Each observation ID should be unique within the archive. \
262                             Consider using UUIDs or timestamps to ensure uniqueness."
263                                .to_string(),
264                        ),
265                    );
266                }
267
268                seen_observations.insert(observation_id.clone());
269                current_observation = Some((observation_id, line_number, change_count));
270                events_in_observation = 0;
271                observation_count += 1;
272            }
273
274            Event::Add {
275                path,
276                value,
277                observation_id,
278            } => {
279                events_in_observation += 1;
280
281                if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
282                {
283                    event_iter.diagnostics.add(
284                        Diagnostic::fatal(
285                            DiagnosticCode::NonExistentObservationId,
286                            format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
287                        )
288                        .with_location(filename.to_string(), line_number)
289                        .with_advice(
290                            "Each add/change/remove/move event must reference an observation ID from a preceding observe event."
291                                .to_string()
292                        )
293                    );
294                    continue;
295                }
296
297                if let Err(diag) = apply_add(&mut state, &path, value) {
298                    event_iter
299                        .diagnostics
300                        .add(diag.with_location(filename.to_string(), line_number));
301                    continue;
302                }
303            }
304
305            Event::Change {
306                path,
307                new_value,
308                observation_id,
309            } => {
310                events_in_observation += 1;
311
312                if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
313                {
314                    event_iter.diagnostics.add(
315                        Diagnostic::fatal(
316                            DiagnosticCode::NonExistentObservationId,
317                            format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
318                        )
319                        .with_location(filename.to_string(), line_number)
320                    );
321                    continue;
322                }
323
324                if let Err(diag) = apply_change(&mut state, &path, new_value) {
325                    event_iter
326                        .diagnostics
327                        .add(diag.with_location(filename.to_string(), line_number));
328                    continue;
329                }
330            }
331
332            Event::Remove {
333                path,
334                observation_id,
335            } => {
336                events_in_observation += 1;
337
338                if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
339                {
340                    event_iter.diagnostics.add(
341                        Diagnostic::fatal(
342                            DiagnosticCode::NonExistentObservationId,
343                            format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
344                        )
345                        .with_location(filename.to_string(), line_number)
346                    );
347                    continue;
348                }
349
350                if let Err(diag) = apply_remove(&mut state, &path) {
351                    event_iter
352                        .diagnostics
353                        .add(diag.with_location(filename.to_string(), line_number));
354                    continue;
355                }
356            }
357
358            Event::Move {
359                path,
360                moves,
361                observation_id,
362            } => {
363                events_in_observation += 1;
364
365                if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
366                {
367                    event_iter.diagnostics.add(
368                        Diagnostic::fatal(
369                            DiagnosticCode::NonExistentObservationId,
370                            format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
371                        )
372                        .with_location(filename.to_string(), line_number)
373                    );
374                    continue;
375                }
376
377                if let Err(diag) = apply_move(&mut state, &path, moves) {
378                    event_iter
379                        .diagnostics
380                        .add(diag.with_location(filename.to_string(), line_number));
381                    continue;
382                }
383            }
384
385            Event::Snapshot {
386                observation_id: _,
387                timestamp: _,
388                object,
389            } => {
390                if mode == ReadMode::FullValidation && state != object {
391                    event_iter.diagnostics.add(
392                        Diagnostic::fatal(
393                            DiagnosticCode::SnapshotStateMismatch,
394                            "I found a snapshot whose state doesn't match the replayed state up to this point.".to_string()
395                        )
396                        .with_location(filename.to_string(), line_number)
397                        .with_advice(
398                            "This could indicate corruption or that events were applied incorrectly. \
399                             The snapshot state should exactly match the result of replaying all events \
400                             from the initial state."
401                                .to_string()
402                        )
403                    );
404                }
405
406                state = object;
407            }
408        }
409    }
410
411    if let Some((_obs_id, obs_line, expected_count)) = &current_observation {
412        if events_in_observation != *expected_count {
413            event_iter.diagnostics.add(
414                Diagnostic::new(
415                    DiagnosticLevel::Warning,
416                    DiagnosticCode::ChangeCountMismatch,
417                    format!(
418                        "The observe event at line {} declared {} changes, but I found {}.",
419                        obs_line, expected_count, events_in_observation
420                    ),
421                )
422                .with_location(filename.to_string(), *obs_line),
423            );
424        }
425    }
426
427    Ok(ReadResult {
428        header,
429        final_state: state,
430        diagnostics: event_iter.diagnostics,
431        observation_count,
432    })
433}
434
435fn parse_header(filename: &str, line: &str, line_number: usize) -> Result<Header, Diagnostic> {
436    let value: Value = serde_json::from_str(line).map_err(|e| {
437        Diagnostic::fatal(
438            DiagnosticCode::MissingHeader,
439            format!("I couldn't parse the header as JSON: {}", e),
440        )
441        .with_location(filename.to_string(), line_number)
442        .with_snippet(format!("{} | {}", line_number, line))
443        .with_advice(
444            "The first line must be a JSON object containing the archive header.\n\
445             Required fields: type, version, created, initial"
446                .to_string(),
447        )
448    })?;
449
450    let header = serde_json::from_value::<Header>(value).map_err(|e| {
451        Diagnostic::fatal(
452            DiagnosticCode::MissingHeaderField,
453            format!("I couldn't parse the header: {}", e),
454        )
455        .with_location(filename.to_string(), line_number)
456        .with_snippet(format!("{} | {}", line_number, line))
457        .with_advice(
458            "The header must contain:\n\
459             - type: \"@peoplesgrocers/json-archive\"\n\
460             - version: 1\n\
461             - created: an ISO-8601 timestamp\n\
462             - initial: the initial state object"
463                .to_string(),
464        )
465    })?;
466
467    if header.version != 1 {
468        return Err(Diagnostic::fatal(
469            DiagnosticCode::UnsupportedVersion,
470            format!(
471                "I found version {}, but I only support version 1.",
472                header.version
473            ),
474        )
475        .with_location(filename.to_string(), line_number)
476        .with_advice(
477            "This archive was created with a newer or older version of the format. \
478                 You may need to upgrade your tools or convert the archive."
479                .to_string(),
480        ));
481    }
482
483    Ok(header)
484}
485
486pub fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnostic> {
487    let pointer = JsonPointer::new(path).map_err(|diag| {
488        diag.with_advice(
489            "JSON Pointer paths must start with '/' and use '/' to separate segments.\n\
490             Special characters: use ~0 for ~ and ~1 for /"
491                .to_string(),
492        )
493    })?;
494
495    pointer.set(state, value).map_err(|diag| {
496        diag.with_advice(
497            "For add operations, the parent path must exist. \
498             For example, to add /a/b/c, the paths /a and /a/b must already exist."
499                .to_string(),
500        )
501    })
502}
503
504pub fn apply_change(state: &mut Value, path: &str, new_value: Value) -> Result<(), Diagnostic> {
505    let pointer = JsonPointer::new(path)?;
506    pointer.set(state, new_value)?;
507    Ok(())
508}
509
510pub fn apply_remove(state: &mut Value, path: &str) -> Result<(), Diagnostic> {
511    let pointer = JsonPointer::new(path)?;
512    pointer.remove(state)?;
513    Ok(())
514}
515
516pub fn apply_move(
517    state: &mut Value,
518    path: &str,
519    moves: Vec<(usize, usize)>,
520) -> Result<(), Diagnostic> {
521    let pointer = JsonPointer::new(path)?;
522
523    let array = pointer.get_mut(state)?;
524
525    if !array.is_array() {
526        return Err(Diagnostic::fatal(
527            DiagnosticCode::MoveOnNonArray,
528            format!(
529                "I can't apply move operations to '{}' because it's not an array.",
530                path
531            ),
532        )
533        .with_advice(
534            "Move operations can only reorder elements within an array. \
535                 The path must point to an array value."
536                .to_string(),
537        ));
538    }
539
540    let arr = array.as_array_mut().unwrap();
541
542    // Validate all moves upfront before mutating
543    for (from_idx, to_idx) in &moves {
544        if *from_idx >= arr.len() {
545            return Err(Diagnostic::fatal(
546                DiagnosticCode::MoveIndexOutOfBounds,
547                format!(
548                    "The 'from' index {} is out of bounds (array length is {}).",
549                    from_idx,
550                    arr.len()
551                ),
552            ));
553        }
554
555        if *to_idx > arr.len() {
556            return Err(Diagnostic::fatal(
557                DiagnosticCode::MoveIndexOutOfBounds,
558                format!(
559                    "The 'to' index {} is out of bounds (array length is {}).",
560                    to_idx,
561                    arr.len()
562                ),
563            ));
564        }
565    }
566
567    // Apply moves now that we know they're all valid
568    for (from_idx, to_idx) in moves {
569        let element = arr.remove(from_idx);
570        let insert_idx = if to_idx > from_idx {
571            to_idx - 1
572        } else {
573            to_idx
574        };
575        arr.insert(insert_idx, element);
576    }
577
578    Ok(())
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::fs::File;
    use std::io::{BufReader, Write};
    use tempfile::NamedTempFile;

    // Happy path: header + one observation + one change replays to the
    // expected final state with no fatal diagnostics.
    #[test]
    fn test_read_valid_archive() {
        let mut temp_file = NamedTempFile::new().unwrap();

        let header = Header::new(json!({"count": 0}), Some("test".to_string()));
        writeln!(temp_file, "{}", serde_json::to_string(&header).unwrap()).unwrap();
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#
        )
        .unwrap();
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#).unwrap();

        let file = File::open(temp_file.path()).unwrap();
        let reader = BufReader::new(file);
        let result = read_archive(
            reader,
            &temp_file.path().display().to_string(),
            ReadMode::FullValidation,
        )
        .unwrap();

        assert_eq!(result.final_state, json!({"count": 1}));
        assert_eq!(result.observation_count, 1);
        assert!(!result.diagnostics.has_fatal());
    }

    // A file with no header line at all must fail with an Err, not a
    // diagnostic-laden Ok.
    #[test]
    fn test_empty_file() {
        let temp_file = NamedTempFile::new().unwrap();

        let file = File::open(temp_file.path()).unwrap();
        let reader = BufReader::new(file);
        let result = read_archive(
            reader,
            &temp_file.path().display().to_string(),
            ReadMode::FullValidation,
        );

        assert!(result.is_err());
    }

    // FullValidation must flag a change event whose observation_id was never
    // declared by an observe event.
    #[test]
    fn test_non_existent_observation_id() {
        let mut temp_file = NamedTempFile::new().unwrap();

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header).unwrap()).unwrap();
        writeln!(temp_file, r#"["change", "/count", 1, "obs-999"]"#).unwrap();

        let file = File::open(temp_file.path()).unwrap();
        let reader = BufReader::new(file);
        let result = read_archive(
            reader,
            &temp_file.path().display().to_string(),
            ReadMode::FullValidation,
        )
        .unwrap();

        assert!(result.diagnostics.has_fatal());
    }

    // AppendSeek skips the observation-id cross-check but still applies the
    // event to the state.
    #[test]
    fn test_append_mode_ignores_observation_id() {
        let mut temp_file = NamedTempFile::new().unwrap();

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header).unwrap()).unwrap();
        writeln!(temp_file, r#"["change", "/count", 1, "obs-999"]"#).unwrap();

        let file = File::open(temp_file.path()).unwrap();
        let reader = BufReader::new(file);
        let result = read_archive(
            reader,
            &temp_file.path().display().to_string(),
            ReadMode::AppendSeek,
        )
        .unwrap();

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));
    }

    // An observe event declaring 2 changes followed by only 1 must produce
    // exactly one warning-level diagnostic.
    #[test]
    fn test_change_count_mismatch() {
        let mut temp_file = NamedTempFile::new().unwrap();

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header).unwrap()).unwrap();
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 2]"#
        )
        .unwrap();
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#).unwrap();

        let file = File::open(temp_file.path()).unwrap();
        let reader = BufReader::new(file);
        let result = read_archive(
            reader,
            &temp_file.path().display().to_string(),
            ReadMode::FullValidation,
        )
        .unwrap();

        let warnings: Vec<_> = result
            .diagnostics
            .diagnostics()
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Warning)
            .collect();

        assert_eq!(warnings.len(), 1);
    }

    // A change event overwrites the existing value at the path.
    #[test]
    fn test_simple_change() {
        let mut temp_file = NamedTempFile::new().unwrap();

        let header = Header::new(json!({"count": 5}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header).unwrap()).unwrap();
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#
        )
        .unwrap();
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#).unwrap();

        let file = File::open(temp_file.path()).unwrap();
        let reader = BufReader::new(file);
        let result = read_archive(
            reader,
            &temp_file.path().display().to_string(),
            ReadMode::FullValidation,
        )
        .unwrap();

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));
    }
}