1#![allow(clippy::missing_errors_doc)]
32
33use std::collections::BTreeMap;
34use std::fmt;
35use std::path::Path;
36
37use serde::{Deserialize, Serialize};
38
39use crate::model::patch::PatchSet;
40use crate::model::types::{EpochId, GitOid, WorkspaceId};
41use crate::oplog::read::{OpLogReadError, walk_chain};
42use crate::oplog::types::{OpPayload, Operation};
43
44#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
52pub struct MaterializedView {
53 pub workspace_id: WorkspaceId,
55
56 pub epoch: Option<EpochId>,
58
59 pub patch_set: Option<PatchSet>,
63
64 pub patch_set_oid: Option<GitOid>,
68
69 pub description: Option<String>,
71
72 pub annotations: BTreeMap<String, BTreeMap<String, serde_json::Value>>,
74
75 pub op_count: usize,
77
78 pub is_destroyed: bool,
80}
81
82impl MaterializedView {
83 #[must_use]
85 pub const fn empty(workspace_id: WorkspaceId) -> Self {
86 Self {
87 workspace_id,
88 epoch: None,
89 patch_set: None,
90 patch_set_oid: None,
91 description: None,
92 annotations: BTreeMap::new(),
93 op_count: 0,
94 is_destroyed: false,
95 }
96 }
97
98 #[must_use]
100 pub const fn destroyed(&self) -> bool {
101 self.is_destroyed
102 }
103
104 #[must_use]
106 pub fn has_changes(&self) -> bool {
107 self.patch_set.as_ref().is_some_and(|ps| !ps.is_empty())
108 }
109}
110
111impl fmt::Display for MaterializedView {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 write!(f, "view({}", self.workspace_id)?;
114 if let Some(epoch) = &self.epoch {
115 write!(f, ", epoch={}", &epoch.as_str()[..12])?;
116 }
117 if let Some(ps) = &self.patch_set {
118 write!(f, ", {} patches", ps.len())?;
119 }
120 write!(f, ", {} ops", self.op_count)?;
121 if self.is_destroyed {
122 write!(f, ", DESTROYED")?;
123 }
124 write!(f, ")")
125 }
126}
127
128#[derive(Debug)]
134pub enum ViewError {
135 OpLog(OpLogReadError),
137
138 PatchSetRead {
140 oid: String,
142 detail: String,
144 },
145}
146
147impl fmt::Display for ViewError {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 match self {
150 Self::OpLog(e) => write!(f, "op log error: {e}"),
151 Self::PatchSetRead { oid, detail } => {
152 write!(f, "failed to read patch set blob {oid}: {detail}")
153 }
154 }
155 }
156}
157
158impl std::error::Error for ViewError {
159 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
160 match self {
161 Self::OpLog(e) => Some(e),
162 Self::PatchSetRead { .. } => None,
163 }
164 }
165}
166
167impl From<OpLogReadError> for ViewError {
168 fn from(e: OpLogReadError) -> Self {
169 Self::OpLog(e)
170 }
171}
172
173fn apply_operation<F>(
183 view: &mut MaterializedView,
184 _oid: &GitOid,
185 op: &Operation,
186 read_patch_set: &F,
187) -> Result<(), ViewError>
188where
189 F: Fn(&GitOid) -> Result<PatchSet, ViewError>,
190{
191 view.op_count += 1;
192
193 match &op.payload {
194 OpPayload::Create { epoch } => {
195 view.epoch = Some(epoch.clone());
196 view.patch_set = None;
197 view.patch_set_oid = None;
198 view.is_destroyed = false;
199 }
200
201 OpPayload::Snapshot { patch_set_oid } => {
202 let ps = read_patch_set(patch_set_oid)?;
203 view.patch_set = Some(ps);
204 view.patch_set_oid = Some(patch_set_oid.clone());
205 }
206
207 OpPayload::Compensate { .. } => {
208 view.patch_set = None;
210 view.patch_set_oid = None;
211 }
212
213 OpPayload::Merge { epoch_after, .. } => {
214 view.epoch = Some(epoch_after.clone());
215 view.patch_set = None;
216 view.patch_set_oid = None;
217 }
218
219 OpPayload::Describe { message } => {
220 view.description = Some(message.clone());
221 }
222
223 OpPayload::Annotate { key, data } => {
224 view.annotations.insert(key.clone(), data.clone());
225 }
226
227 OpPayload::Destroy => {
228 view.is_destroyed = true;
229 }
230 }
231
232 Ok(())
233}
234
235#[allow(dead_code)]
246pub fn materialize<F>(
247 root: &Path,
248 workspace: &WorkspaceId,
249 read_patch_set: F,
250) -> Result<MaterializedView, ViewError>
251where
252 F: Fn(&GitOid) -> Result<PatchSet, ViewError>,
253{
254 let no_stop: Option<&dyn Fn(&Operation) -> bool> = None;
255 let chain = walk_chain(root, workspace, None, no_stop)?;
256
257 let mut ops: Vec<_> = chain;
260 ops.reverse();
261
262 let mut view = MaterializedView::empty(workspace.clone());
263
264 for (oid, op) in &ops {
265 apply_operation(&mut view, oid, op, &read_patch_set)?;
266 }
267
268 Ok(view)
269}
270
271pub fn materialize_from_ops<F>(
276 workspace: WorkspaceId,
277 ops: &[(GitOid, Operation)],
278 read_patch_set: F,
279) -> Result<MaterializedView, ViewError>
280where
281 F: Fn(&GitOid) -> Result<PatchSet, ViewError>,
282{
283 let mut view = MaterializedView::empty(workspace);
284
285 for (oid, op) in ops {
286 apply_operation(&mut view, oid, op, &read_patch_set)?;
287 }
288
289 Ok(view)
290}
291
292#[allow(dead_code)]
306pub fn read_patch_set_blob(root: &Path, oid: &GitOid) -> Result<PatchSet, ViewError> {
307 let output = std::process::Command::new("git")
308 .args(["cat-file", "-p", oid.as_str()])
309 .current_dir(root)
310 .output()
311 .map_err(|e| ViewError::PatchSetRead {
312 oid: oid.as_str().to_owned(),
313 detail: format!("spawn git: {e}"),
314 })?;
315
316 if !output.status.success() {
317 return Err(ViewError::PatchSetRead {
318 oid: oid.as_str().to_owned(),
319 detail: String::from_utf8_lossy(&output.stderr).to_string(),
320 });
321 }
322
323 serde_json::from_slice(&output.stdout).map_err(|e| ViewError::PatchSetRead {
324 oid: oid.as_str().to_owned(),
325 detail: format!("JSON parse: {e}"),
326 })
327}
328
329#[cfg(test)]
334mod tests {
335 use super::*;
336 use std::collections::BTreeMap;
337 use std::path::PathBuf;
338
339 fn test_oid(c: char) -> GitOid {
341 GitOid::new(&c.to_string().repeat(40)).unwrap()
342 }
343
344 fn test_epoch(c: char) -> EpochId {
345 EpochId::new(&c.to_string().repeat(40)).unwrap()
346 }
347
348 fn test_ws(name: &str) -> WorkspaceId {
349 WorkspaceId::new(name).unwrap()
350 }
351
352 fn timestamp() -> String {
353 "2026-02-19T12:00:00Z".to_owned()
354 }
355
356 fn test_patch_set(epoch_char: char) -> PatchSet {
358 use crate::model::patch::{FileId, PatchValue};
359 let mut patches = BTreeMap::new();
360 patches.insert(
361 PathBuf::from("src/main.rs"),
362 PatchValue::Add {
363 blob: test_oid('f'),
364 file_id: FileId::new(1),
365 },
366 );
367 PatchSet {
368 base_epoch: test_epoch(epoch_char),
369 patches,
370 }
371 }
372
373 fn mock_reader(ps: PatchSet) -> impl Fn(&GitOid) -> Result<PatchSet, ViewError> {
375 move |_oid| Ok(ps.clone())
376 }
377
378 fn failing_reader() -> impl Fn(&GitOid) -> Result<PatchSet, ViewError> {
380 |oid| {
381 Err(ViewError::PatchSetRead {
382 oid: oid.as_str().to_owned(),
383 detail: "mock failure".to_owned(),
384 })
385 }
386 }
387
388 fn make_op(ws: &str, payload: OpPayload) -> Operation {
390 Operation {
391 parent_ids: vec![],
392 workspace_id: test_ws(ws),
393 timestamp: timestamp(),
394 payload,
395 }
396 }
397
398 #[test]
403 fn empty_view() {
404 let view = MaterializedView::empty(test_ws("test"));
405 assert_eq!(view.workspace_id, test_ws("test"));
406 assert!(view.epoch.is_none());
407 assert!(view.patch_set.is_none());
408 assert!(view.description.is_none());
409 assert!(view.annotations.is_empty());
410 assert_eq!(view.op_count, 0);
411 assert!(!view.is_destroyed);
412 assert!(!view.has_changes());
413 }
414
415 #[test]
416 fn view_display() {
417 let mut view = MaterializedView::empty(test_ws("agent-1"));
418 view.epoch = Some(test_epoch('a'));
419 view.op_count = 5;
420 let display = format!("{view}");
421 assert!(display.contains("agent-1"));
422 assert!(display.contains("5 ops"));
423 }
424
425 #[test]
426 fn view_display_destroyed() {
427 let mut view = MaterializedView::empty(test_ws("ws-1"));
428 view.is_destroyed = true;
429 view.op_count = 3;
430 let display = format!("{view}");
431 assert!(display.contains("DESTROYED"));
432 }
433
434 #[test]
435 fn view_serde_roundtrip() {
436 let mut view = MaterializedView::empty(test_ws("ws-1"));
437 view.epoch = Some(test_epoch('a'));
438 view.description = Some("test workspace".into());
439 view.op_count = 2;
440
441 let json = serde_json::to_string(&view).unwrap();
442 let decoded: MaterializedView = serde_json::from_str(&json).unwrap();
443 assert_eq!(decoded, view);
444 }
445
446 #[test]
451 fn replay_create() {
452 let ops = vec![(
453 test_oid('1'),
454 make_op(
455 "ws-1",
456 OpPayload::Create {
457 epoch: test_epoch('a'),
458 },
459 ),
460 )];
461 let view =
462 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
463
464 assert_eq!(view.epoch, Some(test_epoch('a')));
465 assert!(view.patch_set.is_none());
466 assert_eq!(view.op_count, 1);
467 assert!(!view.is_destroyed);
468 }
469
470 #[test]
475 fn replay_snapshot() {
476 let ps = test_patch_set('a');
477 let ops = vec![
478 (
479 test_oid('1'),
480 make_op(
481 "ws-1",
482 OpPayload::Create {
483 epoch: test_epoch('a'),
484 },
485 ),
486 ),
487 (
488 test_oid('2'),
489 make_op(
490 "ws-1",
491 OpPayload::Snapshot {
492 patch_set_oid: test_oid('d'),
493 },
494 ),
495 ),
496 ];
497 let view = materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(ps.clone())).unwrap();
498
499 assert_eq!(view.epoch, Some(test_epoch('a')));
500 assert_eq!(view.patch_set, Some(ps));
501 assert_eq!(view.patch_set_oid, Some(test_oid('d')));
502 assert_eq!(view.op_count, 2);
503 assert!(view.has_changes());
504 }
505
506 #[test]
507 fn snapshot_read_failure_propagates() {
508 let ops = vec![
509 (
510 test_oid('1'),
511 make_op(
512 "ws-1",
513 OpPayload::Create {
514 epoch: test_epoch('a'),
515 },
516 ),
517 ),
518 (
519 test_oid('2'),
520 make_op(
521 "ws-1",
522 OpPayload::Snapshot {
523 patch_set_oid: test_oid('d'),
524 },
525 ),
526 ),
527 ];
528 let result = materialize_from_ops(test_ws("ws-1"), &ops, failing_reader());
529 assert!(result.is_err());
530 }
531
532 #[test]
537 fn replay_compensate_clears_patch_set() {
538 let ps = test_patch_set('a');
539 let ops = vec![
540 (
541 test_oid('1'),
542 make_op(
543 "ws-1",
544 OpPayload::Create {
545 epoch: test_epoch('a'),
546 },
547 ),
548 ),
549 (
550 test_oid('2'),
551 make_op(
552 "ws-1",
553 OpPayload::Snapshot {
554 patch_set_oid: test_oid('d'),
555 },
556 ),
557 ),
558 (
559 test_oid('3'),
560 make_op(
561 "ws-1",
562 OpPayload::Compensate {
563 target_op: test_oid('2'),
564 reason: "undo snapshot".into(),
565 },
566 ),
567 ),
568 ];
569 let view = materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(ps)).unwrap();
570
571 assert!(view.patch_set.is_none());
572 assert!(view.patch_set_oid.is_none());
573 assert_eq!(view.op_count, 3);
574 assert!(!view.has_changes());
575 }
576
577 #[test]
582 fn replay_merge_updates_epoch() {
583 let ops = vec![
584 (
585 test_oid('1'),
586 make_op(
587 "ws-1",
588 OpPayload::Create {
589 epoch: test_epoch('a'),
590 },
591 ),
592 ),
593 (
594 test_oid('2'),
595 make_op(
596 "ws-1",
597 OpPayload::Snapshot {
598 patch_set_oid: test_oid('d'),
599 },
600 ),
601 ),
602 (
603 test_oid('3'),
604 make_op(
605 "ws-1",
606 OpPayload::Merge {
607 sources: vec![test_ws("ws-1"), test_ws("ws-2")],
608 epoch_before: test_epoch('a'),
609 epoch_after: test_epoch('b'),
610 },
611 ),
612 ),
613 ];
614 let view =
615 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
616
617 assert_eq!(view.epoch, Some(test_epoch('b')));
618 assert!(view.patch_set.is_none(), "merge clears patch set");
619 assert_eq!(view.op_count, 3);
620 }
621
622 #[test]
627 fn replay_describe_updates_metadata() {
628 let ops = vec![
629 (
630 test_oid('1'),
631 make_op(
632 "ws-1",
633 OpPayload::Create {
634 epoch: test_epoch('a'),
635 },
636 ),
637 ),
638 (
639 test_oid('2'),
640 make_op(
641 "ws-1",
642 OpPayload::Describe {
643 message: "implementing auth".into(),
644 },
645 ),
646 ),
647 ];
648 let view =
649 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
650
651 assert_eq!(view.description, Some("implementing auth".into()));
652 assert_eq!(view.op_count, 2);
653 }
654
655 #[test]
656 fn describe_latest_wins() {
657 let ops = vec![
658 (
659 test_oid('1'),
660 make_op(
661 "ws-1",
662 OpPayload::Create {
663 epoch: test_epoch('a'),
664 },
665 ),
666 ),
667 (
668 test_oid('2'),
669 make_op(
670 "ws-1",
671 OpPayload::Describe {
672 message: "first description".into(),
673 },
674 ),
675 ),
676 (
677 test_oid('3'),
678 make_op(
679 "ws-1",
680 OpPayload::Describe {
681 message: "updated description".into(),
682 },
683 ),
684 ),
685 ];
686 let view =
687 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
688
689 assert_eq!(view.description, Some("updated description".into()));
690 }
691
692 #[test]
697 fn replay_annotate_adds_annotation() {
698 let mut data = BTreeMap::new();
699 data.insert("passed".into(), serde_json::Value::Bool(true));
700
701 let ops = vec![
702 (
703 test_oid('1'),
704 make_op(
705 "ws-1",
706 OpPayload::Create {
707 epoch: test_epoch('a'),
708 },
709 ),
710 ),
711 (
712 test_oid('2'),
713 make_op(
714 "ws-1",
715 OpPayload::Annotate {
716 key: "validation".into(),
717 data: data.clone(),
718 },
719 ),
720 ),
721 ];
722 let view =
723 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
724
725 assert!(view.annotations.contains_key("validation"));
726 assert_eq!(
727 view.annotations["validation"]["passed"],
728 serde_json::Value::Bool(true)
729 );
730 }
731
732 #[test]
733 fn annotate_latest_wins_per_key() {
734 let mut data1 = BTreeMap::new();
735 data1.insert("status".into(), serde_json::Value::String("pending".into()));
736
737 let mut data2 = BTreeMap::new();
738 data2.insert(
739 "status".into(),
740 serde_json::Value::String("approved".into()),
741 );
742
743 let ops = vec![
744 (
745 test_oid('1'),
746 make_op(
747 "ws-1",
748 OpPayload::Create {
749 epoch: test_epoch('a'),
750 },
751 ),
752 ),
753 (
754 test_oid('2'),
755 make_op(
756 "ws-1",
757 OpPayload::Annotate {
758 key: "review".into(),
759 data: data1,
760 },
761 ),
762 ),
763 (
764 test_oid('3'),
765 make_op(
766 "ws-1",
767 OpPayload::Annotate {
768 key: "review".into(),
769 data: data2,
770 },
771 ),
772 ),
773 ];
774 let view =
775 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
776
777 assert_eq!(
778 view.annotations["review"]["status"],
779 serde_json::Value::String("approved".into())
780 );
781 }
782
783 #[test]
788 fn replay_destroy() {
789 let ops = vec![
790 (
791 test_oid('1'),
792 make_op(
793 "ws-1",
794 OpPayload::Create {
795 epoch: test_epoch('a'),
796 },
797 ),
798 ),
799 (test_oid('2'), make_op("ws-1", OpPayload::Destroy)),
800 ];
801 let view =
802 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
803
804 assert!(view.is_destroyed);
805 assert!(view.destroyed());
806 assert_eq!(view.op_count, 2);
807 }
808
809 #[test]
814 fn full_lifecycle_create_snapshot_describe_merge() {
815 let ps = test_patch_set('a');
816 let ops = vec![
817 (
818 test_oid('1'),
819 make_op(
820 "ws-1",
821 OpPayload::Create {
822 epoch: test_epoch('a'),
823 },
824 ),
825 ),
826 (
827 test_oid('2'),
828 make_op(
829 "ws-1",
830 OpPayload::Describe {
831 message: "implementing feature X".into(),
832 },
833 ),
834 ),
835 (
836 test_oid('3'),
837 make_op(
838 "ws-1",
839 OpPayload::Snapshot {
840 patch_set_oid: test_oid('d'),
841 },
842 ),
843 ),
844 (
845 test_oid('4'),
846 make_op(
847 "ws-1",
848 OpPayload::Merge {
849 sources: vec![test_ws("ws-1")],
850 epoch_before: test_epoch('a'),
851 epoch_after: test_epoch('b'),
852 },
853 ),
854 ),
855 ];
856
857 let view = materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(ps)).unwrap();
858
859 assert_eq!(view.epoch, Some(test_epoch('b')));
860 assert!(view.patch_set.is_none(), "merge clears patches");
861 assert_eq!(view.description, Some("implementing feature X".into()));
862 assert_eq!(view.op_count, 4);
863 assert!(!view.is_destroyed);
864 }
865
866 #[test]
867 fn empty_op_list_produces_empty_view() {
868 let ops: Vec<(GitOid, Operation)> = vec![];
869 let view =
870 materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(test_patch_set('a'))).unwrap();
871
872 assert_eq!(view.op_count, 0);
873 assert!(view.epoch.is_none());
874 assert!(view.patch_set.is_none());
875 }
876
877 #[test]
878 fn multiple_snapshots_last_wins() {
879 use crate::model::patch::{FileId, PatchValue};
880
881 let ps1 = test_patch_set('a');
882 let mut ps2_patches = BTreeMap::new();
883 ps2_patches.insert(
884 PathBuf::from("src/lib.rs"),
885 PatchValue::Add {
886 blob: test_oid('9'),
887 file_id: FileId::new(2),
888 },
889 );
890 let ps2 = PatchSet {
891 base_epoch: test_epoch('a'),
892 patches: ps2_patches,
893 };
894
895 let patch_sets: BTreeMap<String, PatchSet> = [
896 (test_oid('d').as_str().to_owned(), ps1),
897 (test_oid('e').as_str().to_owned(), ps2.clone()),
898 ]
899 .into_iter()
900 .collect();
901
902 let reader = move |oid: &GitOid| {
903 patch_sets
904 .get(oid.as_str())
905 .cloned()
906 .ok_or_else(|| ViewError::PatchSetRead {
907 oid: oid.as_str().to_owned(),
908 detail: "not found".into(),
909 })
910 };
911
912 let ops = vec![
913 (
914 test_oid('1'),
915 make_op(
916 "ws-1",
917 OpPayload::Create {
918 epoch: test_epoch('a'),
919 },
920 ),
921 ),
922 (
923 test_oid('2'),
924 make_op(
925 "ws-1",
926 OpPayload::Snapshot {
927 patch_set_oid: test_oid('d'),
928 },
929 ),
930 ),
931 (
932 test_oid('3'),
933 make_op(
934 "ws-1",
935 OpPayload::Snapshot {
936 patch_set_oid: test_oid('e'),
937 },
938 ),
939 ),
940 ];
941 let view = materialize_from_ops(test_ws("ws-1"), &ops, reader).unwrap();
942
943 assert_eq!(view.patch_set, Some(ps2));
944 assert_eq!(view.patch_set_oid, Some(test_oid('e')));
945 }
946
947 #[test]
952 fn causal_order_matters_create_then_destroy_vs_destroy_then_create() {
953 let ops1 = vec![
955 (
956 test_oid('1'),
957 make_op(
958 "ws-1",
959 OpPayload::Create {
960 epoch: test_epoch('a'),
961 },
962 ),
963 ),
964 (test_oid('2'), make_op("ws-1", OpPayload::Destroy)),
965 ];
966 let view1 =
967 materialize_from_ops(test_ws("ws-1"), &ops1, mock_reader(test_patch_set('a'))).unwrap();
968 assert!(view1.is_destroyed);
969
970 let ops2 = vec![
972 (test_oid('1'), make_op("ws-1", OpPayload::Destroy)),
973 (
974 test_oid('2'),
975 make_op(
976 "ws-1",
977 OpPayload::Create {
978 epoch: test_epoch('b'),
979 },
980 ),
981 ),
982 ];
983 let view2 =
984 materialize_from_ops(test_ws("ws-1"), &ops2, mock_reader(test_patch_set('a'))).unwrap();
985 assert!(!view2.is_destroyed);
986 assert_eq!(view2.epoch, Some(test_epoch('b')));
987 }
988}