1use std::collections::{BTreeSet, HashMap, HashSet};
14use std::sync::Arc;
15use std::time::{SystemTime, UNIX_EPOCH};
16
17use sha2::{Digest, Sha256};
18
19use crate::application::vcs::{
20 AsOfSpec, Author, CheckoutInput, CheckoutTarget, Commit, CommitHash, Conflict,
21 CreateBranchInput, CreateCommitInput, CreateTagInput, Diff, DiffChange, DiffEntry, DiffInput,
22 LogInput, LogRange, MergeInput, MergeOutcome, MergeStrategy, Ref, RefKind, RefName, ResetInput,
23 ResetMode, Status, StatusInput,
24};
25use crate::application::vcs_collections as vc;
26use crate::json::Value as JsonValue;
27use crate::runtime::RedDBRuntime;
28use crate::storage::schema::Value;
29use crate::storage::transaction::snapshot::{Xid, XID_NONE};
30use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
31use crate::storage::unified::UnifiedStore;
32use crate::{RedDBError, RedDBResult};
33
34fn unimplemented(method: &str) -> RedDBError {
39 RedDBError::Internal(format!("vcs: {method} not yet implemented"))
40}
41
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch. The
/// millisecond count is clamped to `i64::MAX` rather than being truncated by
/// a bare `as` cast from `u128`.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}
48
49fn row_text(row: &RowData, field: &str) -> Option<String> {
50 match row.get_field(field)?.clone() {
51 Value::Text(v) => Some(v.to_string()),
52 _ => None,
53 }
54}
55
56fn row_u64(row: &RowData, field: &str) -> Option<u64> {
57 match row.get_field(field)?.clone() {
58 Value::UnsignedInteger(v) => Some(v),
59 Value::Integer(v) if v >= 0 => Some(v as u64),
60 _ => None,
61 }
62}
63
64fn row_i64(row: &RowData, field: &str) -> Option<i64> {
65 match row.get_field(field)?.clone() {
66 Value::Integer(v) => Some(v),
67 Value::UnsignedInteger(v) => Some(v as i64),
68 Value::TimestampMs(v) | Value::Timestamp(v) => Some(v),
69 _ => None,
70 }
71}
72
73fn row_json(row: &RowData, field: &str) -> JsonValue {
74 match row.get_field(field) {
75 Some(Value::Json(bytes)) => {
76 crate::json::from_slice::<JsonValue>(bytes).unwrap_or(JsonValue::Null)
77 }
78 Some(Value::Text(s)) => crate::json::from_str::<JsonValue>(s)
79 .unwrap_or_else(|_| JsonValue::String(s.to_string())),
80 _ => JsonValue::Null,
81 }
82}
83
84fn row_string_list(row: &RowData, field: &str) -> Vec<String> {
85 match row.get_field(field) {
86 Some(Value::Array(items)) => items
87 .iter()
88 .filter_map(|v| match v {
89 Value::Text(s) => Some(s.to_string()),
90 _ => None,
91 })
92 .collect(),
93 _ => Vec::new(),
94 }
95}
96
97fn insert_meta_row(
98 store: &UnifiedStore,
99 collection: &str,
100 fields: HashMap<String, Value>,
101) -> RedDBResult<EntityId> {
102 let _ = store.get_or_create_collection(collection);
103 store
104 .insert_auto(
105 collection,
106 UnifiedEntity::new(
107 EntityId::new(0),
108 EntityKind::TableRow {
109 table: Arc::from(collection),
110 row_id: 0,
111 },
112 EntityData::Row(RowData {
113 columns: Vec::new(),
114 named: Some(fields),
115 schema: None,
116 }),
117 ),
118 )
119 .map_err(|e| RedDBError::Internal(e.to_string()))
120}
121
122fn compute_commit_hash(
123 root_xid: Xid,
124 parents: &[CommitHash],
125 author: &Author,
126 message: &str,
127 timestamp_ms: i64,
128) -> CommitHash {
129 let mut h = Sha256::new();
130 h.update(b"reddb-commit-v1\n");
131 h.update(root_xid.to_be_bytes());
132 let mut sorted = parents.to_vec();
133 sorted.sort();
134 for p in &sorted {
135 h.update(b"\np=");
136 h.update(p.as_bytes());
137 }
138 h.update(b"\na=");
139 h.update(author.name.as_bytes());
140 h.update(b"\n");
141 h.update(author.email.as_bytes());
142 h.update(b"\nm=");
143 h.update(message.as_bytes());
144 h.update(b"\nt=");
145 h.update(timestamp_ms.to_be_bytes());
146 let digest = h.finalize();
147 hex::encode(digest)
148}
149
150fn normalize_branch_name(raw: &str) -> String {
151 if raw.starts_with(vc::BRANCH_REF_PREFIX) {
152 raw.to_string()
153 } else {
154 format!("{}{}", vc::BRANCH_REF_PREFIX, raw)
155 }
156}
157
158fn normalize_tag_name(raw: &str) -> String {
159 if raw.starts_with(vc::TAG_REF_PREFIX) {
160 raw.to_string()
161 } else {
162 format!("{}{}", vc::TAG_REF_PREFIX, raw)
163 }
164}
165
166fn head_ref_id(connection_id: u64) -> String {
167 format!("{}{}", vc::HEAD_ID_PREFIX, connection_id)
168}
169
170fn load_commit_entity(store: &UnifiedStore, hash: &str) -> Option<UnifiedEntity> {
175 let manager = store.get_collection(vc::COMMITS)?;
176 manager
177 .query_all(|entity| {
178 entity
179 .data
180 .as_row()
181 .is_some_and(|row| row_text(row, "id").as_deref() == Some(hash))
182 })
183 .into_iter()
184 .next()
185}
186
187fn commit_from_row(row: &RowData) -> Option<Commit> {
188 Some(Commit {
189 hash: row_text(row, "id")?,
190 root_xid: row_u64(row, "root_xid")?,
191 parents: row_string_list(row, "parents"),
192 height: row_u64(row, "height").unwrap_or(0),
193 author: Author {
194 name: row_text(row, "author_name").unwrap_or_default(),
195 email: row_text(row, "author_email").unwrap_or_default(),
196 },
197 committer: Author {
198 name: row_text(row, "committer_name").unwrap_or_default(),
199 email: row_text(row, "committer_email").unwrap_or_default(),
200 },
201 message: row_text(row, "message").unwrap_or_default(),
202 timestamp_ms: row_i64(row, "timestamp_ms").unwrap_or(0),
203 signature: row_text(row, "signature"),
204 })
205}
206
207fn load_commit(store: &UnifiedStore, hash: &str) -> Option<Commit> {
208 let entity = load_commit_entity(store, hash)?;
209 let row = entity.data.as_row()?;
210 commit_from_row(row)
211}
212
213fn save_commit(store: &UnifiedStore, commit: &Commit) -> RedDBResult<()> {
214 let mut fields: HashMap<String, Value> = HashMap::new();
215 fields.insert("id".to_string(), Value::text(commit.hash.as_str()));
216 fields.insert(
217 "root_xid".to_string(),
218 Value::UnsignedInteger(commit.root_xid),
219 );
220 fields.insert(
221 "parents".to_string(),
222 Value::Array(
223 commit
224 .parents
225 .iter()
226 .map(|p| Value::text(p.as_str()))
227 .collect(),
228 ),
229 );
230 fields.insert("height".to_string(), Value::UnsignedInteger(commit.height));
231 fields.insert(
232 "author_name".to_string(),
233 Value::text(commit.author.name.as_str()),
234 );
235 fields.insert(
236 "author_email".to_string(),
237 Value::text(commit.author.email.as_str()),
238 );
239 fields.insert(
240 "committer_name".to_string(),
241 Value::text(commit.committer.name.as_str()),
242 );
243 fields.insert(
244 "committer_email".to_string(),
245 Value::text(commit.committer.email.as_str()),
246 );
247 fields.insert("message".to_string(), Value::text(commit.message.as_str()));
248 fields.insert(
249 "timestamp_ms".to_string(),
250 Value::TimestampMs(commit.timestamp_ms),
251 );
252 if let Some(sig) = &commit.signature {
253 fields.insert("signature".to_string(), Value::text(sig.as_str()));
254 }
255 insert_meta_row(store, vc::COMMITS, fields)?;
256 Ok(())
257}
258
259fn ref_kind_from_str(s: &str) -> RefKind {
264 match s {
265 "tag" => RefKind::Tag,
266 "head" => RefKind::Head,
267 _ => RefKind::Branch,
268 }
269}
270
271fn ref_from_row(row: &RowData) -> Option<Ref> {
272 Some(Ref {
273 name: row_text(row, "id")?,
274 kind: ref_kind_from_str(&row_text(row, "type").unwrap_or_default()),
275 target: row_text(row, "target").unwrap_or_default(),
276 protected: row
277 .get_field("protected")
278 .and_then(|v| match v {
279 Value::Boolean(b) => Some(*b),
280 _ => None,
281 })
282 .unwrap_or(false),
283 })
284}
285
286fn load_ref_entity(store: &UnifiedStore, name: &str) -> Option<(EntityId, UnifiedEntity)> {
287 let manager = store.get_collection(vc::REFS)?;
288 manager
289 .query_all(|entity| {
290 entity
291 .data
292 .as_row()
293 .is_some_and(|row| row_text(row, "id").as_deref() == Some(name))
294 })
295 .into_iter()
296 .next()
297 .map(|entity| (entity.id, entity))
298}
299
300fn load_ref(store: &UnifiedStore, name: &str) -> Option<Ref> {
301 let (_, entity) = load_ref_entity(store, name)?;
302 ref_from_row(entity.data.as_row()?)
303}
304
305fn save_ref(store: &UnifiedStore, r: &Ref) -> RedDBResult<()> {
306 if let Some((id, _)) = load_ref_entity(store, &r.name) {
309 let _ = store.delete(vc::REFS, id);
310 }
311 let mut fields: HashMap<String, Value> = HashMap::new();
312 fields.insert("id".to_string(), Value::text(r.name.as_str()));
313 let kind_str = match r.kind {
314 RefKind::Branch => "branch",
315 RefKind::Tag => "tag",
316 RefKind::Head => "head",
317 };
318 fields.insert("type".to_string(), Value::text(kind_str));
319 fields.insert("target".to_string(), Value::text(r.target.as_str()));
320 fields.insert("protected".to_string(), Value::Boolean(r.protected));
321 insert_meta_row(store, vc::REFS, fields)?;
322 Ok(())
323}
324
325fn delete_ref(store: &UnifiedStore, name: &str) -> RedDBResult<bool> {
326 let Some((id, _)) = load_ref_entity(store, name) else {
327 return Ok(false);
328 };
329 store
330 .delete(vc::REFS, id)
331 .map_err(|e| RedDBError::Internal(e.to_string()))?;
332 Ok(true)
333}
334
335fn is_versioned(store: &UnifiedStore, name: &str) -> bool {
346 if name.starts_with("red_") {
347 return false;
348 }
349 let Some(manager) = store.get_collection(vc::SETTINGS) else {
350 return false;
351 };
352 let target = name.to_string();
353 manager
354 .query_all(|entity| {
355 entity
356 .data
357 .as_row()
358 .is_some_and(|row| row_text(row, "id").as_deref() == Some(&target))
359 })
360 .into_iter()
361 .any(|entity| {
362 entity
363 .data
364 .as_row()
365 .and_then(|row| row.get_field("versioned"))
366 .map(|v| matches!(v, Value::Boolean(true)))
367 .unwrap_or(false)
368 })
369}
370
371fn versioned_collections(store: &UnifiedStore) -> Vec<String> {
375 let Some(manager) = store.get_collection(vc::SETTINGS) else {
376 return Vec::new();
377 };
378 manager
379 .query_all(|entity| {
380 entity
381 .data
382 .as_row()
383 .and_then(|row| row.get_field("versioned"))
384 .map(|v| matches!(v, Value::Boolean(true)))
385 .unwrap_or(false)
386 })
387 .into_iter()
388 .filter_map(|entity| row_text(entity.data.as_row()?, "id"))
389 .collect()
390}
391
392fn set_versioned_flag(store: &UnifiedStore, name: &str, enabled: bool) -> RedDBResult<()> {
398 if name.starts_with("red_") {
399 return Err(RedDBError::InvalidConfig(format!(
400 "cannot version internal collection `{name}`"
401 )));
402 }
403 let target = name.to_string();
404 if let Some(manager) = store.get_collection(vc::SETTINGS) {
405 let rows = manager.query_all(|entity| {
406 entity
407 .data
408 .as_row()
409 .is_some_and(|row| row_text(row, "id").as_deref() == Some(&target))
410 });
411 for row in rows {
412 let _ = store.delete(vc::SETTINGS, row.id);
413 }
414 }
415 if !enabled {
416 return Ok(());
417 }
418 let mut fields: HashMap<String, Value> = HashMap::new();
419 fields.insert("id".to_string(), Value::text(name));
420 fields.insert("versioned".to_string(), Value::Boolean(true));
421 fields.insert("ts_ms".to_string(), Value::TimestampMs(now_ms()));
422 insert_meta_row(store, vc::SETTINGS, fields)?;
423 Ok(())
424}
425
426fn list_refs_by_prefix(store: &UnifiedStore, prefix: Option<&str>) -> Vec<Ref> {
427 let Some(manager) = store.get_collection(vc::REFS) else {
428 return Vec::new();
429 };
430 let prefix_owned = prefix.map(|s| s.to_string());
431 manager
432 .query_all(|entity| {
433 entity.data.as_row().is_some_and(|row| {
434 let id = row_text(row, "id").unwrap_or_default();
435 match &prefix_owned {
436 Some(p) => id.starts_with(p),
437 None => true,
438 }
439 })
440 })
441 .into_iter()
442 .filter_map(|entity| ref_from_row(entity.data.as_row()?))
443 .collect()
444}
445
446fn ancestor_set(store: &UnifiedStore, start: &str, max_steps: usize) -> HashSet<CommitHash> {
451 let mut visited: HashSet<CommitHash> = HashSet::new();
452 let mut stack: Vec<CommitHash> = vec![start.to_string()];
453 let mut steps = 0usize;
454 while let Some(hash) = stack.pop() {
455 if !visited.insert(hash.clone()) {
456 continue;
457 }
458 if let Some(c) = load_commit(store, &hash) {
459 for p in c.parents {
460 if !visited.contains(&p) {
461 stack.push(p);
462 }
463 }
464 }
465 steps += 1;
466 if steps >= max_steps {
467 break;
468 }
469 }
470 visited
471}
472
473fn topo_walk(store: &UnifiedStore, start: &str, range: &LogRange) -> Vec<Commit> {
474 let limit = range.limit.unwrap_or(usize::MAX);
475 let skip = range.skip.unwrap_or(0);
476 let exclude = range
477 .from
478 .as_ref()
479 .map(|h| ancestor_set(store, h, 100_000))
480 .unwrap_or_default();
481
482 let mut visited: HashSet<CommitHash> = HashSet::new();
483 let mut stack: Vec<CommitHash> = vec![start.to_string()];
484 let mut out: Vec<Commit> = Vec::new();
485 let mut skipped = 0usize;
486 while let Some(hash) = stack.pop() {
487 if !visited.insert(hash.clone()) {
488 continue;
489 }
490 if exclude.contains(&hash) {
491 continue;
492 }
493 let Some(commit) = load_commit(store, &hash) else {
494 continue;
495 };
496 if range.no_merges && commit.parents.len() > 1 {
497 for p in &commit.parents {
498 if !visited.contains(p) {
499 stack.push(p.clone());
500 }
501 }
502 continue;
503 }
504 for p in &commit.parents {
505 if !visited.contains(p) {
506 stack.push(p.clone());
507 }
508 }
509 if skipped < skip {
510 skipped += 1;
511 continue;
512 }
513 if out.len() >= limit {
514 break;
515 }
516 out.push(commit);
517 }
518 out.sort_by(|a, b| {
520 b.height
521 .cmp(&a.height)
522 .then(b.timestamp_ms.cmp(&a.timestamp_ms))
523 });
524 out
525}
526
527fn resolve_ref_chain(store: &UnifiedStore, name: &str) -> Option<CommitHash> {
532 let mut current = name.to_string();
534 for _ in 0..4 {
535 let r = load_ref(store, ¤t)?;
536 match r.kind {
537 RefKind::Branch | RefKind::Tag => return Some(r.target),
538 RefKind::Head => {
539 current = r.target;
540 continue;
541 }
542 }
543 }
544 None
545}
546
547fn resolve_short_commit(store: &UnifiedStore, prefix: &str) -> Option<CommitHash> {
548 if prefix.len() < 4 {
549 return None;
550 }
551 let manager = store.get_collection(vc::COMMITS)?;
552 let matches: Vec<String> = manager
553 .query_all(|entity| {
554 entity.data.as_row().is_some_and(|row| {
555 row_text(row, "id")
556 .map(|id| id.starts_with(prefix))
557 .unwrap_or(false)
558 })
559 })
560 .into_iter()
561 .filter_map(|entity| row_text(entity.data.as_row()?, "id"))
562 .collect();
563 if matches.len() == 1 {
564 matches.into_iter().next()
565 } else {
566 None
567 }
568}
569
570fn upsert_workset(
575 store: &UnifiedStore,
576 connection_id: u64,
577 branch: &str,
578 base_commit: Option<&str>,
579 working_xid: Xid,
580) -> RedDBResult<()> {
581 let conn_id = connection_id.to_string();
582 if let Some(manager) = store.get_collection(vc::WORKSETS) {
584 let rows = manager.query_all(|entity| {
585 entity
586 .data
587 .as_row()
588 .is_some_and(|row| row_text(row, "id").as_deref() == Some(&conn_id))
589 });
590 for row in rows {
591 let _ = store.delete(vc::WORKSETS, row.id);
592 }
593 }
594 let mut fields: HashMap<String, Value> = HashMap::new();
595 fields.insert("id".to_string(), Value::text(conn_id.as_str()));
596 fields.insert("branch".to_string(), Value::text(branch));
597 if let Some(base) = base_commit {
598 fields.insert("base_commit".to_string(), Value::text(base));
599 }
600 fields.insert(
601 "working_xid".to_string(),
602 Value::UnsignedInteger(working_xid),
603 );
604 insert_meta_row(store, vc::WORKSETS, fields)?;
605 Ok(())
606}
607
608fn load_workset(store: &UnifiedStore, connection_id: u64) -> Option<(RefName, Option<CommitHash>)> {
609 let manager = store.get_collection(vc::WORKSETS)?;
610 let conn_id = connection_id.to_string();
611 manager
612 .query_all(|entity| {
613 entity
614 .data
615 .as_row()
616 .is_some_and(|row| row_text(row, "id").as_deref() == Some(&conn_id))
617 })
618 .into_iter()
619 .find_map(|entity| {
620 let row = entity.data.as_row()?;
621 Some((
622 row_text(row, "branch").unwrap_or_else(|| vc::DEFAULT_BRANCH_REF.to_string()),
623 row_text(row, "base_commit"),
624 ))
625 })
626}
627
628impl RedDBRuntime {
633 pub fn vcs_commit(&self, input: CreateCommitInput) -> RedDBResult<Commit> {
634 let store_arc = self.inner.db.store();
635 let store: &UnifiedStore = &store_arc;
636
637 let workset = load_workset(store, input.connection_id);
640 let branch_ref = workset
641 .as_ref()
642 .map(|(b, _)| b.clone())
643 .or_else(|| resolve_ref_chain(store, &head_ref_id(input.connection_id)).and(None))
644 .unwrap_or_else(|| vc::DEFAULT_BRANCH_REF.to_string());
645
646 let parent_hash = workset
647 .as_ref()
648 .and_then(|(_, base)| base.clone())
649 .or_else(|| load_ref(store, &branch_ref).map(|r| r.target));
650
651 let parents: Vec<CommitHash> = match parent_hash.clone() {
652 Some(h) if !h.is_empty() => vec![h],
653 _ => Vec::new(),
654 };
655
656 let parent_height = parents
657 .iter()
658 .filter_map(|p| load_commit(store, p).map(|c| c.height))
659 .max()
660 .unwrap_or(0);
661 let height = if parents.is_empty() {
662 0
663 } else {
664 parent_height + 1
665 };
666
667 let root_xid = self.inner.snapshot_manager.begin();
673 self.inner.snapshot_manager.commit(root_xid);
674 let timestamp_ms = now_ms();
675 let committer = input.committer.unwrap_or_else(|| input.author.clone());
676
677 let hash = compute_commit_hash(
678 root_xid,
679 &parents,
680 &input.author,
681 &input.message,
682 timestamp_ms,
683 );
684
685 if load_commit(store, &hash).is_some() {
686 return Err(RedDBError::InvalidConfig(format!(
687 "commit {hash} already exists (duplicate timestamp+content)"
688 )));
689 }
690
691 let commit = Commit {
692 hash: hash.clone(),
693 root_xid,
694 parents,
695 height,
696 author: input.author,
697 committer,
698 message: input.message,
699 timestamp_ms,
700 signature: None,
701 };
702
703 save_commit(store, &commit)?;
704
705 if root_xid != XID_NONE {
708 self.inner.snapshot_manager.pin(root_xid);
709 }
710
711 let branch_name = if branch_ref.is_empty() {
713 vc::DEFAULT_BRANCH_REF.to_string()
714 } else {
715 branch_ref
716 };
717 save_ref(
718 store,
719 &Ref {
720 name: branch_name.clone(),
721 kind: RefKind::Branch,
722 target: hash.clone(),
723 protected: false,
724 },
725 )?;
726
727 upsert_workset(
728 store,
729 input.connection_id,
730 &branch_name,
731 Some(&hash),
732 root_xid,
733 )?;
734
735 Ok(commit)
736 }
737
738 pub fn vcs_branch_create(&self, input: CreateBranchInput) -> RedDBResult<Ref> {
739 let store_arc = self.inner.db.store();
740 let store: &UnifiedStore = &store_arc;
741 let full_name = normalize_branch_name(&input.name);
742 if load_ref(store, &full_name).is_some() {
743 return Err(RedDBError::InvalidConfig(format!(
744 "branch `{full_name}` already exists"
745 )));
746 }
747 let target_hash = match &input.from {
748 Some(spec) => RedDBRuntime::vcs_resolve_commitish(self, spec)?,
749 None => {
750 let workset = load_workset(store, input.connection_id);
752
753 workset
754 .and_then(|(_, base)| base)
755 .or_else(|| load_ref(store, vc::DEFAULT_BRANCH_REF).map(|r| r.target))
756 .unwrap_or_default()
757 }
758 };
759 let r = Ref {
760 name: full_name,
761 kind: RefKind::Branch,
762 target: target_hash,
763 protected: false,
764 };
765 save_ref(store, &r)?;
766 Ok(r)
767 }
768
769 pub fn vcs_branch_delete(&self, name: &str) -> RedDBResult<()> {
770 let store_arc = self.inner.db.store();
771 let store: &UnifiedStore = &store_arc;
772 let full = normalize_branch_name(name);
773 let existing = load_ref(store, &full)
774 .ok_or_else(|| RedDBError::NotFound(format!("branch `{full}` does not exist")))?;
775 if existing.protected {
776 return Err(RedDBError::ReadOnly(format!(
777 "branch `{full}` is protected"
778 )));
779 }
780 delete_ref(store, &full)?;
781 Ok(())
782 }
783
784 pub fn vcs_tag_create(&self, input: CreateTagInput) -> RedDBResult<Ref> {
785 let store_arc = self.inner.db.store();
786 let store: &UnifiedStore = &store_arc;
787 let full_name = normalize_tag_name(&input.name);
788 if load_ref(store, &full_name).is_some() {
789 return Err(RedDBError::InvalidConfig(format!(
790 "tag `{full_name}` already exists"
791 )));
792 }
793 let target_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.target)?;
794 let r = Ref {
795 name: full_name,
796 kind: RefKind::Tag,
797 target: target_hash,
798 protected: false,
799 };
800 save_ref(store, &r)?;
801 Ok(r)
802 }
803
804 pub fn vcs_list_refs(&self, prefix: Option<&str>) -> RedDBResult<Vec<Ref>> {
805 let store_arc = self.inner.db.store();
806 let store: &UnifiedStore = &store_arc;
807 Ok(list_refs_by_prefix(store, prefix))
808 }
809
810 pub fn vcs_checkout(&self, input: CheckoutInput) -> RedDBResult<Ref> {
811 let store_arc = self.inner.db.store();
812 let store: &UnifiedStore = &store_arc;
813 let (branch_ref, target_hash) = match &input.target {
814 CheckoutTarget::Branch(name) => {
815 let full = normalize_branch_name(name);
816 let r = load_ref(store, &full).ok_or_else(|| {
817 RedDBError::NotFound(format!("branch `{full}` does not exist"))
818 })?;
819 (full, r.target)
820 }
821 CheckoutTarget::Tag(name) => {
822 let full = normalize_tag_name(name);
823 let r = load_ref(store, &full)
824 .ok_or_else(|| RedDBError::NotFound(format!("tag `{full}` does not exist")))?;
825 (full, r.target)
826 }
827 CheckoutTarget::Commit(hash) => {
828 let resolved = RedDBRuntime::vcs_resolve_commitish(self, hash)?;
829 (String::new(), resolved)
832 }
833 };
834
835 upsert_workset(
836 store,
837 input.connection_id,
838 &branch_ref,
839 Some(&target_hash),
840 self.inner.snapshot_manager.peek_next_xid(),
841 )?;
842
843 save_ref(
845 store,
846 &Ref {
847 name: head_ref_id(input.connection_id),
848 kind: RefKind::Head,
849 target: if branch_ref.is_empty() {
850 target_hash.clone()
851 } else {
852 branch_ref.clone()
853 },
854 protected: false,
855 },
856 )?;
857
858 Ok(Ref {
859 name: if branch_ref.is_empty() {
860 format!("detached:{target_hash}")
861 } else {
862 branch_ref
863 },
864 kind: if target_hash.is_empty() {
865 RefKind::Head
866 } else {
867 RefKind::Branch
868 },
869 target: target_hash,
870 protected: false,
871 })
872 }
873
874 pub fn vcs_merge(&self, input: MergeInput) -> RedDBResult<MergeOutcome> {
875 let store_arc = self.inner.db.store();
876 let store: &UnifiedStore = &store_arc;
877
878 let from_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.from)?;
880 let from_commit = load_commit(store, &from_hash).ok_or_else(|| {
881 RedDBError::NotFound(format!("source commit `{from_hash}` not found"))
882 })?;
883
884 let workset = load_workset(store, input.connection_id);
886 let (head_branch, head_hash) = match workset {
887 Some((branch, Some(head))) => (branch, head),
888 Some((branch, None)) => {
889 let head = load_ref(store, &branch)
890 .map(|r| r.target)
891 .unwrap_or_default();
892 (branch, head)
893 }
894 None => {
895 let head = load_ref(store, vc::DEFAULT_BRANCH_REF)
896 .map(|r| r.target)
897 .unwrap_or_default();
898 (vc::DEFAULT_BRANCH_REF.to_string(), head)
899 }
900 };
901 if head_hash.is_empty() {
902 return Err(RedDBError::InvalidConfig(
903 "cannot merge: HEAD has no commits".to_string(),
904 ));
905 }
906
907 let from_ancestors = ancestor_set(store, &from_hash, 100_000);
909 let can_fast_forward = from_ancestors.contains(&head_hash);
910
911 match input.opts.strategy {
912 MergeStrategy::FastForwardOnly if !can_fast_forward => {
913 return Err(RedDBError::InvalidConfig(
914 "not a fast-forward — use --strategy auto or no-ff".to_string(),
915 ));
916 }
917 _ => {}
918 }
919
920 if can_fast_forward && input.opts.strategy != MergeStrategy::NoFastForward {
921 if head_branch.is_empty() {
922 return Err(RedDBError::InvalidConfig(
923 "cannot fast-forward a detached HEAD".to_string(),
924 ));
925 }
926 save_ref(
927 store,
928 &Ref {
929 name: head_branch.clone(),
930 kind: RefKind::Branch,
931 target: from_hash.clone(),
932 protected: false,
933 },
934 )?;
935 upsert_workset(
936 store,
937 input.connection_id,
938 &head_branch,
939 Some(&from_hash),
940 from_commit.root_xid,
941 )?;
942 return Ok(MergeOutcome {
943 merge_commit: Some(from_commit),
944 fast_forward: true,
945 conflicts: Vec::new(),
946 merge_state_id: None,
947 });
948 }
949
950 let lca = RedDBRuntime::vcs_lca(self, &head_hash, &from_hash)?;
958
959 let message = input
960 .opts
961 .message
962 .unwrap_or_else(|| format!("Merge {} into {}", input.from, head_branch));
963 let author = input.author.clone();
964 let timestamp_ms = now_ms();
965 let parents = vec![head_hash.clone(), from_hash.clone()];
966 let parent_height = parents
967 .iter()
968 .filter_map(|p| load_commit(store, p).map(|c| c.height))
969 .max()
970 .unwrap_or(0);
971 let height = parent_height + 1;
972 let root_xid = self.inner.snapshot_manager.begin();
973 self.inner.snapshot_manager.commit(root_xid);
974
975 let hash = compute_commit_hash(root_xid, &parents, &author, &message, timestamp_ms);
976
977 let merge_commit = Commit {
978 hash: hash.clone(),
979 root_xid,
980 parents,
981 height,
982 author: author.clone(),
983 committer: author,
984 message,
985 timestamp_ms,
986 signature: None,
987 };
988
989 save_commit(store, &merge_commit)?;
994 if root_xid != XID_NONE {
995 self.inner.snapshot_manager.pin(root_xid);
996 }
997 if !head_branch.is_empty() {
998 save_ref(
999 store,
1000 &Ref {
1001 name: head_branch.clone(),
1002 kind: RefKind::Branch,
1003 target: hash.clone(),
1004 protected: false,
1005 },
1006 )?;
1007 }
1008 upsert_workset(
1009 store,
1010 input.connection_id,
1011 &head_branch,
1012 Some(&hash),
1013 root_xid,
1014 )?;
1015
1016 let merge_state_id = format!("ms:{}", &hash[..16]);
1017
1018 let conflicts = if let Some(lca_hash) = &lca {
1023 materialize_merge_conflicts(
1024 self,
1025 store,
1026 lca_hash,
1027 &head_hash,
1028 &from_hash,
1029 &merge_state_id,
1030 )?
1031 } else {
1032 Vec::new()
1033 };
1034
1035 let mut ms_fields: HashMap<String, Value> = HashMap::new();
1036 ms_fields.insert("id".to_string(), Value::text(merge_state_id.as_str()));
1037 ms_fields.insert("kind".to_string(), Value::text("merge"));
1038 ms_fields.insert("branch".to_string(), Value::text(head_branch.as_str()));
1039 if let Some(base_hash) = &lca {
1040 ms_fields.insert("base".to_string(), Value::text(base_hash.as_str()));
1041 }
1042 ms_fields.insert("ours".to_string(), Value::text(head_hash.as_str()));
1043 ms_fields.insert("theirs".to_string(), Value::text(from_hash.as_str()));
1044 ms_fields.insert(
1045 "conflicts_count".to_string(),
1046 Value::UnsignedInteger(conflicts.len() as u64),
1047 );
1048 insert_meta_row(store, vc::MERGE_STATE, ms_fields)?;
1049
1050 Ok(MergeOutcome {
1051 merge_commit: Some(merge_commit),
1052 fast_forward: false,
1053 conflicts,
1054 merge_state_id: Some(merge_state_id),
1055 })
1056 }
1057
1058 pub fn vcs_cherry_pick(
1059 &self,
1060 connection_id: u64,
1061 commit: &str,
1062 author: Author,
1063 ) -> RedDBResult<MergeOutcome> {
1064 let store_arc = self.inner.db.store();
1065 let store: &UnifiedStore = &store_arc;
1066
1067 let src_hash = RedDBRuntime::vcs_resolve_commitish(self, commit)?;
1069 let src_commit = load_commit(store, &src_hash).ok_or_else(|| {
1070 RedDBError::NotFound(format!("cherry-pick source `{src_hash}` not found"))
1071 })?;
1072 if src_commit.parents.is_empty() {
1073 return Err(RedDBError::InvalidConfig(
1074 "cannot cherry-pick a root commit".to_string(),
1075 ));
1076 }
1077 if src_commit.parents.len() > 1 {
1078 return Err(RedDBError::InvalidConfig(
1079 "cannot cherry-pick a merge commit; resolve manually".to_string(),
1080 ));
1081 }
1082 let parent_hash = src_commit.parents[0].clone();
1083
1084 let workset = load_workset(store, connection_id);
1086 let (head_branch, head_hash) = match workset {
1087 Some((branch, Some(head))) => (branch, head),
1088 Some((branch, None)) => {
1089 let head = load_ref(store, &branch)
1090 .map(|r| r.target)
1091 .unwrap_or_default();
1092 (branch, head)
1093 }
1094 None => {
1095 let head = load_ref(store, vc::DEFAULT_BRANCH_REF)
1096 .map(|r| r.target)
1097 .unwrap_or_default();
1098 (vc::DEFAULT_BRANCH_REF.to_string(), head)
1099 }
1100 };
1101 if head_hash.is_empty() {
1102 return Err(RedDBError::InvalidConfig(
1103 "cannot cherry-pick onto empty HEAD".to_string(),
1104 ));
1105 }
1106
1107 let message = format!("cherry-pick: {}", src_commit.message);
1112 let parents = vec![head_hash.clone()];
1113 let parent_height = load_commit(store, &head_hash)
1114 .map(|c| c.height)
1115 .unwrap_or(0);
1116 let height = parent_height + 1;
1117 let root_xid = self.inner.snapshot_manager.begin();
1118 self.inner.snapshot_manager.commit(root_xid);
1119 let timestamp_ms = now_ms();
1120
1121 let hash = compute_commit_hash(root_xid, &parents, &author, &message, timestamp_ms);
1122 let pick_commit = Commit {
1123 hash: hash.clone(),
1124 root_xid,
1125 parents,
1126 height,
1127 author: author.clone(),
1128 committer: author,
1129 message,
1130 timestamp_ms,
1131 signature: None,
1132 };
1133 save_commit(store, &pick_commit)?;
1134 if root_xid != XID_NONE {
1135 self.inner.snapshot_manager.pin(root_xid);
1136 }
1137 if !head_branch.is_empty() {
1138 save_ref(
1139 store,
1140 &Ref {
1141 name: head_branch.clone(),
1142 kind: RefKind::Branch,
1143 target: hash.clone(),
1144 protected: false,
1145 },
1146 )?;
1147 }
1148 upsert_workset(store, connection_id, &head_branch, Some(&hash), root_xid)?;
1149
1150 let merge_state_id = format!("cp:{}", &hash[..16]);
1151 let conflicts = materialize_merge_conflicts(
1152 self,
1153 store,
1154 &parent_hash,
1155 &head_hash,
1156 &src_hash,
1157 &merge_state_id,
1158 )?;
1159
1160 let mut ms_fields: HashMap<String, Value> = HashMap::new();
1161 ms_fields.insert("id".to_string(), Value::text(merge_state_id.as_str()));
1162 ms_fields.insert("kind".to_string(), Value::text("cherry_pick"));
1163 ms_fields.insert("branch".to_string(), Value::text(head_branch.as_str()));
1164 ms_fields.insert("base".to_string(), Value::text(parent_hash.as_str()));
1165 ms_fields.insert("ours".to_string(), Value::text(head_hash.as_str()));
1166 ms_fields.insert("theirs".to_string(), Value::text(src_hash.as_str()));
1167 ms_fields.insert(
1168 "conflicts_count".to_string(),
1169 Value::UnsignedInteger(conflicts.len() as u64),
1170 );
1171 insert_meta_row(store, vc::MERGE_STATE, ms_fields)?;
1172
1173 Ok(MergeOutcome {
1174 merge_commit: Some(pick_commit),
1175 fast_forward: false,
1176 conflicts,
1177 merge_state_id: Some(merge_state_id),
1178 })
1179 }
1180
1181 pub fn vcs_revert(
1182 &self,
1183 connection_id: u64,
1184 commit: &str,
1185 author: Author,
1186 ) -> RedDBResult<Commit> {
1187 let store_arc = self.inner.db.store();
1188 let store: &UnifiedStore = &store_arc;
1189
1190 let src_hash = RedDBRuntime::vcs_resolve_commitish(self, commit)?;
1191 let src_commit = load_commit(store, &src_hash)
1192 .ok_or_else(|| RedDBError::NotFound(format!("revert source `{src_hash}` not found")))?;
1193 if src_commit.parents.is_empty() {
1194 return Err(RedDBError::InvalidConfig(
1195 "cannot revert a root commit".to_string(),
1196 ));
1197 }
1198 let parent_hash = src_commit.parents[0].clone();
1199
1200 let workset = load_workset(store, connection_id);
1201 let (head_branch, head_hash) = match workset {
1202 Some((branch, Some(head))) => (branch, head),
1203 Some((branch, None)) => {
1204 let head = load_ref(store, &branch)
1205 .map(|r| r.target)
1206 .unwrap_or_default();
1207 (branch, head)
1208 }
1209 None => {
1210 let head = load_ref(store, vc::DEFAULT_BRANCH_REF)
1211 .map(|r| r.target)
1212 .unwrap_or_default();
1213 (vc::DEFAULT_BRANCH_REF.to_string(), head)
1214 }
1215 };
1216 if head_hash.is_empty() {
1217 return Err(RedDBError::InvalidConfig(
1218 "cannot revert onto empty HEAD".to_string(),
1219 ));
1220 }
1221
1222 let message = format!("Revert \"{}\"", src_commit.message);
1227 let parents = vec![head_hash.clone()];
1228 let parent_height = load_commit(store, &head_hash)
1229 .map(|c| c.height)
1230 .unwrap_or(0);
1231 let height = parent_height + 1;
1232 let root_xid = self.inner.snapshot_manager.begin();
1233 self.inner.snapshot_manager.commit(root_xid);
1234 let timestamp_ms = now_ms();
1235
1236 let hash = compute_commit_hash(root_xid, &parents, &author, &message, timestamp_ms);
1237 let rv_commit = Commit {
1238 hash: hash.clone(),
1239 root_xid,
1240 parents,
1241 height,
1242 author: author.clone(),
1243 committer: author,
1244 message,
1245 timestamp_ms,
1246 signature: None,
1247 };
1248 save_commit(store, &rv_commit)?;
1249 if root_xid != XID_NONE {
1250 self.inner.snapshot_manager.pin(root_xid);
1251 }
1252 if !head_branch.is_empty() {
1253 save_ref(
1254 store,
1255 &Ref {
1256 name: head_branch.clone(),
1257 kind: RefKind::Branch,
1258 target: hash.clone(),
1259 protected: false,
1260 },
1261 )?;
1262 }
1263 upsert_workset(store, connection_id, &head_branch, Some(&hash), root_xid)?;
1264
1265 let merge_state_id = format!("rv:{}", &hash[..16]);
1266 let mut ms_fields: HashMap<String, Value> = HashMap::new();
1270 ms_fields.insert("id".to_string(), Value::text(merge_state_id.as_str()));
1271 ms_fields.insert("kind".to_string(), Value::text("revert"));
1272 ms_fields.insert("branch".to_string(), Value::text(head_branch.as_str()));
1273 ms_fields.insert("base".to_string(), Value::text(src_hash.as_str()));
1274 ms_fields.insert("ours".to_string(), Value::text(head_hash.as_str()));
1275 ms_fields.insert("theirs".to_string(), Value::text(parent_hash.as_str()));
1276 ms_fields.insert("conflicts_count".to_string(), Value::UnsignedInteger(0));
1277 insert_meta_row(store, vc::MERGE_STATE, ms_fields)?;
1278
1279 Ok(rv_commit)
1280 }
1281
1282 pub fn vcs_reset(&self, input: ResetInput) -> RedDBResult<()> {
1283 let store_arc = self.inner.db.store();
1284 let store: &UnifiedStore = &store_arc;
1285 let target_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.target)?;
1286 let target_commit = load_commit(store, &target_hash).ok_or_else(|| {
1287 RedDBError::NotFound(format!("target commit `{target_hash}` not found"))
1288 })?;
1289
1290 let workset = load_workset(store, input.connection_id);
1292 let branch = workset
1293 .as_ref()
1294 .map(|(b, _)| b.clone())
1295 .unwrap_or_else(|| vc::DEFAULT_BRANCH_REF.to_string());
1296
1297 match input.mode {
1301 ResetMode::Soft | ResetMode::Mixed => {
1302 if !branch.is_empty() {
1303 save_ref(
1304 store,
1305 &Ref {
1306 name: branch.clone(),
1307 kind: RefKind::Branch,
1308 target: target_hash.clone(),
1309 protected: false,
1310 },
1311 )?;
1312 }
1313 upsert_workset(
1314 store,
1315 input.connection_id,
1316 &branch,
1317 Some(&target_hash),
1318 target_commit.root_xid,
1319 )?;
1320 Ok(())
1321 }
1322 ResetMode::Hard => Err(unimplemented("reset --hard (Phase 4)")),
1323 }
1324 }
1325
1326 pub fn vcs_log(&self, input: LogInput) -> RedDBResult<Vec<Commit>> {
1327 let store_arc = self.inner.db.store();
1328 let store: &UnifiedStore = &store_arc;
1329 let start = match &input.range.to {
1330 Some(spec) => RedDBRuntime::vcs_resolve_commitish(self, spec)?,
1331 None => {
1332 let workset = load_workset(store, input.connection_id);
1333 workset
1334 .and_then(|(_, base)| base)
1335 .or_else(|| load_ref(store, vc::DEFAULT_BRANCH_REF).map(|r| r.target))
1336 .unwrap_or_default()
1337 }
1338 };
1339 if start.is_empty() {
1340 return Ok(Vec::new());
1341 }
1342 Ok(topo_walk(store, &start, &input.range))
1343 }
1344
1345 pub fn vcs_diff(&self, input: DiffInput) -> RedDBResult<Diff> {
1346 let store_arc = self.inner.db.store();
1347 let store: &UnifiedStore = &store_arc;
1348 let from_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.from)?;
1349 let to_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.to)?;
1350 let from_xid = RedDBRuntime::vcs_resolve_as_of(self, AsOfSpec::Commit(from_hash.clone()))?;
1351 let to_xid = RedDBRuntime::vcs_resolve_as_of(self, AsOfSpec::Commit(to_hash.clone()))?;
1352
1353 let sm = &self.inner.snapshot_manager;
1354 let from_snap = sm.snapshot(from_xid);
1355 let to_snap = sm.snapshot(to_xid);
1356
1357 let mut entries: Vec<DiffEntry> = Vec::new();
1359 let mut added = 0usize;
1360 let mut removed = 0usize;
1361 let mut modified = 0usize;
1362 let collections = store.list_collections();
1363 for coll in collections {
1364 if coll.starts_with("red_") {
1365 continue;
1366 }
1367 if !is_versioned(store, &coll) {
1368 continue;
1369 }
1370 if let Some(filter) = &input.collection {
1371 if filter != &coll {
1372 continue;
1373 }
1374 }
1375 let Some(manager) = store.get_collection(&coll) else {
1376 continue;
1377 };
1378 let entities = manager.query_all(|_| true);
1379 for entity in &entities {
1381 let xmin = entity.xmin;
1382 let xmax = entity.xmax;
1383 let in_from = from_snap.sees(xmin, xmax) && !sm.is_aborted(xmin);
1384 let in_to = to_snap.sees(xmin, xmax) && !sm.is_aborted(xmin);
1385 if in_from == in_to {
1386 continue;
1387 }
1388 let entity_id = entity.id.raw().to_string();
1389 let payload = if input.summary_only {
1390 JsonValue::Null
1391 } else {
1392 JsonValue::String(format!("entity#{} xmin={} xmax={}", entity_id, xmin, xmax))
1393 };
1394 let change = match (in_from, in_to) {
1395 (false, true) => {
1396 added += 1;
1397 DiffChange::Added { after: payload }
1398 }
1399 (true, false) => {
1400 removed += 1;
1401 DiffChange::Removed { before: payload }
1402 }
1403 _ => unreachable!(),
1404 };
1405 entries.push(DiffEntry {
1406 collection: coll.clone(),
1407 entity_id,
1408 change,
1409 });
1410 }
1411 }
1412
1413 entries = coalesce_modifications(entries, &mut added, &mut removed, &mut modified);
1418
1419 Ok(Diff {
1420 from: from_hash,
1421 to: to_hash,
1422 entries,
1423 added,
1424 removed,
1425 modified,
1426 })
1427 }
1428
1429 pub fn vcs_status(&self, input: StatusInput) -> RedDBResult<Status> {
1430 let store_arc = self.inner.db.store();
1431 let store: &UnifiedStore = &store_arc;
1432 let workset = load_workset(store, input.connection_id);
1433 let (head_ref, head_commit) = match workset {
1434 Some((branch, base)) => {
1435 let base = base.or_else(|| load_ref(store, &branch).map(|r| r.target));
1436 (Some(branch), base)
1437 }
1438 None => {
1439 let base = load_ref(store, vc::DEFAULT_BRANCH_REF).map(|r| r.target);
1440 (Some(vc::DEFAULT_BRANCH_REF.to_string()), base)
1441 }
1442 };
1443 let detached = matches!(&head_ref, Some(s) if s.is_empty());
1444 Ok(Status {
1445 connection_id: input.connection_id,
1446 head_ref: head_ref.filter(|s| !s.is_empty()),
1447 head_commit,
1448 detached,
1449 staged_changes: 0,
1450 working_changes: 0,
1451 unresolved_conflicts: 0,
1452 merge_state_id: None,
1453 })
1454 }
1455
1456 pub fn vcs_lca(&self, a: &str, b: &str) -> RedDBResult<Option<CommitHash>> {
1457 let store_arc = self.inner.db.store();
1458 let store: &UnifiedStore = &store_arc;
1459 let a_hash = RedDBRuntime::vcs_resolve_commitish(self, a)?;
1460 let b_hash = RedDBRuntime::vcs_resolve_commitish(self, b)?;
1461 let a_ancestors = ancestor_set(store, &a_hash, 100_000);
1462
1463 let mut visited: HashSet<CommitHash> = HashSet::new();
1466 let mut stack: Vec<CommitHash> = vec![b_hash];
1467 let mut best: Option<(u64, CommitHash)> = None;
1468 while let Some(hash) = stack.pop() {
1469 if !visited.insert(hash.clone()) {
1470 continue;
1471 }
1472 if a_ancestors.contains(&hash) {
1473 let height = load_commit(store, &hash).map(|c| c.height).unwrap_or(0);
1474 match &best {
1475 Some((h, _)) if *h >= height => {}
1476 _ => best = Some((height, hash.clone())),
1477 }
1478 continue;
1481 }
1482 if let Some(commit) = load_commit(store, &hash) {
1483 for p in commit.parents {
1484 if !visited.contains(&p) {
1485 stack.push(p);
1486 }
1487 }
1488 }
1489 }
1490 Ok(best.map(|(_, h)| h))
1491 }
1492
1493 pub fn vcs_conflicts_list(&self, merge_state_id: &str) -> RedDBResult<Vec<Conflict>> {
1494 let store_arc = self.inner.db.store();
1495 let store: &UnifiedStore = &store_arc;
1496 let Some(manager) = store.get_collection(vc::CONFLICTS) else {
1497 return Ok(Vec::new());
1498 };
1499 let msid = merge_state_id.to_string();
1500 let out = manager
1501 .query_all(|entity| {
1502 entity
1503 .data
1504 .as_row()
1505 .is_some_and(|row| row_text(row, "merge_state_id").as_deref() == Some(&msid))
1506 })
1507 .into_iter()
1508 .filter_map(|entity| {
1509 let row = entity.data.as_row()?;
1510 Some(Conflict {
1511 id: row_text(row, "id")?,
1512 collection: row_text(row, "collection")?,
1513 entity_id: row_text(row, "entity_id").unwrap_or_default(),
1514 base: row_json(row, "base_json"),
1515 ours: row_json(row, "ours_json"),
1516 theirs: row_json(row, "theirs_json"),
1517 conflicting_paths: row_string_list(row, "conflicting_paths"),
1518 merge_state_id: row_text(row, "merge_state_id").unwrap_or_default(),
1519 })
1520 })
1521 .collect();
1522 Ok(out)
1523 }
1524
1525 pub fn vcs_conflict_resolve(&self, conflict_id: &str, _resolved: JsonValue) -> RedDBResult<()> {
1526 let store_arc = self.inner.db.store();
1527 let store: &UnifiedStore = &store_arc;
1528 let Some(manager) = store.get_collection(vc::CONFLICTS) else {
1529 return Err(RedDBError::NotFound(format!(
1530 "conflict `{conflict_id}` not found"
1531 )));
1532 };
1533 let cid = conflict_id.to_string();
1534 let mut deleted = 0usize;
1535 let matches = manager.query_all(|entity| {
1536 entity
1537 .data
1538 .as_row()
1539 .is_some_and(|row| row_text(row, "id").as_deref() == Some(&cid))
1540 });
1541 for entity in matches {
1542 store
1543 .delete(vc::CONFLICTS, entity.id)
1544 .map_err(|e| RedDBError::Internal(e.to_string()))?;
1545 deleted += 1;
1546 }
1547 if deleted == 0 {
1551 return Err(RedDBError::NotFound(format!(
1552 "conflict `{conflict_id}` not found"
1553 )));
1554 }
1555 Ok(())
1556 }
1557
1558 pub fn vcs_resolve_as_of(&self, spec: AsOfSpec) -> RedDBResult<Xid> {
1559 let store_arc = self.inner.db.store();
1560 let store: &UnifiedStore = &store_arc;
1561 match spec {
1562 AsOfSpec::Snapshot(x) => Ok(x),
1563 AsOfSpec::Commit(h) => {
1564 let c = load_commit(store, &h)
1565 .ok_or_else(|| RedDBError::NotFound(format!("commit `{h}` not found")))?;
1566 Ok(c.root_xid)
1567 }
1568 AsOfSpec::Branch(name) => {
1569 let full = normalize_branch_name(&name);
1570 let r = load_ref(store, &full).ok_or_else(|| {
1571 RedDBError::NotFound(format!("branch `{full}` does not exist"))
1572 })?;
1573 let c = load_commit(store, &r.target).ok_or_else(|| {
1574 RedDBError::NotFound(format!("branch `{full}` points to missing commit"))
1575 })?;
1576 Ok(c.root_xid)
1577 }
1578 AsOfSpec::Tag(name) => {
1579 let full = normalize_tag_name(&name);
1580 let r = load_ref(store, &full)
1581 .ok_or_else(|| RedDBError::NotFound(format!("tag `{full}` does not exist")))?;
1582 let c = load_commit(store, &r.target).ok_or_else(|| {
1583 RedDBError::NotFound(format!("tag `{full}` points to missing commit"))
1584 })?;
1585 Ok(c.root_xid)
1586 }
1587 AsOfSpec::TimestampMs(ts) => {
1588 let manager = store
1589 .get_collection(vc::COMMITS)
1590 .ok_or_else(|| RedDBError::NotFound("no commits exist yet".to_string()))?;
1591 let mut best: Option<(i64, Xid)> = None;
1593 let entities = manager.query_all(|_| true);
1594 for entity in entities {
1595 if let Some(row) = entity.data.as_row() {
1596 let t = row_i64(row, "timestamp_ms").unwrap_or(0);
1597 if t <= ts {
1598 let xid = row_u64(row, "root_xid").unwrap_or(0);
1599 match &best {
1600 Some((bt, _)) if *bt >= t => {}
1601 _ => best = Some((t, xid)),
1602 }
1603 }
1604 }
1605 }
1606 best.map(|(_, x)| x)
1607 .ok_or_else(|| RedDBError::NotFound(format!("no commit at or before ts={ts}")))
1608 }
1609 }
1610 }
1611
1612 pub fn vcs_set_versioned(&self, collection: &str, enabled: bool) -> RedDBResult<()> {
1613 let store_arc = self.inner.db.store();
1614 let store: &UnifiedStore = &store_arc;
1615 set_versioned_flag(store, collection, enabled)
1616 }
1617
1618 pub fn vcs_list_versioned(&self) -> RedDBResult<Vec<String>> {
1619 let store_arc = self.inner.db.store();
1620 let store: &UnifiedStore = &store_arc;
1621 let mut list = versioned_collections(store);
1622 list.sort();
1623 Ok(list)
1624 }
1625
1626 pub fn vcs_is_versioned(&self, collection: &str) -> RedDBResult<bool> {
1627 let store_arc = self.inner.db.store();
1628 let store: &UnifiedStore = &store_arc;
1629 Ok(is_versioned(store, collection))
1630 }
1631
1632 pub fn vcs_resolve_commitish(&self, spec: &str) -> RedDBResult<CommitHash> {
1633 let store_arc = self.inner.db.store();
1634 let store: &UnifiedStore = &store_arc;
1635 if spec.is_empty() {
1636 return Err(RedDBError::InvalidConfig("empty commitish".to_string()));
1637 }
1638
1639 if spec.len() == 64
1641 && spec.chars().all(|c| c.is_ascii_hexdigit())
1642 && load_commit(store, spec).is_some()
1643 {
1644 return Ok(spec.to_string());
1645 }
1646
1647 if spec.starts_with("refs/") {
1649 if let Some(hash) = resolve_ref_chain(store, spec) {
1650 return Ok(hash);
1651 }
1652 }
1653
1654 let normalized_branch = normalize_branch_name(spec);
1656 if let Some(hash) = resolve_ref_chain(store, &normalized_branch) {
1657 return Ok(hash);
1658 }
1659 let normalized_tag = normalize_tag_name(spec);
1660 if let Some(hash) = resolve_ref_chain(store, &normalized_tag) {
1661 return Ok(hash);
1662 }
1663
1664 if let Some(hash) = resolve_short_commit(store, spec) {
1666 return Ok(hash);
1667 }
1668
1669 Err(RedDBError::NotFound(format!(
1670 "commitish `{spec}` did not resolve to any ref or commit"
1671 )))
1672 }
1673}
1674
1675fn materialize_merge_conflicts(
1684 rt: &RedDBRuntime,
1685 store: &UnifiedStore,
1686 base_hash: &str,
1687 ours_hash: &str,
1688 theirs_hash: &str,
1689 merge_state_id: &str,
1690) -> RedDBResult<Vec<Conflict>> {
1691 use crate::application::merge_json::three_way_merge;
1692 use crate::application::vcs::AsOfSpec;
1693
1694 let base_xid = rt.vcs_resolve_as_of(AsOfSpec::Commit(base_hash.to_string()))?;
1695 let ours_xid = rt.vcs_resolve_as_of(AsOfSpec::Commit(ours_hash.to_string()))?;
1696 let theirs_xid = rt.vcs_resolve_as_of(AsOfSpec::Commit(theirs_hash.to_string()))?;
1697
1698 let sm = &rt.inner.snapshot_manager;
1699 let base_snap = sm.snapshot(base_xid);
1700 let ours_snap = sm.snapshot(ours_xid);
1701 let theirs_snap = sm.snapshot(theirs_xid);
1702
1703 let mut conflicts: Vec<Conflict> = Vec::new();
1709 for coll in store.list_collections() {
1710 if coll.starts_with("red_") {
1711 continue;
1712 }
1713 if !is_versioned(store, &coll) {
1714 continue;
1715 }
1716 let Some(manager) = store.get_collection(&coll) else {
1717 continue;
1718 };
1719 let mut at_base: HashMap<u64, JsonValue> = HashMap::new();
1720 let mut at_ours: HashMap<u64, JsonValue> = HashMap::new();
1721 let mut at_theirs: HashMap<u64, JsonValue> = HashMap::new();
1722 for entity in manager.query_all(|_| true) {
1723 let xmin = entity.xmin;
1724 let xmax = entity.xmax;
1725 if sm.is_aborted(xmin) {
1726 continue;
1727 }
1728 let eid = entity.id.raw();
1729 let body = crate::presentation::entity_json::compact_entity_json(&entity);
1730 if base_snap.sees(xmin, xmax) {
1731 at_base.insert(eid, body.clone());
1732 }
1733 if ours_snap.sees(xmin, xmax) {
1734 at_ours.insert(eid, body.clone());
1735 }
1736 if theirs_snap.sees(xmin, xmax) {
1737 at_theirs.insert(eid, body);
1738 }
1739 }
1740
1741 let mut all_ids: std::collections::BTreeSet<u64> = std::collections::BTreeSet::new();
1742 all_ids.extend(at_base.keys().copied());
1743 all_ids.extend(at_ours.keys().copied());
1744 all_ids.extend(at_theirs.keys().copied());
1745
1746 for eid in all_ids {
1747 let b = at_base.get(&eid).cloned().unwrap_or(JsonValue::Null);
1748 let o = at_ours.get(&eid).cloned().unwrap_or(JsonValue::Null);
1749 let t = at_theirs.get(&eid).cloned().unwrap_or(JsonValue::Null);
1750 let ours_changed = b != o;
1751 let theirs_changed = b != t;
1752 if !(ours_changed && theirs_changed) {
1753 continue;
1754 }
1755 if o == t {
1756 continue;
1757 }
1758 let merge = three_way_merge(&b, &o, &t);
1759 if merge.is_clean() {
1760 continue;
1764 }
1765 let conflict_id = format!("{}:{}/{}", merge_state_id, coll, eid);
1766 let paths: Vec<String> = merge
1767 .conflicts
1768 .iter()
1769 .map(|c| {
1770 if c.path.is_empty() {
1771 "*".to_string()
1772 } else {
1773 c.path.clone()
1774 }
1775 })
1776 .collect();
1777
1778 let mut fields: HashMap<String, Value> = HashMap::new();
1779 fields.insert("id".to_string(), Value::text(conflict_id.as_str()));
1780 fields.insert("collection".to_string(), Value::text(coll.as_str()));
1781 fields.insert(
1782 "entity_id".to_string(),
1783 Value::text(eid.to_string().as_str()),
1784 );
1785 fields.insert("merge_state_id".to_string(), Value::text(merge_state_id));
1786 fields.insert(
1787 "conflicting_paths".to_string(),
1788 Value::Array(paths.iter().map(|p| Value::text(p.as_str())).collect()),
1789 );
1790 if let Ok(bytes) = crate::json::to_vec(&b) {
1794 fields.insert("base_json".to_string(), Value::Json(bytes));
1795 }
1796 if let Ok(bytes) = crate::json::to_vec(&o) {
1797 fields.insert("ours_json".to_string(), Value::Json(bytes));
1798 }
1799 if let Ok(bytes) = crate::json::to_vec(&t) {
1800 fields.insert("theirs_json".to_string(), Value::Json(bytes));
1801 }
1802 insert_meta_row(store, vc::CONFLICTS, fields)?;
1803 conflicts.push(Conflict {
1804 id: conflict_id,
1805 collection: coll.clone(),
1806 entity_id: eid.to_string(),
1807 base: b,
1808 ours: o,
1809 theirs: t,
1810 conflicting_paths: paths,
1811 merge_state_id: merge_state_id.to_string(),
1812 });
1813 }
1814 }
1815 Ok(conflicts)
1816}
1817
1818fn coalesce_modifications(
1822 entries: Vec<DiffEntry>,
1823 added: &mut usize,
1824 removed: &mut usize,
1825 modified: &mut usize,
1826) -> Vec<DiffEntry> {
1827 let mut by_key: HashMap<(String, String), Vec<DiffEntry>> = HashMap::new();
1828 for e in entries {
1829 by_key
1830 .entry((e.collection.clone(), e.entity_id.clone()))
1831 .or_default()
1832 .push(e);
1833 }
1834 let mut out: Vec<DiffEntry> = Vec::new();
1835 for ((coll, eid), group) in by_key {
1836 if group.len() >= 2 {
1837 let mut before = JsonValue::Null;
1838 let mut after = JsonValue::Null;
1839 for item in group {
1840 match item.change {
1841 DiffChange::Removed { before: b } => before = b,
1842 DiffChange::Added { after: a } => after = a,
1843 DiffChange::Modified { .. } => {}
1844 }
1845 }
1846 *added = added.saturating_sub(1);
1847 *removed = removed.saturating_sub(1);
1848 *modified += 1;
1849 out.push(DiffEntry {
1850 collection: coll,
1851 entity_id: eid,
1852 change: DiffChange::Modified { before, after },
1853 });
1854 } else {
1855 out.extend(group);
1856 }
1857 }
1858 out.sort_by(|a, b| {
1859 a.collection
1860 .cmp(&b.collection)
1861 .then(a.entity_id.cmp(&b.entity_id))
1862 });
1863 out
1864}
1865
// NOTE(review): never called in the visible code; it appears to exist only so
// the file-level `BTreeSet` import stays referenced — confirm before removing.
#[allow(dead_code)]
fn _touch_types(_: BTreeSet<()>) {}