1use serde_json::Value;
23use std::collections::HashSet;
24use std::fs::File;
25use std::io::{BufRead, BufReader, Read};
26use std::path::Path;
27
28use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector, DiagnosticLevel};
29use crate::event_deserialize::EventDeserializer;
30use crate::events::{Event, Header};
31use crate::pointer::JsonPointer;
32use crate::detection::{CompressionFormat, detect_compression_format};
33
34#[cfg(feature = "compression")]
35use flate2::read::{DeflateDecoder, GzDecoder, ZlibDecoder};
36#[cfg(feature = "compression")]
37use brotli::Decompressor;
38#[cfg(feature = "compression")]
39use zstd::stream::read::Decoder as ZstdDecoder;
40
/// Controls how strictly [`ArchiveReader`] validates an archive while replaying it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Validate everything: add/change/remove/move events must reference a
    /// previously seen observe event, and snapshots must match the replayed state.
    FullValidation,
    /// Lighter-weight mode (used when seeking to append): skips the
    /// observation-ID existence check and, implicitly, snapshot comparison.
    AppendSeek,
}
46
/// Reads a JSON Archive file: one header line followed by event lines.
pub struct ArchiveReader {
    // Validation strictness applied during `read`.
    mode: ReadMode,
    // Display name of the archive, attached to diagnostic locations.
    filename: String,
}
51
/// Outcome of fully replaying an archive with [`ArchiveReader::read`].
#[derive(Debug)]
pub struct ReadResult {
    /// The archive header (a placeholder header when reading failed fatally).
    pub header: Header,
    /// State produced by applying every event to the initial state.
    pub final_state: Value,
    /// All warnings and errors collected while reading.
    pub diagnostics: DiagnosticCollector,
    /// Number of observe events encountered.
    pub observation_count: usize,
}
59
/// Streams [`Event`]s from an archive one line at a time.
///
/// Malformed lines are recorded into `diagnostics` and skipped rather than
/// ending iteration; only EOF or an I/O error stops the stream.
pub struct EventIterator {
    // Buffered source of archive lines (possibly a decompressing reader).
    reader: Box<dyn BufRead>,
    /// Diagnostics collected so far (header parsing plus event lines).
    pub diagnostics: DiagnosticCollector,
    /// Header parsed from the first line (placeholder header on failure).
    pub header: Header,
    // Display name used when attaching locations to diagnostics.
    filename: String,
    // 1-based number of the line most recently read (header is line 1).
    line_number: usize,
}
67
68impl Iterator for EventIterator {
69 type Item = Event;
70
71 fn next(&mut self) -> Option<Self::Item> {
72 let mut line = String::new();
73
74 loop {
75 line.clear();
76 self.line_number += 1;
77
78 match self.reader.read_line(&mut line) {
79 Ok(0) => return None, Ok(_) => {
81 let trimmed = line.trim();
82
83 if trimmed.starts_with('#') || trimmed.is_empty() {
85 continue;
86 }
87
88 let event_deserializer = match serde_json::from_str::<EventDeserializer>(&line) {
90 Ok(d) => d,
91 Err(e) => {
92 self.diagnostics.add(
93 Diagnostic::fatal(
94 DiagnosticCode::InvalidEventJson,
95 format!("I couldn't parse this line as JSON: {}", e),
96 )
97 .with_location(self.filename.clone(), self.line_number)
98 .with_snippet(format!("{} | {}", self.line_number, line.trim()))
99 .with_advice(
100 "Each line after the header must be either:\n\
101 - A comment starting with #\n\
102 - A valid JSON array representing an event\n\n\
103 Check for missing commas, quotes, or brackets."
104 .to_string(),
105 ),
106 );
107 continue;
108 }
109 };
110
111 for diagnostic in event_deserializer.diagnostics {
113 self.diagnostics.add(
114 diagnostic
115 .with_location(self.filename.clone(), self.line_number)
116 .with_snippet(format!("{} | {}", self.line_number, line.trim()))
117 );
118 }
119
120 if let Some(event) = event_deserializer.event {
122 return Some(event);
123 }
124
125 continue;
127 }
128 Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
129 self.diagnostics.add(
130 Diagnostic::fatal(
131 DiagnosticCode::InvalidUtf8,
132 format!("I found invalid UTF-8 bytes at line {}.", self.line_number)
133 )
134 .with_location(self.filename.clone(), self.line_number)
135 .with_advice(
136 "The JSON Archive format requires UTF-8 encoding. Make sure the file \
137 was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
138 .to_string()
139 )
140 );
141 return None;
142 }
143 Err(_) => return None,
144 }
145 }
146 }
147}
148
149impl ArchiveReader {
150 pub fn new<P: AsRef<Path>>(path: P, mode: ReadMode) -> std::io::Result<Self> {
151 let filename = path.as_ref().display().to_string();
152 Ok(Self { mode, filename })
153 }
154
155 pub fn events<P: AsRef<Path>>(&self, path: P) -> std::io::Result<(Value, EventIterator)> {
156 let path = path.as_ref();
157 let mut file = File::open(path)?;
158
159 let mut magic_bytes = [0u8; 4];
161 let bytes_read = file.read(&mut magic_bytes)?;
162 let compression_format = detect_compression_format(path, &magic_bytes[..bytes_read]);
163
164 file = File::open(path)?;
166
167 let mut diagnostics = DiagnosticCollector::new();
168
169 #[cfg(not(feature = "compression"))]
171 if compression_format != CompressionFormat::None {
172 let format_name = match compression_format {
173 CompressionFormat::Gzip => "gzip",
174 CompressionFormat::Deflate => "deflate",
175 CompressionFormat::Zlib => "zlib",
176 CompressionFormat::Brotli => "brotli",
177 CompressionFormat::Zstd => "zstd",
178 CompressionFormat::None => unreachable!(),
179 };
180
181 diagnostics.add(
182 Diagnostic::fatal(
183 DiagnosticCode::UnsupportedVersion,
184 format!("I detected a {}-compressed archive, but this build doesn't support compression.", format_name)
185 )
186 .with_location(self.filename.clone(), 1)
187 .with_advice(
188 "This binary was built without compression support to reduce binary size and dependencies.\n\
189 You have two options:\n\
190 1. Install the version with compression support: cargo install json-archive --features compression\n\
191 2. Manually decompress the file first, then use this tool on the uncompressed archive"
192 .to_string()
193 )
194 );
195
196 let iterator = EventIterator {
198 reader: Box::new(BufReader::new(std::io::empty())),
199 diagnostics,
200 header: Header::new(Value::Null, None),
201 filename: self.filename.clone(),
202 line_number: 1,
203 };
204 return Ok((Value::Null, iterator));
205 }
206
207 #[cfg(feature = "compression")]
209 let reader: Box<dyn BufRead> = match compression_format {
210 CompressionFormat::Gzip => Box::new(BufReader::new(GzDecoder::new(file))),
211 CompressionFormat::Deflate => Box::new(BufReader::new(DeflateDecoder::new(file))),
212 CompressionFormat::Zlib => Box::new(BufReader::new(ZlibDecoder::new(file))),
213 CompressionFormat::Brotli => Box::new(BufReader::new(Decompressor::new(file, 4096))),
214 CompressionFormat::Zstd => Box::new(BufReader::new(ZstdDecoder::new(file)?)),
215 CompressionFormat::None => Box::new(BufReader::new(file)),
216 };
217
218 #[cfg(not(feature = "compression"))]
219 let reader: Box<dyn BufRead> = Box::new(BufReader::new(file));
220
221 let mut reader = reader;
222 let mut header_line = String::new();
223
224 let _bytes_read = match reader.read_line(&mut header_line) {
225 Ok(0) => {
226 diagnostics.add(
228 Diagnostic::fatal(
229 DiagnosticCode::EmptyFile,
230 "I found an empty file, but I need at least a header line.".to_string(),
231 )
232 .with_location(self.filename.clone(), 1)
233 .with_advice(
234 "See the file format specification for header structure."
235 .to_string(),
236 ),
237 );
238 let iterator = EventIterator {
239 reader,
240 diagnostics,
241 header: Header::new(Value::Null, None),
242 filename: self.filename.clone(),
243 line_number: 1,
244 };
245 return Ok((Value::Null, iterator));
246 }
247 Ok(n) => n,
248 Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
249 diagnostics.add(
251 Diagnostic::fatal(
252 DiagnosticCode::InvalidUtf8,
253 "I found invalid UTF-8 bytes at line 1.".to_string()
254 )
255 .with_location(self.filename.clone(), 1)
256 .with_advice(
257 "The JSON Archive format requires UTF-8 encoding. Make sure the file \
258 was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
259 .to_string()
260 )
261 );
262 let iterator = EventIterator {
263 reader,
264 diagnostics,
265 header: Header::new(Value::Null, None),
266 filename: self.filename.clone(),
267 line_number: 1,
268 };
269 return Ok((Value::Null, iterator));
270 }
271 Err(e) => return Err(e),
272 };
273
274 let header = match self.parse_header(&header_line, 1, &mut diagnostics) {
275 Some(h) => h,
276 None => {
277 let iterator = EventIterator {
278 reader,
279 diagnostics,
280 header: Header::new(Value::Null, None),
281 filename: self.filename.clone(),
282 line_number: 1,
283 };
284 return Ok((Value::Null, iterator));
285 }
286 };
287
288 let iterator = EventIterator {
289 reader,
290 diagnostics,
291 header: header.clone(),
292 filename: self.filename.clone(),
293 line_number: 1,
294 };
295
296 Ok((header.initial, iterator))
297 }
298
299 pub fn read<P: AsRef<Path>>(&self, path: P) -> std::io::Result<ReadResult> {
300 let (initial_value, mut event_iter) = self.events(&path)?;
301
302 if event_iter.diagnostics.has_fatal() {
304 return Ok(ReadResult {
305 header: Header::new(Value::Null, None),
306 final_state: Value::Null,
307 diagnostics: event_iter.diagnostics,
308 observation_count: 0,
309 });
310 }
311
312 let header = Header::new(initial_value.clone(), None);
313 let mut state = initial_value;
314 let mut seen_observations: HashSet<String> = HashSet::new();
315 let mut current_observation: Option<(String, usize, usize)> = None;
316 let mut events_in_observation = 0;
317 let mut observation_count = 0;
318
319 while let Some(event) = event_iter.next() {
321 let line_number = event_iter.line_number;
322
323 match event {
324 Event::Observe { observation_id, timestamp: _, change_count } => {
325 if let Some((_obs_id, obs_line, expected_count)) = ¤t_observation {
326 if events_in_observation != *expected_count {
327 event_iter.diagnostics.add(
328 Diagnostic::new(
329 DiagnosticLevel::Warning,
330 DiagnosticCode::ChangeCountMismatch,
331 format!(
332 "The observe event at line {} declared {} changes, but I found {}.",
333 obs_line, expected_count, events_in_observation
334 )
335 )
336 .with_location(self.filename.clone(), *obs_line)
337 .with_advice(
338 "Make sure the change_count in the observe event matches the number of \
339 add/change/remove/move events that follow it."
340 .to_string()
341 )
342 );
343 }
344 }
345
346 if seen_observations.contains(&observation_id) {
347 event_iter.diagnostics.add(
348 Diagnostic::new(
349 DiagnosticLevel::Warning,
350 DiagnosticCode::DuplicateObservationId,
351 format!("I found a duplicate observation ID: '{}'", observation_id),
352 )
353 .with_location(self.filename.clone(), line_number)
354 .with_advice(
355 "Each observation ID should be unique within the archive. \
356 Consider using UUIDs or timestamps to ensure uniqueness."
357 .to_string(),
358 ),
359 );
360 }
361
362 seen_observations.insert(observation_id.clone());
363 current_observation = Some((observation_id, line_number, change_count));
364 events_in_observation = 0;
365 observation_count += 1;
366 }
367
368 Event::Add { path, value, observation_id } => {
369 events_in_observation += 1;
370
371 if self.mode == ReadMode::FullValidation
372 && !seen_observations.contains(&observation_id)
373 {
374 event_iter.diagnostics.add(
375 Diagnostic::fatal(
376 DiagnosticCode::NonExistentObservationId,
377 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
378 )
379 .with_location(self.filename.clone(), line_number)
380 .with_advice(
381 "Each add/change/remove/move event must reference an observation ID from a preceding observe event."
382 .to_string()
383 )
384 );
385 continue;
386 }
387
388 if let Err(diag) = apply_add(&mut state, &path, value) {
389 event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
390 continue;
391 }
392 }
393
394 Event::Change { path, new_value, observation_id } => {
395 events_in_observation += 1;
396
397 if self.mode == ReadMode::FullValidation
398 && !seen_observations.contains(&observation_id)
399 {
400 event_iter.diagnostics.add(
401 Diagnostic::fatal(
402 DiagnosticCode::NonExistentObservationId,
403 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
404 )
405 .with_location(self.filename.clone(), line_number)
406 );
407 continue;
408 }
409
410 if let Err(diag) = apply_change(&mut state, &path, new_value) {
411 event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
412 continue;
413 }
414 }
415
416 Event::Remove { path, observation_id } => {
417 events_in_observation += 1;
418
419 if self.mode == ReadMode::FullValidation
420 && !seen_observations.contains(&observation_id)
421 {
422 event_iter.diagnostics.add(
423 Diagnostic::fatal(
424 DiagnosticCode::NonExistentObservationId,
425 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
426 )
427 .with_location(self.filename.clone(), line_number)
428 );
429 continue;
430 }
431
432 if let Err(diag) = apply_remove(&mut state, &path) {
433 event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
434 continue;
435 }
436 }
437
438 Event::Move { path, moves, observation_id } => {
439 events_in_observation += 1;
440
441 if self.mode == ReadMode::FullValidation
442 && !seen_observations.contains(&observation_id)
443 {
444 event_iter.diagnostics.add(
445 Diagnostic::fatal(
446 DiagnosticCode::NonExistentObservationId,
447 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
448 )
449 .with_location(self.filename.clone(), line_number)
450 );
451 continue;
452 }
453
454 if let Err(diag) = apply_move(&mut state, &path, moves) {
455 event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
456 continue;
457 }
458 }
459
460 Event::Snapshot { observation_id: _, timestamp: _, object } => {
461 if self.mode == ReadMode::FullValidation && state != object {
462 event_iter.diagnostics.add(
463 Diagnostic::fatal(
464 DiagnosticCode::SnapshotStateMismatch,
465 "I found a snapshot whose state doesn't match the replayed state up to this point.".to_string()
466 )
467 .with_location(self.filename.clone(), line_number)
468 .with_advice(
469 "This could indicate corruption or that events were applied incorrectly. \
470 The snapshot state should exactly match the result of replaying all events \
471 from the initial state."
472 .to_string()
473 )
474 );
475 }
476
477 state = object;
478 }
479 }
480 }
481
482 if let Some((_obs_id, obs_line, expected_count)) = ¤t_observation {
483 if events_in_observation != *expected_count {
484 event_iter.diagnostics.add(
485 Diagnostic::new(
486 DiagnosticLevel::Warning,
487 DiagnosticCode::ChangeCountMismatch,
488 format!(
489 "The observe event at line {} declared {} changes, but I found {}.",
490 obs_line, expected_count, events_in_observation
491 ),
492 )
493 .with_location(self.filename.clone(), *obs_line),
494 );
495 }
496 }
497
498 Ok(ReadResult {
499 header,
500 final_state: state,
501 diagnostics: event_iter.diagnostics,
502 observation_count,
503 })
504 }
505
506 fn parse_header(
507 &self,
508 line: &str,
509 line_number: usize,
510 diagnostics: &mut DiagnosticCollector,
511 ) -> Option<Header> {
512 let value: Value = match serde_json::from_str(line) {
513 Ok(v) => v,
514 Err(e) => {
515 diagnostics.add(
516 Diagnostic::fatal(
517 DiagnosticCode::MissingHeader,
518 format!("I couldn't parse the header as JSON: {}", e),
519 )
520 .with_location(self.filename.clone(), line_number)
521 .with_snippet(format!("{} | {}", line_number, line))
522 .with_advice(
523 "The first line must be a JSON object containing the archive header.\n\
524 Required fields: type, version, created, initial"
525 .to_string(),
526 ),
527 );
528 return None;
529 }
530 };
531
532 match serde_json::from_value::<Header>(value.clone()) {
533 Ok(header) => {
534 if header.version != 1 {
535 diagnostics.add(
536 Diagnostic::fatal(
537 DiagnosticCode::UnsupportedVersion,
538 format!("I found version {}, but I only support version 1.", header.version)
539 )
540 .with_location(self.filename.clone(), line_number)
541 .with_advice(
542 "This archive was created with a newer or older version of the format. \
543 You may need to upgrade your tools or convert the archive."
544 .to_string()
545 )
546 );
547 return None;
548 }
549
550 Some(header)
551 }
552 Err(e) => {
553 diagnostics.add(
554 Diagnostic::fatal(
555 DiagnosticCode::MissingHeaderField,
556 format!("I couldn't parse the header: {}", e),
557 )
558 .with_location(self.filename.clone(), line_number)
559 .with_snippet(format!("{} | {}", line_number, line))
560 .with_advice(
561 "The header must contain:\n\
562 - type: \"@peoplesgrocers/json-archive\"\n\
563 - version: 1\n\
564 - created: an ISO-8601 timestamp\n\
565 - initial: the initial state object"
566 .to_string(),
567 ),
568 );
569 None
570 }
571 }
572 }
573
574}
575
576pub fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnostic> {
577 let pointer = JsonPointer::new(path).map_err(|diag| {
578 diag.with_advice(
579 "JSON Pointer paths must start with '/' and use '/' to separate segments.\n\
580 Special characters: use ~0 for ~ and ~1 for /"
581 .to_string()
582 )
583 })?;
584
585 pointer.set(state, value).map_err(|diag| {
586 diag.with_advice(
587 "For add operations, the parent path must exist. \
588 For example, to add /a/b/c, the paths /a and /a/b must already exist."
589 .to_string()
590 )
591 })
592}
593
594pub fn apply_change(state: &mut Value, path: &str, new_value: Value) -> Result<(), Diagnostic> {
595 let pointer = JsonPointer::new(path)?;
596 pointer.set(state, new_value)?;
597 Ok(())
598}
599
600pub fn apply_remove(state: &mut Value, path: &str) -> Result<(), Diagnostic> {
601 let pointer = JsonPointer::new(path)?;
602 pointer.remove(state)?;
603 Ok(())
604}
605
606pub fn apply_move(
607 state: &mut Value,
608 path: &str,
609 moves: Vec<(usize, usize)>,
610) -> Result<(), Diagnostic> {
611 let pointer = JsonPointer::new(path)?;
612
613 let array = pointer.get_mut(state)?;
614
615 if !array.is_array() {
616 return Err(
617 Diagnostic::fatal(
618 DiagnosticCode::MoveOnNonArray,
619 format!(
620 "I can't apply move operations to '{}' because it's not an array.",
621 path
622 ),
623 )
624 .with_advice(
625 "Move operations can only reorder elements within an array. \
626 The path must point to an array value."
627 .to_string(),
628 ),
629 );
630 }
631
632 let arr = array.as_array_mut().unwrap();
633
634 for (from_idx, to_idx) in &moves {
636 if *from_idx >= arr.len() {
637 return Err(Diagnostic::fatal(
638 DiagnosticCode::MoveIndexOutOfBounds,
639 format!(
640 "The 'from' index {} is out of bounds (array length is {}).",
641 from_idx,
642 arr.len()
643 ),
644 ));
645 }
646
647 if *to_idx > arr.len() {
648 return Err(Diagnostic::fatal(
649 DiagnosticCode::MoveIndexOutOfBounds,
650 format!(
651 "The 'to' index {} is out of bounds (array length is {}).",
652 to_idx,
653 arr.len()
654 ),
655 ));
656 }
657 }
658
659 for (from_idx, to_idx) in moves {
661 let element = arr.remove(from_idx);
662 let insert_idx = if to_idx > from_idx { to_idx - 1 } else { to_idx };
663 arr.insert(insert_idx, element);
664 }
665
666 Ok(())
667}
668
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Happy path: header + one observation + one change replays cleanly.
    #[test]
    fn test_read_valid_archive() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), Some("test".to_string()));
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#
        )?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert_eq!(result.final_state, json!({"count": 1}));
        assert_eq!(result.observation_count, 1);
        assert!(!result.diagnostics.has_fatal());

        Ok(())
    }

    // A zero-byte file must produce exactly one fatal diagnostic (EmptyFile).
    #[test]
    fn test_empty_file() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert!(result.diagnostics.has_fatal());
        assert_eq!(result.diagnostics.len(), 1);

        Ok(())
    }

    // FullValidation rejects events referencing an observation ID that no
    // observe event declared.
    #[test]
    fn test_non_existent_observation_id() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-999"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert!(result.diagnostics.has_fatal());

        Ok(())
    }

    // AppendSeek intentionally skips the observation-ID cross-check, so the
    // same archive that fails above applies cleanly here.
    #[test]
    fn test_append_mode_ignores_observation_id() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-999"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::AppendSeek)?;
        let result = reader.read(temp_file.path())?;

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));

        Ok(())
    }

    // A declared change_count (2) disagreeing with the actual event count (1)
    // should surface as a single warning, not a fatal error.
    #[test]
    fn test_change_count_mismatch() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 0}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 2]"#
        )?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        let warnings: Vec<_> = result
            .diagnostics
            .diagnostics()
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Warning)
            .collect();

        assert_eq!(warnings.len(), 1);

        Ok(())
    }

    // A change event overwrites (not merges with) the initial value.
    #[test]
    fn test_simple_change() -> Result<(), Box<dyn std::error::Error>> {
        let mut temp_file = NamedTempFile::new()?;

        let header = Header::new(json!({"count": 5}), None);
        writeln!(temp_file, "{}", serde_json::to_string(&header)?)?;
        writeln!(
            temp_file,
            r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#
        )?;
        writeln!(temp_file, r#"["change", "/count", 1, "obs-1"]"#)?;

        let reader = ArchiveReader::new(temp_file.path(), ReadMode::FullValidation)?;
        let result = reader.read(temp_file.path())?;

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));

        Ok(())
    }
}