1use anyhow::{Context, Result};
2use yrs::updates::decoder::Decode;
3use yrs::{Doc, GetString, ReadTxn, Text, TextRef, Transact, Update};
4
/// Name of the root-level yrs text type that every `CrdtDoc` operation and
/// `merge` read and write.
const TEXT_KEY: &str = "content";
6
/// Thin wrapper around a yrs [`Doc`] whose methods operate on a single text
/// root named by `TEXT_KEY`.
pub struct CrdtDoc {
    /// The underlying yrs document.
    doc: Doc,
}
11
12impl CrdtDoc {
13 pub fn from_text(content: &str) -> Self {
15 let doc = Doc::new();
16 let text = doc.get_or_insert_text(TEXT_KEY);
17 let mut txn = doc.transact_mut();
18 text.insert(&mut txn, 0, content);
19 drop(txn);
20 CrdtDoc { doc }
21 }
22
23 pub fn to_text(&self) -> String {
25 let text = self.doc.get_or_insert_text(TEXT_KEY);
26 let txn = self.doc.transact();
27 text.get_string(&txn)
28 }
29
30 #[allow(dead_code)] pub fn apply_edit(&self, offset: u32, delete_len: u32, insert: &str) {
33 let text = self.doc.get_or_insert_text(TEXT_KEY);
34 let mut txn = self.doc.transact_mut();
35 if delete_len > 0 {
36 text.remove_range(&mut txn, offset, delete_len);
37 }
38 if !insert.is_empty() {
39 text.insert(&mut txn, offset, insert);
40 }
41 }
42
43 pub fn encode_state(&self) -> Vec<u8> {
45 let txn = self.doc.transact();
46 txn.encode_state_as_update_v1(&yrs::StateVector::default())
47 }
48
49 pub fn decode_state(bytes: &[u8]) -> Result<Self> {
51 let doc = Doc::new();
52 let update = Update::decode_v1(bytes)
53 .map_err(|e| anyhow::anyhow!("failed to decode CRDT state: {}", e))?;
54 let mut txn = doc.transact_mut();
55 txn.apply_update(update)
56 .map_err(|e| anyhow::anyhow!("failed to apply CRDT update: {}", e))?;
57 drop(txn);
58 Ok(CrdtDoc { doc })
59 }
60}
61
62pub fn merge(base_state: Option<&[u8]>, ours_text: &str, theirs_text: &str) -> Result<String> {
72 if ours_text == theirs_text {
74 eprintln!("[crdt] ours == theirs, skipping merge");
75 return Ok(ours_text.to_string());
76 }
77
78 let base_doc = if let Some(bytes) = base_state {
80 CrdtDoc::decode_state(bytes)
81 .context("failed to decode base CRDT state")?
82 } else {
83 CrdtDoc::from_text("")
84 };
85 let mut base_text = base_doc.to_text();
86
87 eprintln!(
88 "[crdt] merge: base_len={} ours_len={} theirs_len={}",
89 base_text.len(),
90 ours_text.len(),
91 theirs_text.len()
92 );
93
94 let ours_common = common_prefix_len(&base_text, ours_text);
99 let theirs_common = common_prefix_len(&base_text, theirs_text);
100 let base_len = base_text.len();
101
102 if base_len > 0
103 && (ours_common as f64 / base_len as f64) < 0.5
104 && (theirs_common as f64 / base_len as f64) < 0.5
105 {
106 eprintln!(
107 "[crdt] Stale CRDT base detected (common prefix: ours={}%, theirs={}%). Using ours as base.",
108 (ours_common * 100) / base_len,
109 (theirs_common * 100) / base_len
110 );
111 base_text = ours_text.to_string();
112 }
113
114 let mutual_prefix = common_prefix_len(ours_text, theirs_text);
131 if mutual_prefix > base_text.len() {
132 let snap = &ours_text[..mutual_prefix];
139 let snapped = match snap.rfind('\n') {
140 Some(pos) if pos >= base_text.len() => pos + 1,
141 _ => base_text.len(), };
143 if snapped > base_text.len() {
144 eprintln!(
145 "[crdt] Advancing base to shared prefix (base_len={} → {})",
146 base_text.len(),
147 snapped
148 );
149 base_text = ours_text[..snapped].to_string();
150 }
151 }
152
153 let ours_ops = compute_edit_ops(&base_text, ours_text);
155 let theirs_ops = compute_edit_ops(&base_text, theirs_text);
156
157 let base_encoded = if base_text == base_doc.to_text() {
160 base_doc.encode_state()
161 } else {
162 CrdtDoc::from_text(&base_text).encode_state()
163 };
164
165 let ours_doc = Doc::with_client_id(1);
166 {
167 let update = Update::decode_v1(&base_encoded)
168 .map_err(|e| anyhow::anyhow!("decode error: {}", e))?;
169 let mut txn = ours_doc.transact_mut();
170 txn.apply_update(update)
171 .map_err(|e| anyhow::anyhow!("apply error: {}", e))?;
172 }
173
174 let theirs_doc = Doc::with_client_id(2);
175 {
176 let update = Update::decode_v1(&base_encoded)
177 .map_err(|e| anyhow::anyhow!("decode error: {}", e))?;
178 let mut txn = theirs_doc.transact_mut();
179 txn.apply_update(update)
180 .map_err(|e| anyhow::anyhow!("apply error: {}", e))?;
181 }
182
183 {
185 let text = ours_doc.get_or_insert_text(TEXT_KEY);
186 let mut txn = ours_doc.transact_mut();
187 apply_ops(&text, &mut txn, &ours_ops);
188 }
189
190 {
192 let text = theirs_doc.get_or_insert_text(TEXT_KEY);
193 let mut txn = theirs_doc.transact_mut();
194 apply_ops(&text, &mut txn, &theirs_ops);
195 }
196
197 let ours_sv = {
199 let txn = ours_doc.transact();
200 txn.state_vector()
201 };
202 let theirs_update = {
203 let txn = theirs_doc.transact();
204 txn.encode_state_as_update_v1(&ours_sv)
205 };
206 {
207 let update = Update::decode_v1(&theirs_update)
208 .map_err(|e| anyhow::anyhow!("decode error: {}", e))?;
209 let mut txn = ours_doc.transact_mut();
210 txn.apply_update(update)
211 .map_err(|e| anyhow::anyhow!("apply error: {}", e))?;
212 }
213
214 let text = ours_doc.get_or_insert_text(TEXT_KEY);
216 let txn = ours_doc.transact();
217 Ok(text.get_string(&txn))
218}
219
220pub fn compact(state: &[u8]) -> Result<Vec<u8>> {
222 let doc = CrdtDoc::decode_state(state)?;
223 Ok(doc.encode_state())
224}
225
/// Length in bytes of the longest common prefix of `a` and `b`.
///
/// Strings are compared `char` by `char`, so the result always lies on a UTF-8
/// character boundary of both strings and can be used to slice either one
/// (`&a[..n]`, `&b[..n]`). A purely byte-wise count can stop inside a
/// multi-byte character (e.g. `"é"` and `"è"` share their first byte), and
/// slicing there panics.
fn common_prefix_len(a: &str, b: &str) -> usize {
    a.chars()
        .zip(b.chars())
        .take_while(|(x, y)| x == y)
        .map(|(c, _)| c.len_utf8())
        .sum()
}
230
/// One step of an edit script built by `compute_edit_ops` and replayed by
/// `apply_ops`. Lengths are byte counts as returned by `str::len`.
#[derive(Debug)]
enum EditOp {
    /// Move the cursor forward over this many unchanged bytes.
    Retain(u32),
    /// Remove this many bytes at the cursor (the cursor does not move).
    Delete(u32),
    /// Insert this text at the cursor and move past it.
    Insert(String),
}
238
239fn compute_edit_ops(from: &str, to: &str) -> Vec<EditOp> {
241 use similar::{ChangeTag, TextDiff};
242
243 let diff = TextDiff::from_lines(from, to);
244 let mut ops = Vec::new();
245
246 for change in diff.iter_all_changes() {
247 match change.tag() {
248 ChangeTag::Equal => {
249 let len = change.value().len() as u32;
250 if let Some(EditOp::Retain(n)) = ops.last_mut() {
251 *n += len;
252 } else {
253 ops.push(EditOp::Retain(len));
254 }
255 }
256 ChangeTag::Delete => {
257 let len = change.value().len() as u32;
258 if let Some(EditOp::Delete(n)) = ops.last_mut() {
259 *n += len;
260 } else {
261 ops.push(EditOp::Delete(len));
262 }
263 }
264 ChangeTag::Insert => {
265 let s = change.value().to_string();
266 if let Some(EditOp::Insert(existing)) = ops.last_mut() {
267 existing.push_str(&s);
268 } else {
269 ops.push(EditOp::Insert(s));
270 }
271 }
272 }
273 }
274
275 ops
276}
277
278fn apply_ops(text: &TextRef, txn: &mut yrs::TransactionMut<'_>, ops: &[EditOp]) {
280 let mut cursor: u32 = 0;
281 for op in ops {
282 match op {
283 EditOp::Retain(n) => cursor += n,
284 EditOp::Delete(n) => {
285 text.remove_range(txn, cursor, *n);
286 }
288 EditOp::Insert(s) => {
289 text.insert(txn, cursor, s);
290 cursor += s.len() as u32;
291 }
292 }
293 }
294}
295
#[cfg(test)]
mod tests {
    // Unit tests: CrdtDoc text/encoding round-trips, compaction, and the
    // merge behaviour (including the stale-base and shared-prefix heuristics).
    use super::*;

    #[test]
    /// Text inserted with `from_text` reads back unchanged.
    fn roundtrip_text() {
        let content = "Hello, world!\nLine two.\n";
        let doc = CrdtDoc::from_text(content);
        assert_eq!(doc.to_text(), content);
    }

    #[test]
    /// Encoding then decoding a document preserves its text.
    fn roundtrip_encode_decode() {
        let content = "Some document content.\n";
        let doc = CrdtDoc::from_text(content);
        let encoded = doc.encode_state();
        let decoded = CrdtDoc::decode_state(&encoded).unwrap();
        assert_eq!(decoded.to_text(), content);
    }

    #[test]
    /// Pure insertion via `apply_edit` (delete_len = 0).
    fn apply_edit_insert() {
        let doc = CrdtDoc::from_text("Hello world");
        doc.apply_edit(5, 0, ",");
        assert_eq!(doc.to_text(), "Hello, world");
    }

    #[test]
    /// Pure deletion via `apply_edit` (empty insert).
    fn apply_edit_delete() {
        let doc = CrdtDoc::from_text("Hello, world");
        doc.apply_edit(5, 1, "");
        assert_eq!(doc.to_text(), "Hello world");
    }

    #[test]
    /// Delete-then-insert at the same offset replaces a word.
    fn apply_edit_replace() {
        let doc = CrdtDoc::from_text("Hello world");
        doc.apply_edit(6, 5, "Rust");
        assert_eq!(doc.to_text(), "Hello Rust");
    }

    #[test]
    /// Both sides append different sections after the same base.
    fn concurrent_append_merge_no_conflict() {
        let base = "# Document\n\nBase content.\n";
        let base_doc = CrdtDoc::from_text(base);
        let base_state = base_doc.encode_state();

        let ours = format!("{base}## Agent\n\nAgent response.\n");
        let theirs = format!("{base}## User\n\nUser addition.\n");

        let merged = merge(Some(&base_state), &ours, &theirs).unwrap();

        // Both appended sections and the shared base must survive.
        assert!(merged.contains("Agent response."), "missing agent text");
        assert!(merged.contains("User addition."), "missing user text");
        assert!(merged.contains("Base content."), "missing base text");
        assert!(!merged.contains("<<<<<<<"));
        // No git-style conflict markers are ever produced.
        assert!(!merged.contains(">>>>>>>"));
    }

    #[test]
    /// Both sides insert a different line at the same position.
    fn concurrent_insert_same_position() {
        let base = "Line 1\nLine 3\n";
        let base_doc = CrdtDoc::from_text(base);
        let base_state = base_doc.encode_state();

        let ours = "Line 1\nAgent line\nLine 3\n";
        let theirs = "Line 1\nUser line\nLine 3\n";

        let merged = merge(Some(&base_state), ours, theirs).unwrap();

        assert!(merged.contains("Agent line"), "missing agent insertion");
        // Only presence is checked; the relative order of the inserts is not.
        assert!(merged.contains("User line"), "missing user insertion");
        assert!(merged.contains("Line 1"), "missing line 1");
        assert!(merged.contains("Line 3"), "missing line 3");
    }

    #[test]
    /// With no base state the ancestor is empty, so both texts are kept.
    fn merge_no_base_state() {
        let ours = "Agent wrote this.\n";
        let theirs = "User wrote this.\n";

        let merged = merge(None, ours, theirs).unwrap();

        assert!(merged.contains("Agent wrote this."));
        assert!(merged.contains("User wrote this."));
    }

    #[test]
    /// Compaction keeps the text and does not grow the encoding.
    fn compact_preserves_content() {
        let doc = CrdtDoc::from_text("Hello");
        doc.apply_edit(5, 0, " world");
        doc.apply_edit(11, 0, "!");

        let state = doc.encode_state();
        let compacted = compact(&state).unwrap();
        let restored = CrdtDoc::decode_state(&compacted).unwrap();

        assert_eq!(restored.to_text(), "Hello world!");
        assert!(compacted.len() <= state.len());
    }

    #[test]
    /// Compaction after many single-character replacements.
    // NOTE(review): despite the name, only content equality is asserted here,
    // not a size reduction.
    fn compact_reduces_size_after_edits() {
        let doc = CrdtDoc::from_text("aaaa");
        for i in 0..20 {
            // Replace the first character with 'a' + (i % 26).
            let c = ((b'a' + (i % 26)) as char).to_string();
            doc.apply_edit(0, 1, &c);
        }
        let state = doc.encode_state();
        let compacted = compact(&state).unwrap();
        let restored = CrdtDoc::decode_state(&compacted).unwrap();
        assert_eq!(restored.to_text(), doc.to_text());
    }

    #[test]
    /// An empty document round-trips through encode/decode.
    fn empty_document() {
        let doc = CrdtDoc::from_text("");
        assert_eq!(doc.to_text(), "");

        let encoded = doc.encode_state();
        let decoded = CrdtDoc::decode_state(&encoded).unwrap();
        assert_eq!(decoded.to_text(), "");
    }

    #[test]
    /// Garbage bytes yield an error rather than a panic.
    fn decode_invalid_bytes_errors() {
        let result = CrdtDoc::decode_state(&[0xff, 0xfe, 0xfd]);
        assert!(result.is_err());
    }

    #[test]
    /// Identical inputs take the early-return path.
    fn merge_identical_texts() {
        let base = "Same content.\n";
        let base_doc = CrdtDoc::from_text(base);
        let state = base_doc.encode_state();

        let merged = merge(Some(&state), base, base).unwrap();
        assert_eq!(merged, base);
    }

    #[test]
    /// When theirs equals the base, the result is exactly ours.
    fn merge_one_side_unchanged() {
        let base = "Original.\n";
        let base_doc = CrdtDoc::from_text(base);
        let state = base_doc.encode_state();

        let ours = "Original.\nAgent added.\n";
        let merged = merge(Some(&state), ours, base).unwrap();
        assert_eq!(merged, ours);
    }

    #[test]
    /// Both sides append the same user prompt after the stored base, and ours
    /// also appends a response; the prompt must appear exactly once
    /// (exercises the shared-prefix base advance).
    fn merge_stale_base_no_duplicate_user_prompt() {
        let base_content = "\
## Assistant

Previous response content.

Committed and pushed.

";
        let base_doc = CrdtDoc::from_text(base_content);
        let base_state = base_doc.encode_state();

        let user_prompt = "\
Opening a video a shows video a.
Closing video a then opening video b start video b but video b is hidden.
Closing video b then reopening video b starts and shows video b. video b is visible.
";

        let ours = format!("\
{}{}### Re: Close A → Open B still hidden

Added explicit height and visibility reset.

Committed and pushed.

", base_content, user_prompt);

        let theirs = format!("\
{}{}
", base_content, user_prompt);

        let merged = merge(Some(&base_state), &ours, &theirs).unwrap();

        let prompt_count = merged.matches("Opening a video a shows video a.").count();
        // The shared prompt must not be inserted once per side.
        assert_eq!(
            prompt_count, 1,
            "User prompt duplicated! Appeared {} times in:\n{}",
            prompt_count, merged
        );

        assert!(
            // Ours-only content still has to make it into the result.
            merged.contains("### Re: Close A → Open B still hidden"),
            "Agent response missing from merge:\n{}", merged
        );
    }

    #[test]
    /// Both sides append the same line after the base; ours appends one more.
    /// The shared line must not be duplicated.
    fn merge_stale_base_same_insertion_both_sides() {
        let base_content = "Line 1\nLine 2\n";
        let base_doc = CrdtDoc::from_text(base_content);
        let base_state = base_doc.encode_state();

        let shared_addition = "User typed this.\n";
        // Ours = base + shared line + extra response; theirs = base + shared line.
        let ours = format!("{}{}Agent response.\n", base_content, shared_addition);
        let theirs = format!("{}{}", base_content, shared_addition);

        let merged = merge(Some(&base_state), &ours, &theirs).unwrap();

        let count = merged.matches("User typed this.").count();
        assert_eq!(
            count, 1,
            "Shared text duplicated! Appeared {} times in:\n{}",
            count, merged
        );
        assert!(merged.contains("Agent response."), "Agent text missing:\n{}", merged);
    }

    #[test]
    /// Both sides append a different line that starts with the same `*`
    /// character; each side's line must stay contiguous.
    fn merge_no_character_interleaving() {
        let base = "# Doc\n\nPrevious content.\n\n";
        // Base ends in a blank line; both sides append right after it.
        let base_doc = CrdtDoc::from_text(base);
        let base_state = base_doc.encode_state();

        let ours = "# Doc\n\nPrevious content.\n\n*Compacted. Content archived to*\n";
        // Shares a leading '*' with ours, which must not split either line.
        let theirs = "# Doc\n\nPrevious content.\n\n**Soft-bristle brush only**\n";

        let merged = merge(Some(&base_state), ours, theirs).unwrap();

        assert!(
            // Each side's line must appear verbatim.
            merged.contains("*Compacted. Content archived to*"),
            "Agent text should be contiguous (not interleaved). Got:\n{}",
            merged
        );
        assert!(
            merged.contains("**Soft-bristle brush only**"),
            "User text should be contiguous (not interleaved). Got:\n{}",
            merged
        );
    }

    #[test]
    /// Both sides rewrite the only base line differently.
    fn merge_concurrent_same_line_no_garbling() {
        let base = "Some base text\n";
        let base_doc = CrdtDoc::from_text(base);
        let base_state = base_doc.encode_state();

        let ours = "Agent wrote this line\n";
        // Neither side shares a prefix with the base.
        let theirs = "User wrote different text\n";

        let merged = merge(Some(&base_state), ours, theirs).unwrap();

        let has_agent_contiguous = merged.contains("Agent wrote this line");
        // The assertion deliberately allows one side to be absent.
        let has_user_contiguous = merged.contains("User wrote different text");

        assert!(
            has_agent_contiguous || has_user_contiguous,
            "At least one side should have contiguous text (no char interleaving). Got:\n{}",
            merged
        );
    }

    #[test]
    /// Ours replaces the whole exchange section while theirs inserts new lines
    /// into the old one. Agent lines must stay intact, the user insertion must
    /// survive, and no old content may land inside agent text.
    fn merge_replace_vs_append_no_interleaving() {
        let header = "---\nagent_doc_format: template\n---\n\n# Title\n\n<!-- agent:exchange -->\n";
        let footer = "\n<!-- /agent:exchange -->\n";

        let old_exchange = "\
### Committed, Pushed & Released

**project (v0.1.0):**
- Committed initial implementation
- Tagged v0.1.0 and pushed

Add a README.md to the project.
Also add AGENTS.md with a symlink CLAUDE.md

**sub-project:**
- Committed fix + SPEC.md
- Pushed to remote
";
        let stale_base = format!("{header}{old_exchange}{footer}");
        let stale_state = CrdtDoc::from_text(&stale_base).encode_state();

        // Unused copy of the base text; kept as in the original test.
        let _baseline = stale_base.clone();

        let agent_exchange = "\
### Done

Added to project and pushed:

- **README.md** — overview, usage, design notes
- **AGENTS.md** — architecture, key decisions, commands, related projects
- **CLAUDE.md** → symlink to AGENTS.md

All committed and pushed.
";
        let ours = format!("{header}{agent_exchange}{footer}");

        let theirs_exchange = "\
### Committed, Pushed & Released

**project (v0.1.0):**
- Committed initial implementation
- Tagged v0.1.0 and pushed

Add a README.md to the project.
Also add AGENTS.md with a symlink CLAUDE.md

Please add tests.
Please comprehensively test adherence to the spec.

**sub-project:**
- Committed fix + SPEC.md
- Pushed to remote
";
        let theirs = format!("{header}{theirs_exchange}{footer}");

        let merged = merge(Some(&stale_state), &ours, &theirs).unwrap();

        assert!(
            // The agent's bullet must appear in one piece.
            merged.contains("- **AGENTS.md** — architecture, key decisions, commands, related projects"),
            "Agent text garbled (mid-word split). Got:\n{}", merged
        );

        assert!(
            // The user's insertion must not be lost.
            merged.contains("Please add tests."),
            "User addition missing. Got:\n{}", merged
        );

        assert!(
            // Known-bad shapes: old text spliced into the middle of agent text.
            !merged.contains("key deAdd") && !merged.contains("key de\n"),
            "Old content interleaved into agent text. Got:\n{}", merged
        );
    }

    #[test]
    /// Same replace-vs-append shape, with the true common ancestor as base.
    fn merge_replace_vs_append_with_baseline_base() {
        let header = "---\nagent_doc_format: template\n---\n\n# Title\n\n<!-- agent:exchange -->\n";
        let footer = "\n<!-- /agent:exchange -->\n";

        let old_exchange = "\
### Previous Response

Old content here.

Add a README.md to the project.
Also add AGENTS.md with a symlink CLAUDE.md
";
        let baseline = format!("{header}{old_exchange}{footer}");

        let agent_exchange = "\
### Done

- **README.md** — overview, usage, design notes
- **AGENTS.md** — architecture, key decisions, commands, related projects
- **CLAUDE.md** → symlink to AGENTS.md

All committed and pushed.
";
        let ours = format!("{header}{agent_exchange}{footer}");

        let user_addition = "\nPlease add tests.\n";
        // Theirs keeps the old exchange and appends a line before the footer.
        let theirs = format!("{header}{old_exchange}{user_addition}{footer}");

        let baseline_state = CrdtDoc::from_text(&baseline).encode_state();
        // Merge against the true common ancestor (not a stale one).
        let merged = merge(Some(&baseline_state), &ours, &theirs).unwrap();

        assert!(
            // The agent's bullet text must be intact.
            merged.contains("key decisions, commands, related projects"),
            "Agent text garbled. Got:\n{}", merged
        );

        assert!(
            // The user's appended line must be kept.
            merged.contains("Please add tests."),
            "User addition missing. Got:\n{}", merged
        );
    }

    #[test]
    /// Both sides append different markdown after the same base; the
    /// emphasis markers on each side must stay well-formed.
    fn merge_streaming_concurrent_edit_preserves_formatting() {
        let base = "commit and push all rappstack packages.\n\n";
        // Both sides keep this base verbatim and append after it.
        let base_doc = CrdtDoc::from_text(base);
        let base_state = base_doc.encode_state();

        let ours = "\
commit and push all rappstack packages.

### Re: commit and push

*Compacted. Content archived to `docs/`*

Done — all packages pushed.
";

        let theirs = "\
commit and push all rappstack packages.

**Soft-bristle brush only**
";

        let merged = merge(Some(&base_state), ours, theirs).unwrap();

        assert!(
            // Agent's italic line must be intact.
            merged.contains("*Compacted. Content archived to `docs/`*"),
            "Agent formatting broken. Got:\n{}",
            merged
        );
        assert!(
            // User's bold line must be intact.
            merged.contains("**Soft-bristle brush only**"),
            "User formatting broken. Got:\n{}",
            merged
        );
        assert!(
            // Known-bad shapes from character-level interleaving.
            !merged.contains("*C*C") && !merged.contains("**Sot"),
            "Character interleaving detected. Got:\n{}",
            merged
        );
    }

    #[test]
    /// Ours replaces every line of a section while theirs inserts a new line
    /// inside it. A long shared header keeps the stale-base heuristic from
    /// firing.
    fn merge_replace_vs_insert_no_interleaving() {
        let header = "---\nagent_doc_format: template\nagent_doc_write: crdt\n---\n\n# Document Title\n\nSome preamble text that both sides share.\nThis provides enough common prefix to avoid stale detection.\n\n<!-- agent:exchange -->\n";
        let footer = "<!-- /agent:exchange -->\n";

        let old_exchange = "Line one of old content\nLine two of old content\nLine three of old content\n";
        let baseline = format!("{header}{old_exchange}{footer}");
        let baseline_doc = CrdtDoc::from_text(&baseline);
        let baseline_state = baseline_doc.encode_state();

        let agent_exchange = "Completely new line one\nCompletely new line two\nCompletely new line three\nCompletely new line four\n";
        // Ours replaces every old line.
        let ours = format!("{header}{agent_exchange}{footer}");

        let theirs = format!("{header}Line one of old content\nUser inserted this line\nLine two of old content\nLine three of old content\n{footer}");
        // Theirs keeps the old lines and inserts one between the first two.

        let merged = merge(Some(&baseline_state), &ours, &theirs).unwrap();

        assert!(
            // Agent lines must be intact.
            merged.contains("Completely new line one"),
            "Agent line 1 missing or garbled. Got:\n{}", merged
        );
        assert!(
            merged.contains("Completely new line two"),
            "Agent line 2 missing or garbled. Got:\n{}", merged
        );

        assert!(
            // The user's inserted line must survive.
            merged.contains("User inserted this line"),
            "User insertion missing. Got:\n{}", merged
        );

        assert!(
            // Known-bad shapes: user text spliced inside an agent word.
            !merged.contains("CompleteUser") && !merged.contains("Complete\nUser"),
            "Character interleaving detected. Got:\n{}", merged
        );
    }
}