1use serde_json::Value;
23use std::collections::HashSet;
24use std::io::BufRead;
25
26use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector, DiagnosticLevel};
27use crate::event_deserialize::EventDeserializer;
28use crate::events::{Event, Header};
29use crate::pointer::JsonPointer;
30
/// Controls how strictly an archive is validated while its events are replayed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Perform every check: add/change/remove/move events must reference a
    /// previously-seen observe event, and snapshots must match the replayed state.
    FullValidation,
    /// Relaxed replay (e.g. to find the end of the file before appending):
    /// skips the observation-ID existence check and the snapshot comparison.
    AppendSeek,
}
36
/// The outcome of reading an entire archive with `read_archive`.
#[derive(Debug)]
pub struct ReadResult {
    /// The archive header parsed from the first non-comment line.
    pub header: Header,
    /// The state produced by replaying every event on top of the initial state.
    pub final_state: Value,
    /// All diagnostics (warnings and fatal errors) collected during the read.
    pub diagnostics: DiagnosticCollector,
    /// Number of observe events encountered.
    pub observation_count: usize,
}
44
/// Streaming iterator over the events of an archive, created by `read_events`.
///
/// Parse problems do not stop iteration: they are accumulated in
/// `diagnostics` while the iterator skips ahead to the next parseable line.
pub struct EventIterator<R: BufRead> {
    // Positioned just past the header line when constructed by `read_events`.
    reader: R,
    /// Diagnostics collected while reading (bad JSON, invalid UTF-8, ...).
    pub diagnostics: DiagnosticCollector,
    /// The archive header parsed before iteration began.
    pub header: Header,
    // Used only to attach locations to diagnostics.
    filename: String,
    // 1-based number of the most recently read line.
    line_number: usize,
}
52
53impl<R: BufRead> Iterator for EventIterator<R> {
54 type Item = Event;
55
56 fn next(&mut self) -> Option<Self::Item> {
57 let mut line = String::new();
58
59 loop {
60 line.clear();
61 self.line_number += 1;
62
63 match self.reader.read_line(&mut line) {
64 Ok(0) => return None, Ok(_) => {
66 let trimmed = line.trim();
67
68 if trimmed.starts_with('#') || trimmed.is_empty() {
70 continue;
71 }
72
73 let event_deserializer = match serde_json::from_str::<EventDeserializer>(&line)
75 {
76 Ok(d) => d,
77 Err(e) => {
78 self.diagnostics.add(
79 Diagnostic::fatal(
80 DiagnosticCode::InvalidEventJson,
81 format!("I couldn't parse this line as JSON: {}", e),
82 )
83 .with_location(self.filename.clone(), self.line_number)
84 .with_snippet(format!("{} | {}", self.line_number, line.trim()))
85 .with_advice(
86 "Each line after the header must be either:\n\
87 - A comment starting with #\n\
88 - A valid JSON array representing an event\n\n\
89 Check for missing commas, quotes, or brackets."
90 .to_string(),
91 ),
92 );
93 continue;
94 }
95 };
96
97 for diagnostic in event_deserializer.diagnostics {
99 self.diagnostics.add(
100 diagnostic
101 .with_location(self.filename.clone(), self.line_number)
102 .with_snippet(format!("{} | {}", self.line_number, line.trim())),
103 );
104 }
105
106 if let Some(event) = event_deserializer.event {
108 return Some(event);
109 }
110
111 continue;
113 }
114 Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
115 self.diagnostics.add(
116 Diagnostic::fatal(
117 DiagnosticCode::InvalidUtf8,
118 format!("I found invalid UTF-8 bytes at line {}.", self.line_number)
119 )
120 .with_location(self.filename.clone(), self.line_number)
121 .with_advice(
122 "The JSON Archive format requires UTF-8 encoding. Make sure the file \
123 was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
124 .to_string()
125 )
126 );
127 return None;
128 }
129 Err(_) => return None,
130 }
131 }
132 }
133}
134
135pub fn read_events<R: BufRead>(
137 mut reader: R,
138 filename: &str,
139) -> Result<(Value, EventIterator<R>), Diagnostic> {
140 let mut header_line = String::new();
141 let mut line_number = 0;
142
143 loop {
145 header_line.clear();
146 line_number += 1;
147
148 match reader.read_line(&mut header_line) {
149 Ok(0) => {
150 return Err(Diagnostic::fatal(
152 DiagnosticCode::EmptyFile,
153 "I found an empty file (or only comments), but I need at least a header line."
154 .to_string(),
155 )
156 .with_location(filename.to_string(), line_number)
157 .with_advice(
158 "See the file format specification for header structure.".to_string(),
159 ));
160 }
161 Ok(_) => {}
162 Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
163 return Err(
165 Diagnostic::fatal(
166 DiagnosticCode::InvalidUtf8,
167 format!("I found invalid UTF-8 bytes at line {}.", line_number)
168 )
169 .with_location(filename.to_string(), line_number)
170 .with_advice(
171 "The JSON Archive format requires UTF-8 encoding. Make sure the file \
172 was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
173 .to_string()
174 )
175 );
176 }
177 Err(e) => {
178 return Err(Diagnostic::fatal(
179 DiagnosticCode::PathNotFound,
180 format!("I couldn't read from the archive: {}", e),
181 )
182 .with_location(filename.to_string(), line_number));
183 }
184 };
185
186 let trimmed = header_line.trim_start();
188 if !trimmed.starts_with('#') {
189 break;
190 }
191 }
192
193 let header = parse_header(filename, &header_line, line_number)?;
194
195 let iterator = EventIterator {
196 reader,
197 diagnostics: DiagnosticCollector::new(),
198 header: header.clone(),
199 filename: filename.to_string(),
200 line_number,
201 };
202
203 Ok((header.initial, iterator))
204}
205
206pub fn read_archive<R: BufRead>(
208 reader: R,
209 filename: &str,
210 mode: ReadMode,
211) -> Result<ReadResult, Diagnostic> {
212 let (initial_value, mut event_iter) = read_events(reader, filename)?;
213
214 let header = Header::new(initial_value.clone(), None);
215 let mut state = initial_value;
216 let mut seen_observations: HashSet<String> = HashSet::new();
217 let mut current_observation: Option<(String, usize, usize)> = None;
218 let mut events_in_observation = 0;
219 let mut observation_count = 0;
220
221 while let Some(event) = event_iter.next() {
223 let line_number = event_iter.line_number;
224
225 match event {
226 Event::Observe {
227 observation_id,
228 timestamp: _,
229 change_count,
230 } => {
231 if let Some((_obs_id, obs_line, expected_count)) = ¤t_observation {
232 if events_in_observation != *expected_count {
233 event_iter.diagnostics.add(
234 Diagnostic::new(
235 DiagnosticLevel::Warning,
236 DiagnosticCode::ChangeCountMismatch,
237 format!(
238 "The observe event at line {} declared {} changes, but I found {}.",
239 obs_line, expected_count, events_in_observation
240 )
241 )
242 .with_location(filename.to_string(), *obs_line)
243 .with_advice(
244 "Make sure the change_count in the observe event matches the number of \
245 add/change/remove/move events that follow it."
246 .to_string()
247 )
248 );
249 }
250 }
251
252 if seen_observations.contains(&observation_id) {
253 event_iter.diagnostics.add(
254 Diagnostic::new(
255 DiagnosticLevel::Warning,
256 DiagnosticCode::DuplicateObservationId,
257 format!("I found a duplicate observation ID: '{}'", observation_id),
258 )
259 .with_location(filename.to_string(), line_number)
260 .with_advice(
261 "Each observation ID should be unique within the archive. \
262 Consider using UUIDs or timestamps to ensure uniqueness."
263 .to_string(),
264 ),
265 );
266 }
267
268 seen_observations.insert(observation_id.clone());
269 current_observation = Some((observation_id, line_number, change_count));
270 events_in_observation = 0;
271 observation_count += 1;
272 }
273
274 Event::Add {
275 path,
276 value,
277 observation_id,
278 } => {
279 events_in_observation += 1;
280
281 if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
282 {
283 event_iter.diagnostics.add(
284 Diagnostic::fatal(
285 DiagnosticCode::NonExistentObservationId,
286 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
287 )
288 .with_location(filename.to_string(), line_number)
289 .with_advice(
290 "Each add/change/remove/move event must reference an observation ID from a preceding observe event."
291 .to_string()
292 )
293 );
294 continue;
295 }
296
297 if let Err(diag) = apply_add(&mut state, &path, value) {
298 event_iter
299 .diagnostics
300 .add(diag.with_location(filename.to_string(), line_number));
301 continue;
302 }
303 }
304
305 Event::Change {
306 path,
307 new_value,
308 observation_id,
309 } => {
310 events_in_observation += 1;
311
312 if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
313 {
314 event_iter.diagnostics.add(
315 Diagnostic::fatal(
316 DiagnosticCode::NonExistentObservationId,
317 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
318 )
319 .with_location(filename.to_string(), line_number)
320 );
321 continue;
322 }
323
324 if let Err(diag) = apply_change(&mut state, &path, new_value) {
325 event_iter
326 .diagnostics
327 .add(diag.with_location(filename.to_string(), line_number));
328 continue;
329 }
330 }
331
332 Event::Remove {
333 path,
334 observation_id,
335 } => {
336 events_in_observation += 1;
337
338 if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
339 {
340 event_iter.diagnostics.add(
341 Diagnostic::fatal(
342 DiagnosticCode::NonExistentObservationId,
343 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
344 )
345 .with_location(filename.to_string(), line_number)
346 );
347 continue;
348 }
349
350 if let Err(diag) = apply_remove(&mut state, &path) {
351 event_iter
352 .diagnostics
353 .add(diag.with_location(filename.to_string(), line_number));
354 continue;
355 }
356 }
357
358 Event::Move {
359 path,
360 moves,
361 observation_id,
362 } => {
363 events_in_observation += 1;
364
365 if mode == ReadMode::FullValidation && !seen_observations.contains(&observation_id)
366 {
367 event_iter.diagnostics.add(
368 Diagnostic::fatal(
369 DiagnosticCode::NonExistentObservationId,
370 format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
371 )
372 .with_location(filename.to_string(), line_number)
373 );
374 continue;
375 }
376
377 if let Err(diag) = apply_move(&mut state, &path, moves) {
378 event_iter
379 .diagnostics
380 .add(diag.with_location(filename.to_string(), line_number));
381 continue;
382 }
383 }
384
385 Event::Snapshot {
386 observation_id: _,
387 timestamp: _,
388 object,
389 } => {
390 if mode == ReadMode::FullValidation && state != object {
391 event_iter.diagnostics.add(
392 Diagnostic::fatal(
393 DiagnosticCode::SnapshotStateMismatch,
394 "I found a snapshot whose state doesn't match the replayed state up to this point.".to_string()
395 )
396 .with_location(filename.to_string(), line_number)
397 .with_advice(
398 "This could indicate corruption or that events were applied incorrectly. \
399 The snapshot state should exactly match the result of replaying all events \
400 from the initial state."
401 .to_string()
402 )
403 );
404 }
405
406 state = object;
407 }
408 }
409 }
410
411 if let Some((_obs_id, obs_line, expected_count)) = ¤t_observation {
412 if events_in_observation != *expected_count {
413 event_iter.diagnostics.add(
414 Diagnostic::new(
415 DiagnosticLevel::Warning,
416 DiagnosticCode::ChangeCountMismatch,
417 format!(
418 "The observe event at line {} declared {} changes, but I found {}.",
419 obs_line, expected_count, events_in_observation
420 ),
421 )
422 .with_location(filename.to_string(), *obs_line),
423 );
424 }
425 }
426
427 Ok(ReadResult {
428 header,
429 final_state: state,
430 diagnostics: event_iter.diagnostics,
431 observation_count,
432 })
433}
434
435fn parse_header(filename: &str, line: &str, line_number: usize) -> Result<Header, Diagnostic> {
436 let value: Value = serde_json::from_str(line).map_err(|e| {
437 Diagnostic::fatal(
438 DiagnosticCode::MissingHeader,
439 format!("I couldn't parse the header as JSON: {}", e),
440 )
441 .with_location(filename.to_string(), line_number)
442 .with_snippet(format!("{} | {}", line_number, line))
443 .with_advice(
444 "The first line must be a JSON object containing the archive header.\n\
445 Required fields: type, version, created, initial"
446 .to_string(),
447 )
448 })?;
449
450 let header = serde_json::from_value::<Header>(value).map_err(|e| {
451 Diagnostic::fatal(
452 DiagnosticCode::MissingHeaderField,
453 format!("I couldn't parse the header: {}", e),
454 )
455 .with_location(filename.to_string(), line_number)
456 .with_snippet(format!("{} | {}", line_number, line))
457 .with_advice(
458 "The header must contain:\n\
459 - type: \"@peoplesgrocers/json-archive\"\n\
460 - version: 1\n\
461 - created: an ISO-8601 timestamp\n\
462 - initial: the initial state object"
463 .to_string(),
464 )
465 })?;
466
467 if header.version != 1 {
468 return Err(Diagnostic::fatal(
469 DiagnosticCode::UnsupportedVersion,
470 format!(
471 "I found version {}, but I only support version 1.",
472 header.version
473 ),
474 )
475 .with_location(filename.to_string(), line_number)
476 .with_advice(
477 "This archive was created with a newer or older version of the format. \
478 You may need to upgrade your tools or convert the archive."
479 .to_string(),
480 ));
481 }
482
483 Ok(header)
484}
485
486pub fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnostic> {
487 let pointer = JsonPointer::new(path).map_err(|diag| {
488 diag.with_advice(
489 "JSON Pointer paths must start with '/' and use '/' to separate segments.\n\
490 Special characters: use ~0 for ~ and ~1 for /"
491 .to_string(),
492 )
493 })?;
494
495 pointer.set(state, value).map_err(|diag| {
496 diag.with_advice(
497 "For add operations, the parent path must exist. \
498 For example, to add /a/b/c, the paths /a and /a/b must already exist."
499 .to_string(),
500 )
501 })
502}
503
504pub fn apply_change(state: &mut Value, path: &str, new_value: Value) -> Result<(), Diagnostic> {
505 let pointer = JsonPointer::new(path)?;
506 pointer.set(state, new_value)?;
507 Ok(())
508}
509
510pub fn apply_remove(state: &mut Value, path: &str) -> Result<(), Diagnostic> {
511 let pointer = JsonPointer::new(path)?;
512 pointer.remove(state)?;
513 Ok(())
514}
515
516pub fn apply_move(
517 state: &mut Value,
518 path: &str,
519 moves: Vec<(usize, usize)>,
520) -> Result<(), Diagnostic> {
521 let pointer = JsonPointer::new(path)?;
522
523 let array = pointer.get_mut(state)?;
524
525 if !array.is_array() {
526 return Err(Diagnostic::fatal(
527 DiagnosticCode::MoveOnNonArray,
528 format!(
529 "I can't apply move operations to '{}' because it's not an array.",
530 path
531 ),
532 )
533 .with_advice(
534 "Move operations can only reorder elements within an array. \
535 The path must point to an array value."
536 .to_string(),
537 ));
538 }
539
540 let arr = array.as_array_mut().unwrap();
541
542 for (from_idx, to_idx) in &moves {
544 if *from_idx >= arr.len() {
545 return Err(Diagnostic::fatal(
546 DiagnosticCode::MoveIndexOutOfBounds,
547 format!(
548 "The 'from' index {} is out of bounds (array length is {}).",
549 from_idx,
550 arr.len()
551 ),
552 ));
553 }
554
555 if *to_idx > arr.len() {
556 return Err(Diagnostic::fatal(
557 DiagnosticCode::MoveIndexOutOfBounds,
558 format!(
559 "The 'to' index {} is out of bounds (array length is {}).",
560 to_idx,
561 arr.len()
562 ),
563 ));
564 }
565 }
566
567 for (from_idx, to_idx) in moves {
569 let element = arr.remove(from_idx);
570 let insert_idx = if to_idx > from_idx {
571 to_idx - 1
572 } else {
573 to_idx
574 };
575 arr.insert(insert_idx, element);
576 }
577
578 Ok(())
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::io::Cursor;

    // read_archive accepts any BufRead, so the tests build archives in memory
    // with Cursor instead of round-tripping through temp files on disk
    // (faster, and drops the tempfile dependency from this module).

    /// Builds archive text: the serialized header followed by raw event lines.
    fn archive_text(header: &Header, events: &[&str]) -> String {
        let mut text = serde_json::to_string(header).unwrap();
        text.push('\n');
        for event in events {
            text.push_str(event);
            text.push('\n');
        }
        text
    }

    /// Reads an in-memory archive under the given mode.
    fn read(text: String, mode: ReadMode) -> Result<ReadResult, Diagnostic> {
        read_archive(Cursor::new(text), "test-archive", mode)
    }

    #[test]
    fn test_read_valid_archive() {
        let header = Header::new(json!({"count": 0}), Some("test".to_string()));
        let text = archive_text(
            &header,
            &[
                r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#,
                r#"["change", "/count", 1, "obs-1"]"#,
            ],
        );

        let result = read(text, ReadMode::FullValidation).unwrap();

        assert_eq!(result.final_state, json!({"count": 1}));
        assert_eq!(result.observation_count, 1);
        assert!(!result.diagnostics.has_fatal());
    }

    #[test]
    fn test_empty_file() {
        let result = read(String::new(), ReadMode::FullValidation);
        assert!(result.is_err());
    }

    #[test]
    fn test_non_existent_observation_id() {
        let header = Header::new(json!({"count": 0}), None);
        let text = archive_text(&header, &[r#"["change", "/count", 1, "obs-999"]"#]);

        let result = read(text, ReadMode::FullValidation).unwrap();

        assert!(result.diagnostics.has_fatal());
    }

    #[test]
    fn test_append_mode_ignores_observation_id() {
        let header = Header::new(json!({"count": 0}), None);
        let text = archive_text(&header, &[r#"["change", "/count", 1, "obs-999"]"#]);

        let result = read(text, ReadMode::AppendSeek).unwrap();

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));
    }

    #[test]
    fn test_change_count_mismatch() {
        let header = Header::new(json!({"count": 0}), None);
        let text = archive_text(
            &header,
            &[
                // Declares 2 changes but only 1 follows.
                r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 2]"#,
                r#"["change", "/count", 1, "obs-1"]"#,
            ],
        );

        let result = read(text, ReadMode::FullValidation).unwrap();

        let warnings: Vec<_> = result
            .diagnostics
            .diagnostics()
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Warning)
            .collect();

        assert_eq!(warnings.len(), 1);
    }

    #[test]
    fn test_simple_change() {
        let header = Header::new(json!({"count": 5}), None);
        let text = archive_text(
            &header,
            &[
                r#"["observe", "obs-1", "2025-01-01T00:00:00Z", 1]"#,
                r#"["change", "/count", 1, "obs-1"]"#,
            ],
        );

        let result = read(text, ReadMode::FullValidation).unwrap();

        assert!(!result.diagnostics.has_fatal());
        assert_eq!(result.final_state, json!({"count": 1}));
    }
}