Skip to main content

reddb_server/runtime/
impl_vcs.rs

1//! Runtime implementation of the VCS ("Git for Data") surface.
2//!
3//! Phase 2: real persistence for commit / branch / tag / refs / log /
4//! status / checkout / lca / resolve_commitish / resolve_as_of.
5//! Merge / cherry-pick / revert / reset / diff / conflict handling
6//! remain stubbed and land in Phase 3.
7//!
8//! Every VCS entity is stored as a plain TableRow in a `red_*`
9//! collection (same pattern as `red_queue_meta` / `red_stats`).
10//! Commits reuse the MVCC snapshot xid as their immutable root, and
11//! pin that xid so VACUUM cannot reclaim historical row versions.
12
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::sync::Arc;
15use std::time::{SystemTime, UNIX_EPOCH};
16
17use sha2::{Digest, Sha256};
18
19use crate::application::vcs::{
20    AsOfSpec, Author, CheckoutInput, CheckoutTarget, Commit, CommitHash, Conflict,
21    CreateBranchInput, CreateCommitInput, CreateTagInput, Diff, DiffChange, DiffEntry, DiffInput,
22    LogInput, LogRange, MergeInput, MergeOutcome, MergeStrategy, Ref, RefKind, RefName, ResetInput,
23    ResetMode, Status, StatusInput,
24};
25use crate::application::vcs_collections as vc;
26use crate::json::Value as JsonValue;
27use crate::runtime::RedDBRuntime;
28use crate::storage::schema::Value;
29use crate::storage::transaction::snapshot::{Xid, XID_NONE};
30use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
31use crate::storage::unified::UnifiedStore;
32use crate::{RedDBError, RedDBResult};
33
34// ---------------------------------------------------------------------------
35// Utilities
36// ---------------------------------------------------------------------------
37
38fn unimplemented(method: &str) -> RedDBError {
39    RedDBError::Internal(format!("vcs: {method} not yet implemented"))
40}
41
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` (instead of silently truncating) if the
/// millisecond count does not fit in an `i64`.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
48
49fn row_text(row: &RowData, field: &str) -> Option<String> {
50    match row.get_field(field)?.clone() {
51        Value::Text(v) => Some(v.to_string()),
52        _ => None,
53    }
54}
55
56fn row_u64(row: &RowData, field: &str) -> Option<u64> {
57    match row.get_field(field)?.clone() {
58        Value::UnsignedInteger(v) => Some(v),
59        Value::Integer(v) if v >= 0 => Some(v as u64),
60        _ => None,
61    }
62}
63
64fn row_i64(row: &RowData, field: &str) -> Option<i64> {
65    match row.get_field(field)?.clone() {
66        Value::Integer(v) => Some(v),
67        Value::UnsignedInteger(v) => Some(v as i64),
68        Value::TimestampMs(v) | Value::Timestamp(v) => Some(v),
69        _ => None,
70    }
71}
72
73fn row_json(row: &RowData, field: &str) -> JsonValue {
74    match row.get_field(field) {
75        Some(Value::Json(bytes)) => {
76            crate::json::from_slice::<JsonValue>(bytes).unwrap_or(JsonValue::Null)
77        }
78        Some(Value::Text(s)) => crate::json::from_str::<JsonValue>(s)
79            .unwrap_or_else(|_| JsonValue::String(s.to_string())),
80        _ => JsonValue::Null,
81    }
82}
83
84fn row_string_list(row: &RowData, field: &str) -> Vec<String> {
85    match row.get_field(field) {
86        Some(Value::Array(items)) => items
87            .iter()
88            .filter_map(|v| match v {
89                Value::Text(s) => Some(s.to_string()),
90                _ => None,
91            })
92            .collect(),
93        _ => Vec::new(),
94    }
95}
96
97fn insert_meta_row(
98    store: &UnifiedStore,
99    collection: &str,
100    fields: HashMap<String, Value>,
101) -> RedDBResult<EntityId> {
102    let _ = store.get_or_create_collection(collection);
103    store
104        .insert_auto(
105            collection,
106            UnifiedEntity::new(
107                EntityId::new(0),
108                EntityKind::TableRow {
109                    table: Arc::from(collection),
110                    row_id: 0,
111                },
112                EntityData::Row(RowData {
113                    columns: Vec::new(),
114                    named: Some(fields),
115                    schema: None,
116                }),
117            ),
118        )
119        .map_err(|e| RedDBError::Internal(e.to_string()))
120}
121
122fn compute_commit_hash(
123    root_xid: Xid,
124    parents: &[CommitHash],
125    author: &Author,
126    message: &str,
127    timestamp_ms: i64,
128) -> CommitHash {
129    let mut h = Sha256::new();
130    h.update(b"reddb-commit-v1\n");
131    h.update(root_xid.to_be_bytes());
132    let mut sorted = parents.to_vec();
133    sorted.sort();
134    for p in &sorted {
135        h.update(b"\np=");
136        h.update(p.as_bytes());
137    }
138    h.update(b"\na=");
139    h.update(author.name.as_bytes());
140    h.update(b"\n");
141    h.update(author.email.as_bytes());
142    h.update(b"\nm=");
143    h.update(message.as_bytes());
144    h.update(b"\nt=");
145    h.update(timestamp_ms.to_be_bytes());
146    let digest = h.finalize();
147    hex::encode(digest)
148}
149
150fn normalize_branch_name(raw: &str) -> String {
151    if raw.starts_with(vc::BRANCH_REF_PREFIX) {
152        raw.to_string()
153    } else {
154        format!("{}{}", vc::BRANCH_REF_PREFIX, raw)
155    }
156}
157
158fn normalize_tag_name(raw: &str) -> String {
159    if raw.starts_with(vc::TAG_REF_PREFIX) {
160        raw.to_string()
161    } else {
162        format!("{}{}", vc::TAG_REF_PREFIX, raw)
163    }
164}
165
166fn head_ref_id(connection_id: u64) -> String {
167    format!("{}{}", vc::HEAD_ID_PREFIX, connection_id)
168}
169
170// ---------------------------------------------------------------------------
171// Commit load / save
172// ---------------------------------------------------------------------------
173
174fn load_commit_entity(store: &UnifiedStore, hash: &str) -> Option<UnifiedEntity> {
175    let manager = store.get_collection(vc::COMMITS)?;
176    manager
177        .query_all(|entity| {
178            entity
179                .data
180                .as_row()
181                .is_some_and(|row| row_text(row, "id").as_deref() == Some(hash))
182        })
183        .into_iter()
184        .next()
185}
186
187fn commit_from_row(row: &RowData) -> Option<Commit> {
188    Some(Commit {
189        hash: row_text(row, "id")?,
190        root_xid: row_u64(row, "root_xid")?,
191        parents: row_string_list(row, "parents"),
192        height: row_u64(row, "height").unwrap_or(0),
193        author: Author {
194            name: row_text(row, "author_name").unwrap_or_default(),
195            email: row_text(row, "author_email").unwrap_or_default(),
196        },
197        committer: Author {
198            name: row_text(row, "committer_name").unwrap_or_default(),
199            email: row_text(row, "committer_email").unwrap_or_default(),
200        },
201        message: row_text(row, "message").unwrap_or_default(),
202        timestamp_ms: row_i64(row, "timestamp_ms").unwrap_or(0),
203        signature: row_text(row, "signature"),
204    })
205}
206
207fn load_commit(store: &UnifiedStore, hash: &str) -> Option<Commit> {
208    let entity = load_commit_entity(store, hash)?;
209    let row = entity.data.as_row()?;
210    commit_from_row(row)
211}
212
213fn save_commit(store: &UnifiedStore, commit: &Commit) -> RedDBResult<()> {
214    let mut fields: HashMap<String, Value> = HashMap::new();
215    fields.insert("id".to_string(), Value::text(commit.hash.as_str()));
216    fields.insert(
217        "root_xid".to_string(),
218        Value::UnsignedInteger(commit.root_xid),
219    );
220    fields.insert(
221        "parents".to_string(),
222        Value::Array(
223            commit
224                .parents
225                .iter()
226                .map(|p| Value::text(p.as_str()))
227                .collect(),
228        ),
229    );
230    fields.insert("height".to_string(), Value::UnsignedInteger(commit.height));
231    fields.insert(
232        "author_name".to_string(),
233        Value::text(commit.author.name.as_str()),
234    );
235    fields.insert(
236        "author_email".to_string(),
237        Value::text(commit.author.email.as_str()),
238    );
239    fields.insert(
240        "committer_name".to_string(),
241        Value::text(commit.committer.name.as_str()),
242    );
243    fields.insert(
244        "committer_email".to_string(),
245        Value::text(commit.committer.email.as_str()),
246    );
247    fields.insert("message".to_string(), Value::text(commit.message.as_str()));
248    fields.insert(
249        "timestamp_ms".to_string(),
250        Value::TimestampMs(commit.timestamp_ms),
251    );
252    if let Some(sig) = &commit.signature {
253        fields.insert("signature".to_string(), Value::text(sig.as_str()));
254    }
255    insert_meta_row(store, vc::COMMITS, fields)?;
256    Ok(())
257}
258
259// ---------------------------------------------------------------------------
260// Ref load / save / delete
261// ---------------------------------------------------------------------------
262
263fn ref_kind_from_str(s: &str) -> RefKind {
264    match s {
265        "tag" => RefKind::Tag,
266        "head" => RefKind::Head,
267        _ => RefKind::Branch,
268    }
269}
270
271fn ref_from_row(row: &RowData) -> Option<Ref> {
272    Some(Ref {
273        name: row_text(row, "id")?,
274        kind: ref_kind_from_str(&row_text(row, "type").unwrap_or_default()),
275        target: row_text(row, "target").unwrap_or_default(),
276        protected: row
277            .get_field("protected")
278            .and_then(|v| match v {
279                Value::Boolean(b) => Some(*b),
280                _ => None,
281            })
282            .unwrap_or(false),
283    })
284}
285
286fn load_ref_entity(store: &UnifiedStore, name: &str) -> Option<(EntityId, UnifiedEntity)> {
287    let manager = store.get_collection(vc::REFS)?;
288    manager
289        .query_all(|entity| {
290            entity
291                .data
292                .as_row()
293                .is_some_and(|row| row_text(row, "id").as_deref() == Some(name))
294        })
295        .into_iter()
296        .next()
297        .map(|entity| (entity.id, entity))
298}
299
300fn load_ref(store: &UnifiedStore, name: &str) -> Option<Ref> {
301    let (_, entity) = load_ref_entity(store, name)?;
302    ref_from_row(entity.data.as_row()?)
303}
304
305fn save_ref(store: &UnifiedStore, r: &Ref) -> RedDBResult<()> {
306    // Delete-then-insert gives us upsert semantics over the TableRow
307    // primary-key `_id` used by every red_* collection.
308    if let Some((id, _)) = load_ref_entity(store, &r.name) {
309        let _ = store.delete(vc::REFS, id);
310    }
311    let mut fields: HashMap<String, Value> = HashMap::new();
312    fields.insert("id".to_string(), Value::text(r.name.as_str()));
313    let kind_str = match r.kind {
314        RefKind::Branch => "branch",
315        RefKind::Tag => "tag",
316        RefKind::Head => "head",
317    };
318    fields.insert("type".to_string(), Value::text(kind_str));
319    fields.insert("target".to_string(), Value::text(r.target.as_str()));
320    fields.insert("protected".to_string(), Value::Boolean(r.protected));
321    insert_meta_row(store, vc::REFS, fields)?;
322    Ok(())
323}
324
325fn delete_ref(store: &UnifiedStore, name: &str) -> RedDBResult<bool> {
326    let Some((id, _)) = load_ref_entity(store, name) else {
327        return Ok(false);
328    };
329    store
330        .delete(vc::REFS, id)
331        .map_err(|e| RedDBError::Internal(e.to_string()))?;
332    Ok(true)
333}
334
335// ---------------------------------------------------------------------------
336// Opt-in per-collection versioning (Phase 7 — disk-cost isolation)
337// ---------------------------------------------------------------------------
338
339/// Is this user collection opted in to Git-for-Data?
340///
341/// Default is `false`: a fresh collection stays outside the VCS so
342/// transactional churn (sessions, caches, queues) doesn't pin
343/// extra row versions. Internal `red_*` collections are never
344/// versioned — they store VCS metadata itself.
345fn is_versioned(store: &UnifiedStore, name: &str) -> bool {
346    if name.starts_with("red_") {
347        return false;
348    }
349    let Some(manager) = store.get_collection(vc::SETTINGS) else {
350        return false;
351    };
352    let target = name.to_string();
353    manager
354        .query_all(|entity| {
355            entity
356                .data
357                .as_row()
358                .is_some_and(|row| row_text(row, "id").as_deref() == Some(&target))
359        })
360        .into_iter()
361        .any(|entity| {
362            entity
363                .data
364                .as_row()
365                .and_then(|row| row.get_field("versioned"))
366                .map(|v| matches!(v, Value::Boolean(true)))
367                .unwrap_or(false)
368        })
369}
370
371/// Enumerate every user collection currently opted in. Order
372/// undefined — callers that need a deterministic iteration should
373/// sort the returned list.
374fn versioned_collections(store: &UnifiedStore) -> Vec<String> {
375    let Some(manager) = store.get_collection(vc::SETTINGS) else {
376        return Vec::new();
377    };
378    manager
379        .query_all(|entity| {
380            entity
381                .data
382                .as_row()
383                .and_then(|row| row.get_field("versioned"))
384                .map(|v| matches!(v, Value::Boolean(true)))
385                .unwrap_or(false)
386        })
387        .into_iter()
388        .filter_map(|entity| row_text(entity.data.as_row()?, "id"))
389        .collect()
390}
391
392/// Upsert / delete the `red_vcs_settings` row for `name`. `true`
393/// opts the collection into VCS; `false` opts it out (subsequent
394/// merges / diffs / AS OF queries stop seeing it). Does NOT touch
395/// existing row versions — you control whether history is retained
396/// by deciding *when* to opt out.
397fn set_versioned_flag(store: &UnifiedStore, name: &str, enabled: bool) -> RedDBResult<()> {
398    if name.starts_with("red_") {
399        return Err(RedDBError::InvalidConfig(format!(
400            "cannot version internal collection `{name}`"
401        )));
402    }
403    let target = name.to_string();
404    if let Some(manager) = store.get_collection(vc::SETTINGS) {
405        let rows = manager.query_all(|entity| {
406            entity
407                .data
408                .as_row()
409                .is_some_and(|row| row_text(row, "id").as_deref() == Some(&target))
410        });
411        for row in rows {
412            let _ = store.delete(vc::SETTINGS, row.id);
413        }
414    }
415    if !enabled {
416        return Ok(());
417    }
418    let mut fields: HashMap<String, Value> = HashMap::new();
419    fields.insert("id".to_string(), Value::text(name));
420    fields.insert("versioned".to_string(), Value::Boolean(true));
421    fields.insert("ts_ms".to_string(), Value::TimestampMs(now_ms()));
422    insert_meta_row(store, vc::SETTINGS, fields)?;
423    Ok(())
424}
425
426fn list_refs_by_prefix(store: &UnifiedStore, prefix: Option<&str>) -> Vec<Ref> {
427    let Some(manager) = store.get_collection(vc::REFS) else {
428        return Vec::new();
429    };
430    let prefix_owned = prefix.map(|s| s.to_string());
431    manager
432        .query_all(|entity| {
433            entity.data.as_row().is_some_and(|row| {
434                let id = row_text(row, "id").unwrap_or_default();
435                match &prefix_owned {
436                    Some(p) => id.starts_with(p),
437                    None => true,
438                }
439            })
440        })
441        .into_iter()
442        .filter_map(|entity| ref_from_row(entity.data.as_row()?))
443        .collect()
444}
445
446// ---------------------------------------------------------------------------
447// Ancestry helpers
448// ---------------------------------------------------------------------------
449
450fn ancestor_set(store: &UnifiedStore, start: &str, max_steps: usize) -> HashSet<CommitHash> {
451    let mut visited: HashSet<CommitHash> = HashSet::new();
452    let mut stack: Vec<CommitHash> = vec![start.to_string()];
453    let mut steps = 0usize;
454    while let Some(hash) = stack.pop() {
455        if !visited.insert(hash.clone()) {
456            continue;
457        }
458        if let Some(c) = load_commit(store, &hash) {
459            for p in c.parents {
460                if !visited.contains(&p) {
461                    stack.push(p);
462                }
463            }
464        }
465        steps += 1;
466        if steps >= max_steps {
467            break;
468        }
469    }
470    visited
471}
472
473fn topo_walk(store: &UnifiedStore, start: &str, range: &LogRange) -> Vec<Commit> {
474    let limit = range.limit.unwrap_or(usize::MAX);
475    let skip = range.skip.unwrap_or(0);
476    let exclude = range
477        .from
478        .as_ref()
479        .map(|h| ancestor_set(store, h, 100_000))
480        .unwrap_or_default();
481
482    let mut visited: HashSet<CommitHash> = HashSet::new();
483    let mut stack: Vec<CommitHash> = vec![start.to_string()];
484    let mut out: Vec<Commit> = Vec::new();
485    let mut skipped = 0usize;
486    while let Some(hash) = stack.pop() {
487        if !visited.insert(hash.clone()) {
488            continue;
489        }
490        if exclude.contains(&hash) {
491            continue;
492        }
493        let Some(commit) = load_commit(store, &hash) else {
494            continue;
495        };
496        if range.no_merges && commit.parents.len() > 1 {
497            for p in &commit.parents {
498                if !visited.contains(p) {
499                    stack.push(p.clone());
500                }
501            }
502            continue;
503        }
504        for p in &commit.parents {
505            if !visited.contains(p) {
506                stack.push(p.clone());
507            }
508        }
509        if skipped < skip {
510            skipped += 1;
511            continue;
512        }
513        if out.len() >= limit {
514            break;
515        }
516        out.push(commit);
517    }
518    // Highest height first (newest commits on top).
519    out.sort_by(|a, b| {
520        b.height
521            .cmp(&a.height)
522            .then(b.timestamp_ms.cmp(&a.timestamp_ms))
523    });
524    out
525}
526
527// ---------------------------------------------------------------------------
528// Commitish resolution
529// ---------------------------------------------------------------------------
530
531fn resolve_ref_chain(store: &UnifiedStore, name: &str) -> Option<CommitHash> {
532    // Follow up to 4 levels of ref indirection.
533    let mut current = name.to_string();
534    for _ in 0..4 {
535        let r = load_ref(store, &current)?;
536        match r.kind {
537            RefKind::Branch | RefKind::Tag => return Some(r.target),
538            RefKind::Head => {
539                current = r.target;
540                continue;
541            }
542        }
543    }
544    None
545}
546
547fn resolve_short_commit(store: &UnifiedStore, prefix: &str) -> Option<CommitHash> {
548    if prefix.len() < 4 {
549        return None;
550    }
551    let manager = store.get_collection(vc::COMMITS)?;
552    let matches: Vec<String> = manager
553        .query_all(|entity| {
554            entity.data.as_row().is_some_and(|row| {
555                row_text(row, "id")
556                    .map(|id| id.starts_with(prefix))
557                    .unwrap_or(false)
558            })
559        })
560        .into_iter()
561        .filter_map(|entity| row_text(entity.data.as_row()?, "id"))
562        .collect();
563    if matches.len() == 1 {
564        matches.into_iter().next()
565    } else {
566        None
567    }
568}
569
570// ---------------------------------------------------------------------------
571// Workset helpers (lightweight — full WIP tracking lands in Phase 3)
572// ---------------------------------------------------------------------------
573
574fn upsert_workset(
575    store: &UnifiedStore,
576    connection_id: u64,
577    branch: &str,
578    base_commit: Option<&str>,
579    working_xid: Xid,
580) -> RedDBResult<()> {
581    let conn_id = connection_id.to_string();
582    // Delete old workset for this connection
583    if let Some(manager) = store.get_collection(vc::WORKSETS) {
584        let rows = manager.query_all(|entity| {
585            entity
586                .data
587                .as_row()
588                .is_some_and(|row| row_text(row, "id").as_deref() == Some(&conn_id))
589        });
590        for row in rows {
591            let _ = store.delete(vc::WORKSETS, row.id);
592        }
593    }
594    let mut fields: HashMap<String, Value> = HashMap::new();
595    fields.insert("id".to_string(), Value::text(conn_id.as_str()));
596    fields.insert("branch".to_string(), Value::text(branch));
597    if let Some(base) = base_commit {
598        fields.insert("base_commit".to_string(), Value::text(base));
599    }
600    fields.insert(
601        "working_xid".to_string(),
602        Value::UnsignedInteger(working_xid),
603    );
604    insert_meta_row(store, vc::WORKSETS, fields)?;
605    Ok(())
606}
607
608fn load_workset(store: &UnifiedStore, connection_id: u64) -> Option<(RefName, Option<CommitHash>)> {
609    let manager = store.get_collection(vc::WORKSETS)?;
610    let conn_id = connection_id.to_string();
611    manager
612        .query_all(|entity| {
613            entity
614                .data
615                .as_row()
616                .is_some_and(|row| row_text(row, "id").as_deref() == Some(&conn_id))
617        })
618        .into_iter()
619        .find_map(|entity| {
620            let row = entity.data.as_row()?;
621            Some((
622                row_text(row, "branch").unwrap_or_else(|| vc::DEFAULT_BRANCH_REF.to_string()),
623                row_text(row, "base_commit"),
624            ))
625        })
626}
627
628// ---------------------------------------------------------------------------
629// Runtime impl
630// ---------------------------------------------------------------------------
631
632impl RedDBRuntime {
633    pub fn vcs_commit(&self, input: CreateCommitInput) -> RedDBResult<Commit> {
634        let store_arc = self.inner.db.store();
635        let store: &UnifiedStore = &store_arc;
636
637        // Resolve current HEAD for this connection: workset -> HEAD:<conn>
638        // -> default branch.
639        let workset = load_workset(store, input.connection_id);
640        let branch_ref = workset
641            .as_ref()
642            .map(|(b, _)| b.clone())
643            .or_else(|| resolve_ref_chain(store, &head_ref_id(input.connection_id)).and(None))
644            .unwrap_or_else(|| vc::DEFAULT_BRANCH_REF.to_string());
645
646        let parent_hash = workset
647            .as_ref()
648            .and_then(|(_, base)| base.clone())
649            .or_else(|| load_ref(store, &branch_ref).map(|r| r.target));
650
651        let parents: Vec<CommitHash> = match parent_hash.clone() {
652            Some(h) if !h.is_empty() => vec![h],
653            _ => Vec::new(),
654        };
655
656        let parent_height = parents
657            .iter()
658            .filter_map(|p| load_commit(store, p).map(|c| c.height))
659            .max()
660            .unwrap_or(0);
661        let height = if parents.is_empty() {
662            0
663        } else {
664            parent_height + 1
665        };
666
667        // Allocate a fresh xid for this commit and immediately mark
668        // it committed so all future snapshots see the commit record.
669        // Each commit therefore has a unique, monotonic root_xid — a
670        // prerequisite for `AS OF BRANCH` to map to distinct
671        // snapshots across divergent branches.
672        let root_xid = self.inner.snapshot_manager.begin();
673        self.inner.snapshot_manager.commit(root_xid);
674        let timestamp_ms = now_ms();
675        let committer = input.committer.unwrap_or_else(|| input.author.clone());
676
677        let hash = compute_commit_hash(
678            root_xid,
679            &parents,
680            &input.author,
681            &input.message,
682            timestamp_ms,
683        );
684
685        if load_commit(store, &hash).is_some() {
686            return Err(RedDBError::InvalidConfig(format!(
687                "commit {hash} already exists (duplicate timestamp+content)"
688            )));
689        }
690
691        let commit = Commit {
692            hash: hash.clone(),
693            root_xid,
694            parents,
695            height,
696            author: input.author,
697            committer,
698            message: input.message,
699            timestamp_ms,
700            signature: None,
701        };
702
703        save_commit(store, &commit)?;
704
705        // Pin the root xid so VACUUM cannot reclaim history while the
706        // commit is reachable.
707        if root_xid != XID_NONE {
708            self.inner.snapshot_manager.pin(root_xid);
709        }
710
711        // Advance (or create) the branch ref.
712        let branch_name = if branch_ref.is_empty() {
713            vc::DEFAULT_BRANCH_REF.to_string()
714        } else {
715            branch_ref
716        };
717        save_ref(
718            store,
719            &Ref {
720                name: branch_name.clone(),
721                kind: RefKind::Branch,
722                target: hash.clone(),
723                protected: false,
724            },
725        )?;
726
727        upsert_workset(
728            store,
729            input.connection_id,
730            &branch_name,
731            Some(&hash),
732            root_xid,
733        )?;
734
735        Ok(commit)
736    }
737
738    pub fn vcs_branch_create(&self, input: CreateBranchInput) -> RedDBResult<Ref> {
739        let store_arc = self.inner.db.store();
740        let store: &UnifiedStore = &store_arc;
741        let full_name = normalize_branch_name(&input.name);
742        if load_ref(store, &full_name).is_some() {
743            return Err(RedDBError::InvalidConfig(format!(
744                "branch `{full_name}` already exists"
745            )));
746        }
747        let target_hash = match &input.from {
748            Some(spec) => RedDBRuntime::vcs_resolve_commitish(self, spec)?,
749            None => {
750                // Fall back to the connection's current HEAD branch, then default.
751                let workset = load_workset(store, input.connection_id);
752
753                workset
754                    .and_then(|(_, base)| base)
755                    .or_else(|| load_ref(store, vc::DEFAULT_BRANCH_REF).map(|r| r.target))
756                    .unwrap_or_default()
757            }
758        };
759        let r = Ref {
760            name: full_name,
761            kind: RefKind::Branch,
762            target: target_hash,
763            protected: false,
764        };
765        save_ref(store, &r)?;
766        Ok(r)
767    }
768
769    pub fn vcs_branch_delete(&self, name: &str) -> RedDBResult<()> {
770        let store_arc = self.inner.db.store();
771        let store: &UnifiedStore = &store_arc;
772        let full = normalize_branch_name(name);
773        let existing = load_ref(store, &full)
774            .ok_or_else(|| RedDBError::NotFound(format!("branch `{full}` does not exist")))?;
775        if existing.protected {
776            return Err(RedDBError::ReadOnly(format!(
777                "branch `{full}` is protected"
778            )));
779        }
780        delete_ref(store, &full)?;
781        Ok(())
782    }
783
784    pub fn vcs_tag_create(&self, input: CreateTagInput) -> RedDBResult<Ref> {
785        let store_arc = self.inner.db.store();
786        let store: &UnifiedStore = &store_arc;
787        let full_name = normalize_tag_name(&input.name);
788        if load_ref(store, &full_name).is_some() {
789            return Err(RedDBError::InvalidConfig(format!(
790                "tag `{full_name}` already exists"
791            )));
792        }
793        let target_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.target)?;
794        let r = Ref {
795            name: full_name,
796            kind: RefKind::Tag,
797            target: target_hash,
798            protected: false,
799        };
800        save_ref(store, &r)?;
801        Ok(r)
802    }
803
804    pub fn vcs_list_refs(&self, prefix: Option<&str>) -> RedDBResult<Vec<Ref>> {
805        let store_arc = self.inner.db.store();
806        let store: &UnifiedStore = &store_arc;
807        Ok(list_refs_by_prefix(store, prefix))
808    }
809
810    pub fn vcs_checkout(&self, input: CheckoutInput) -> RedDBResult<Ref> {
811        let store_arc = self.inner.db.store();
812        let store: &UnifiedStore = &store_arc;
813        let (branch_ref, target_hash) = match &input.target {
814            CheckoutTarget::Branch(name) => {
815                let full = normalize_branch_name(name);
816                let r = load_ref(store, &full).ok_or_else(|| {
817                    RedDBError::NotFound(format!("branch `{full}` does not exist"))
818                })?;
819                (full, r.target)
820            }
821            CheckoutTarget::Tag(name) => {
822                let full = normalize_tag_name(name);
823                let r = load_ref(store, &full)
824                    .ok_or_else(|| RedDBError::NotFound(format!("tag `{full}` does not exist")))?;
825                (full, r.target)
826            }
827            CheckoutTarget::Commit(hash) => {
828                let resolved = RedDBRuntime::vcs_resolve_commitish(self, hash)?;
829                // Detached HEAD — we point HEAD directly at a commit via
830                // the workset base; no branch ref is updated.
831                (String::new(), resolved)
832            }
833        };
834
835        upsert_workset(
836            store,
837            input.connection_id,
838            &branch_ref,
839            Some(&target_hash),
840            self.inner.snapshot_manager.peek_next_xid(),
841        )?;
842
843        // Update per-connection HEAD pointer for status/introspection.
844        save_ref(
845            store,
846            &Ref {
847                name: head_ref_id(input.connection_id),
848                kind: RefKind::Head,
849                target: if branch_ref.is_empty() {
850                    target_hash.clone()
851                } else {
852                    branch_ref.clone()
853                },
854                protected: false,
855            },
856        )?;
857
858        Ok(Ref {
859            name: if branch_ref.is_empty() {
860                format!("detached:{target_hash}")
861            } else {
862                branch_ref
863            },
864            kind: if target_hash.is_empty() {
865                RefKind::Head
866            } else {
867                RefKind::Branch
868            },
869            target: target_hash,
870            protected: false,
871        })
872    }
873
874    pub fn vcs_merge(&self, input: MergeInput) -> RedDBResult<MergeOutcome> {
875        let store_arc = self.inner.db.store();
876        let store: &UnifiedStore = &store_arc;
877
878        // Resolve source commit (the one being merged in).
879        let from_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.from)?;
880        let from_commit = load_commit(store, &from_hash).ok_or_else(|| {
881            RedDBError::NotFound(format!("source commit `{from_hash}` not found"))
882        })?;
883
884        // Resolve current HEAD for the connection.
885        let workset = load_workset(store, input.connection_id);
886        let (head_branch, head_hash) = match workset {
887            Some((branch, Some(head))) => (branch, head),
888            Some((branch, None)) => {
889                let head = load_ref(store, &branch)
890                    .map(|r| r.target)
891                    .unwrap_or_default();
892                (branch, head)
893            }
894            None => {
895                let head = load_ref(store, vc::DEFAULT_BRANCH_REF)
896                    .map(|r| r.target)
897                    .unwrap_or_default();
898                (vc::DEFAULT_BRANCH_REF.to_string(), head)
899            }
900        };
901        if head_hash.is_empty() {
902            return Err(RedDBError::InvalidConfig(
903                "cannot merge: HEAD has no commits".to_string(),
904            ));
905        }
906
907        // Fast-forward check: is HEAD an ancestor of `from`?
908        let from_ancestors = ancestor_set(store, &from_hash, 100_000);
909        let can_fast_forward = from_ancestors.contains(&head_hash);
910
911        match input.opts.strategy {
912            MergeStrategy::FastForwardOnly if !can_fast_forward => {
913                return Err(RedDBError::InvalidConfig(
914                    "not a fast-forward — use --strategy auto or no-ff".to_string(),
915                ));
916            }
917            _ => {}
918        }
919
920        if can_fast_forward && input.opts.strategy != MergeStrategy::NoFastForward {
921            if head_branch.is_empty() {
922                return Err(RedDBError::InvalidConfig(
923                    "cannot fast-forward a detached HEAD".to_string(),
924                ));
925            }
926            save_ref(
927                store,
928                &Ref {
929                    name: head_branch.clone(),
930                    kind: RefKind::Branch,
931                    target: from_hash.clone(),
932                    protected: false,
933                },
934            )?;
935            upsert_workset(
936                store,
937                input.connection_id,
938                &head_branch,
939                Some(&from_hash),
940                from_commit.root_xid,
941            )?;
942            return Ok(MergeOutcome {
943                merge_commit: Some(from_commit),
944                fast_forward: true,
945                conflicts: Vec::new(),
946                merge_state_id: None,
947            });
948        }
949
950        // Non-fast-forward: compute LCA, create a merge commit.
951        // Data-level 3-way merge (with conflict materialisation into
952        // red_conflicts) is deferred to Phase 4 — for now we produce
953        // the merge commit only when both sides have no content
954        // overlap to resolve, i.e. when LCA == head_hash (reverse FF)
955        // we already handled above. Otherwise surface a merge_state
956        // placeholder so callers know data reconciliation is required.
957        let lca = RedDBRuntime::vcs_lca(self, &head_hash, &from_hash)?;
958
959        let message = input
960            .opts
961            .message
962            .unwrap_or_else(|| format!("Merge {} into {}", input.from, head_branch));
963        let author = input.author.clone();
964        let timestamp_ms = now_ms();
965        let parents = vec![head_hash.clone(), from_hash.clone()];
966        let parent_height = parents
967            .iter()
968            .filter_map(|p| load_commit(store, p).map(|c| c.height))
969            .max()
970            .unwrap_or(0);
971        let height = parent_height + 1;
972        let root_xid = self.inner.snapshot_manager.begin();
973        self.inner.snapshot_manager.commit(root_xid);
974
975        let hash = compute_commit_hash(root_xid, &parents, &author, &message, timestamp_ms);
976
977        let merge_commit = Commit {
978            hash: hash.clone(),
979            root_xid,
980            parents,
981            height,
982            author: author.clone(),
983            committer: author,
984            message,
985            timestamp_ms,
986            signature: None,
987        };
988
989        // Create a merge_state row so Phase 4 can pick up where we
990        // stopped and actually reconcile data. We still open the
991        // commit / ref so log() shows the history; worksets gets the
992        // merge_state_id so status() surfaces the in-progress state.
993        save_commit(store, &merge_commit)?;
994        if root_xid != XID_NONE {
995            self.inner.snapshot_manager.pin(root_xid);
996        }
997        if !head_branch.is_empty() {
998            save_ref(
999                store,
1000                &Ref {
1001                    name: head_branch.clone(),
1002                    kind: RefKind::Branch,
1003                    target: hash.clone(),
1004                    protected: false,
1005                },
1006            )?;
1007        }
1008        upsert_workset(
1009            store,
1010            input.connection_id,
1011            &head_branch,
1012            Some(&hash),
1013            root_xid,
1014        )?;
1015
1016        let merge_state_id = format!("ms:{}", &hash[..16]);
1017
1018        // Materialise conflicts: any entity whose visibility changed
1019        // in BOTH sides relative to the LCA snapshot is a candidate
1020        // conflict. Phase 5 records identifiers + metadata; Phase 6
1021        // will stage the merged body and apply it to the user data.
1022        let conflicts = if let Some(lca_hash) = &lca {
1023            materialize_merge_conflicts(
1024                self,
1025                store,
1026                lca_hash,
1027                &head_hash,
1028                &from_hash,
1029                &merge_state_id,
1030            )?
1031        } else {
1032            Vec::new()
1033        };
1034
1035        let mut ms_fields: HashMap<String, Value> = HashMap::new();
1036        ms_fields.insert("id".to_string(), Value::text(merge_state_id.as_str()));
1037        ms_fields.insert("kind".to_string(), Value::text("merge"));
1038        ms_fields.insert("branch".to_string(), Value::text(head_branch.as_str()));
1039        if let Some(base_hash) = &lca {
1040            ms_fields.insert("base".to_string(), Value::text(base_hash.as_str()));
1041        }
1042        ms_fields.insert("ours".to_string(), Value::text(head_hash.as_str()));
1043        ms_fields.insert("theirs".to_string(), Value::text(from_hash.as_str()));
1044        ms_fields.insert(
1045            "conflicts_count".to_string(),
1046            Value::UnsignedInteger(conflicts.len() as u64),
1047        );
1048        insert_meta_row(store, vc::MERGE_STATE, ms_fields)?;
1049
1050        Ok(MergeOutcome {
1051            merge_commit: Some(merge_commit),
1052            fast_forward: false,
1053            conflicts,
1054            merge_state_id: Some(merge_state_id),
1055        })
1056    }
1057
1058    pub fn vcs_cherry_pick(
1059        &self,
1060        connection_id: u64,
1061        commit: &str,
1062        author: Author,
1063    ) -> RedDBResult<MergeOutcome> {
1064        let store_arc = self.inner.db.store();
1065        let store: &UnifiedStore = &store_arc;
1066
1067        // Resolve the commit being cherry-picked and its parent.
1068        let src_hash = RedDBRuntime::vcs_resolve_commitish(self, commit)?;
1069        let src_commit = load_commit(store, &src_hash).ok_or_else(|| {
1070            RedDBError::NotFound(format!("cherry-pick source `{src_hash}` not found"))
1071        })?;
1072        if src_commit.parents.is_empty() {
1073            return Err(RedDBError::InvalidConfig(
1074                "cannot cherry-pick a root commit".to_string(),
1075            ));
1076        }
1077        if src_commit.parents.len() > 1 {
1078            return Err(RedDBError::InvalidConfig(
1079                "cannot cherry-pick a merge commit; resolve manually".to_string(),
1080            ));
1081        }
1082        let parent_hash = src_commit.parents[0].clone();
1083
1084        // Resolve HEAD for this connection.
1085        let workset = load_workset(store, connection_id);
1086        let (head_branch, head_hash) = match workset {
1087            Some((branch, Some(head))) => (branch, head),
1088            Some((branch, None)) => {
1089                let head = load_ref(store, &branch)
1090                    .map(|r| r.target)
1091                    .unwrap_or_default();
1092                (branch, head)
1093            }
1094            None => {
1095                let head = load_ref(store, vc::DEFAULT_BRANCH_REF)
1096                    .map(|r| r.target)
1097                    .unwrap_or_default();
1098                (vc::DEFAULT_BRANCH_REF.to_string(), head)
1099            }
1100        };
1101        if head_hash.is_empty() {
1102            return Err(RedDBError::InvalidConfig(
1103                "cannot cherry-pick onto empty HEAD".to_string(),
1104            ));
1105        }
1106
1107        // Cherry-pick = 3-way merge with base = parent(src), ours = HEAD,
1108        // theirs = src. The new commit records the pick so log() sees
1109        // it; data application is staged in merge_state.pending so
1110        // Phase 6 can apply it transactionally.
1111        let message = format!("cherry-pick: {}", src_commit.message);
1112        let parents = vec![head_hash.clone()];
1113        let parent_height = load_commit(store, &head_hash)
1114            .map(|c| c.height)
1115            .unwrap_or(0);
1116        let height = parent_height + 1;
1117        let root_xid = self.inner.snapshot_manager.begin();
1118        self.inner.snapshot_manager.commit(root_xid);
1119        let timestamp_ms = now_ms();
1120
1121        let hash = compute_commit_hash(root_xid, &parents, &author, &message, timestamp_ms);
1122        let pick_commit = Commit {
1123            hash: hash.clone(),
1124            root_xid,
1125            parents,
1126            height,
1127            author: author.clone(),
1128            committer: author,
1129            message,
1130            timestamp_ms,
1131            signature: None,
1132        };
1133        save_commit(store, &pick_commit)?;
1134        if root_xid != XID_NONE {
1135            self.inner.snapshot_manager.pin(root_xid);
1136        }
1137        if !head_branch.is_empty() {
1138            save_ref(
1139                store,
1140                &Ref {
1141                    name: head_branch.clone(),
1142                    kind: RefKind::Branch,
1143                    target: hash.clone(),
1144                    protected: false,
1145                },
1146            )?;
1147        }
1148        upsert_workset(store, connection_id, &head_branch, Some(&hash), root_xid)?;
1149
1150        let merge_state_id = format!("cp:{}", &hash[..16]);
1151        let conflicts = materialize_merge_conflicts(
1152            self,
1153            store,
1154            &parent_hash,
1155            &head_hash,
1156            &src_hash,
1157            &merge_state_id,
1158        )?;
1159
1160        let mut ms_fields: HashMap<String, Value> = HashMap::new();
1161        ms_fields.insert("id".to_string(), Value::text(merge_state_id.as_str()));
1162        ms_fields.insert("kind".to_string(), Value::text("cherry_pick"));
1163        ms_fields.insert("branch".to_string(), Value::text(head_branch.as_str()));
1164        ms_fields.insert("base".to_string(), Value::text(parent_hash.as_str()));
1165        ms_fields.insert("ours".to_string(), Value::text(head_hash.as_str()));
1166        ms_fields.insert("theirs".to_string(), Value::text(src_hash.as_str()));
1167        ms_fields.insert(
1168            "conflicts_count".to_string(),
1169            Value::UnsignedInteger(conflicts.len() as u64),
1170        );
1171        insert_meta_row(store, vc::MERGE_STATE, ms_fields)?;
1172
1173        Ok(MergeOutcome {
1174            merge_commit: Some(pick_commit),
1175            fast_forward: false,
1176            conflicts,
1177            merge_state_id: Some(merge_state_id),
1178        })
1179    }
1180
1181    pub fn vcs_revert(
1182        &self,
1183        connection_id: u64,
1184        commit: &str,
1185        author: Author,
1186    ) -> RedDBResult<Commit> {
1187        let store_arc = self.inner.db.store();
1188        let store: &UnifiedStore = &store_arc;
1189
1190        let src_hash = RedDBRuntime::vcs_resolve_commitish(self, commit)?;
1191        let src_commit = load_commit(store, &src_hash)
1192            .ok_or_else(|| RedDBError::NotFound(format!("revert source `{src_hash}` not found")))?;
1193        if src_commit.parents.is_empty() {
1194            return Err(RedDBError::InvalidConfig(
1195                "cannot revert a root commit".to_string(),
1196            ));
1197        }
1198        let parent_hash = src_commit.parents[0].clone();
1199
1200        let workset = load_workset(store, connection_id);
1201        let (head_branch, head_hash) = match workset {
1202            Some((branch, Some(head))) => (branch, head),
1203            Some((branch, None)) => {
1204                let head = load_ref(store, &branch)
1205                    .map(|r| r.target)
1206                    .unwrap_or_default();
1207                (branch, head)
1208            }
1209            None => {
1210                let head = load_ref(store, vc::DEFAULT_BRANCH_REF)
1211                    .map(|r| r.target)
1212                    .unwrap_or_default();
1213                (vc::DEFAULT_BRANCH_REF.to_string(), head)
1214            }
1215        };
1216        if head_hash.is_empty() {
1217            return Err(RedDBError::InvalidConfig(
1218                "cannot revert onto empty HEAD".to_string(),
1219            ));
1220        }
1221
1222        // Revert = 3-way merge with base = src, ours = HEAD,
1223        // theirs = parent(src). The asymmetry vs cherry-pick flips
1224        // which side provides the "forward" delta so the effect is
1225        // the inverse of the original commit.
1226        let message = format!("Revert \"{}\"", src_commit.message);
1227        let parents = vec![head_hash.clone()];
1228        let parent_height = load_commit(store, &head_hash)
1229            .map(|c| c.height)
1230            .unwrap_or(0);
1231        let height = parent_height + 1;
1232        let root_xid = self.inner.snapshot_manager.begin();
1233        self.inner.snapshot_manager.commit(root_xid);
1234        let timestamp_ms = now_ms();
1235
1236        let hash = compute_commit_hash(root_xid, &parents, &author, &message, timestamp_ms);
1237        let rv_commit = Commit {
1238            hash: hash.clone(),
1239            root_xid,
1240            parents,
1241            height,
1242            author: author.clone(),
1243            committer: author,
1244            message,
1245            timestamp_ms,
1246            signature: None,
1247        };
1248        save_commit(store, &rv_commit)?;
1249        if root_xid != XID_NONE {
1250            self.inner.snapshot_manager.pin(root_xid);
1251        }
1252        if !head_branch.is_empty() {
1253            save_ref(
1254                store,
1255                &Ref {
1256                    name: head_branch.clone(),
1257                    kind: RefKind::Branch,
1258                    target: hash.clone(),
1259                    protected: false,
1260                },
1261            )?;
1262        }
1263        upsert_workset(store, connection_id, &head_branch, Some(&hash), root_xid)?;
1264
1265        let merge_state_id = format!("rv:{}", &hash[..16]);
1266        // Record merge_state for later data apply. No conflict
1267        // materialisation here — revert conflicts are rare when the
1268        // reverted commit touches disjoint entities.
1269        let mut ms_fields: HashMap<String, Value> = HashMap::new();
1270        ms_fields.insert("id".to_string(), Value::text(merge_state_id.as_str()));
1271        ms_fields.insert("kind".to_string(), Value::text("revert"));
1272        ms_fields.insert("branch".to_string(), Value::text(head_branch.as_str()));
1273        ms_fields.insert("base".to_string(), Value::text(src_hash.as_str()));
1274        ms_fields.insert("ours".to_string(), Value::text(head_hash.as_str()));
1275        ms_fields.insert("theirs".to_string(), Value::text(parent_hash.as_str()));
1276        ms_fields.insert("conflicts_count".to_string(), Value::UnsignedInteger(0));
1277        insert_meta_row(store, vc::MERGE_STATE, ms_fields)?;
1278
1279        Ok(rv_commit)
1280    }
1281
1282    pub fn vcs_reset(&self, input: ResetInput) -> RedDBResult<()> {
1283        let store_arc = self.inner.db.store();
1284        let store: &UnifiedStore = &store_arc;
1285        let target_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.target)?;
1286        let target_commit = load_commit(store, &target_hash).ok_or_else(|| {
1287            RedDBError::NotFound(format!("target commit `{target_hash}` not found"))
1288        })?;
1289
1290        // Find the current branch for this connection.
1291        let workset = load_workset(store, input.connection_id);
1292        let branch = workset
1293            .as_ref()
1294            .map(|(b, _)| b.clone())
1295            .unwrap_or_else(|| vc::DEFAULT_BRANCH_REF.to_string());
1296
1297        // Soft and Mixed both move the branch ref + workset base.
1298        // Hard would additionally revert entity data — deferred to
1299        // Phase 4 because it requires selective MVCC rewind.
1300        match input.mode {
1301            ResetMode::Soft | ResetMode::Mixed => {
1302                if !branch.is_empty() {
1303                    save_ref(
1304                        store,
1305                        &Ref {
1306                            name: branch.clone(),
1307                            kind: RefKind::Branch,
1308                            target: target_hash.clone(),
1309                            protected: false,
1310                        },
1311                    )?;
1312                }
1313                upsert_workset(
1314                    store,
1315                    input.connection_id,
1316                    &branch,
1317                    Some(&target_hash),
1318                    target_commit.root_xid,
1319                )?;
1320                Ok(())
1321            }
1322            ResetMode::Hard => Err(unimplemented("reset --hard (Phase 4)")),
1323        }
1324    }
1325
1326    pub fn vcs_log(&self, input: LogInput) -> RedDBResult<Vec<Commit>> {
1327        let store_arc = self.inner.db.store();
1328        let store: &UnifiedStore = &store_arc;
1329        let start = match &input.range.to {
1330            Some(spec) => RedDBRuntime::vcs_resolve_commitish(self, spec)?,
1331            None => {
1332                let workset = load_workset(store, input.connection_id);
1333                workset
1334                    .and_then(|(_, base)| base)
1335                    .or_else(|| load_ref(store, vc::DEFAULT_BRANCH_REF).map(|r| r.target))
1336                    .unwrap_or_default()
1337            }
1338        };
1339        if start.is_empty() {
1340            return Ok(Vec::new());
1341        }
1342        Ok(topo_walk(store, &start, &input.range))
1343    }
1344
1345    pub fn vcs_diff(&self, input: DiffInput) -> RedDBResult<Diff> {
1346        let store_arc = self.inner.db.store();
1347        let store: &UnifiedStore = &store_arc;
1348        let from_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.from)?;
1349        let to_hash = RedDBRuntime::vcs_resolve_commitish(self, &input.to)?;
1350        let from_xid = RedDBRuntime::vcs_resolve_as_of(self, AsOfSpec::Commit(from_hash.clone()))?;
1351        let to_xid = RedDBRuntime::vcs_resolve_as_of(self, AsOfSpec::Commit(to_hash.clone()))?;
1352
1353        let sm = &self.inner.snapshot_manager;
1354        let from_snap = sm.snapshot(from_xid);
1355        let to_snap = sm.snapshot(to_xid);
1356
1357        // Iterate every *user* collection (skip internal red_*).
1358        let mut entries: Vec<DiffEntry> = Vec::new();
1359        let mut added = 0usize;
1360        let mut removed = 0usize;
1361        let mut modified = 0usize;
1362        let collections = store.list_collections();
1363        for coll in collections {
1364            if coll.starts_with("red_") {
1365                continue;
1366            }
1367            if !is_versioned(store, &coll) {
1368                continue;
1369            }
1370            if let Some(filter) = &input.collection {
1371                if filter != &coll {
1372                    continue;
1373                }
1374            }
1375            let Some(manager) = store.get_collection(&coll) else {
1376                continue;
1377            };
1378            let entities = manager.query_all(|_| true);
1379            // Group by entity id so we can compare before/after state.
1380            for entity in &entities {
1381                let xmin = entity.xmin;
1382                let xmax = entity.xmax;
1383                let in_from = from_snap.sees(xmin, xmax) && !sm.is_aborted(xmin);
1384                let in_to = to_snap.sees(xmin, xmax) && !sm.is_aborted(xmin);
1385                if in_from == in_to {
1386                    continue;
1387                }
1388                let entity_id = entity.id.raw().to_string();
1389                let payload = if input.summary_only {
1390                    JsonValue::Null
1391                } else {
1392                    JsonValue::String(format!("entity#{} xmin={} xmax={}", entity_id, xmin, xmax))
1393                };
1394                let change = match (in_from, in_to) {
1395                    (false, true) => {
1396                        added += 1;
1397                        DiffChange::Added { after: payload }
1398                    }
1399                    (true, false) => {
1400                        removed += 1;
1401                        DiffChange::Removed { before: payload }
1402                    }
1403                    _ => unreachable!(),
1404                };
1405                entries.push(DiffEntry {
1406                    collection: coll.clone(),
1407                    entity_id,
1408                    change,
1409                });
1410            }
1411        }
1412
1413        // Modified rows in an append-only MVCC are expressed as a pair
1414        // (remove old version + add new version sharing entity_id);
1415        // collapse them into DiffChange::Modified so the wire format
1416        // matches user intuition.
1417        entries = coalesce_modifications(entries, &mut added, &mut removed, &mut modified);
1418
1419        Ok(Diff {
1420            from: from_hash,
1421            to: to_hash,
1422            entries,
1423            added,
1424            removed,
1425            modified,
1426        })
1427    }
1428
1429    pub fn vcs_status(&self, input: StatusInput) -> RedDBResult<Status> {
1430        let store_arc = self.inner.db.store();
1431        let store: &UnifiedStore = &store_arc;
1432        let workset = load_workset(store, input.connection_id);
1433        let (head_ref, head_commit) = match workset {
1434            Some((branch, base)) => {
1435                let base = base.or_else(|| load_ref(store, &branch).map(|r| r.target));
1436                (Some(branch), base)
1437            }
1438            None => {
1439                let base = load_ref(store, vc::DEFAULT_BRANCH_REF).map(|r| r.target);
1440                (Some(vc::DEFAULT_BRANCH_REF.to_string()), base)
1441            }
1442        };
1443        let detached = matches!(&head_ref, Some(s) if s.is_empty());
1444        Ok(Status {
1445            connection_id: input.connection_id,
1446            head_ref: head_ref.filter(|s| !s.is_empty()),
1447            head_commit,
1448            detached,
1449            staged_changes: 0,
1450            working_changes: 0,
1451            unresolved_conflicts: 0,
1452            merge_state_id: None,
1453        })
1454    }
1455
1456    pub fn vcs_lca(&self, a: &str, b: &str) -> RedDBResult<Option<CommitHash>> {
1457        let store_arc = self.inner.db.store();
1458        let store: &UnifiedStore = &store_arc;
1459        let a_hash = RedDBRuntime::vcs_resolve_commitish(self, a)?;
1460        let b_hash = RedDBRuntime::vcs_resolve_commitish(self, b)?;
1461        let a_ancestors = ancestor_set(store, &a_hash, 100_000);
1462
1463        // BFS from b, return first hit in a_ancestors with the greatest
1464        // height (closest common ancestor to b).
1465        let mut visited: HashSet<CommitHash> = HashSet::new();
1466        let mut stack: Vec<CommitHash> = vec![b_hash];
1467        let mut best: Option<(u64, CommitHash)> = None;
1468        while let Some(hash) = stack.pop() {
1469            if !visited.insert(hash.clone()) {
1470                continue;
1471            }
1472            if a_ancestors.contains(&hash) {
1473                let height = load_commit(store, &hash).map(|c| c.height).unwrap_or(0);
1474                match &best {
1475                    Some((h, _)) if *h >= height => {}
1476                    _ => best = Some((height, hash.clone())),
1477                }
1478                // Don't descend below an ancestor; higher-height hits
1479                // further from root are already captured.
1480                continue;
1481            }
1482            if let Some(commit) = load_commit(store, &hash) {
1483                for p in commit.parents {
1484                    if !visited.contains(&p) {
1485                        stack.push(p);
1486                    }
1487                }
1488            }
1489        }
1490        Ok(best.map(|(_, h)| h))
1491    }
1492
1493    pub fn vcs_conflicts_list(&self, merge_state_id: &str) -> RedDBResult<Vec<Conflict>> {
1494        let store_arc = self.inner.db.store();
1495        let store: &UnifiedStore = &store_arc;
1496        let Some(manager) = store.get_collection(vc::CONFLICTS) else {
1497            return Ok(Vec::new());
1498        };
1499        let msid = merge_state_id.to_string();
1500        let out = manager
1501            .query_all(|entity| {
1502                entity
1503                    .data
1504                    .as_row()
1505                    .is_some_and(|row| row_text(row, "merge_state_id").as_deref() == Some(&msid))
1506            })
1507            .into_iter()
1508            .filter_map(|entity| {
1509                let row = entity.data.as_row()?;
1510                Some(Conflict {
1511                    id: row_text(row, "id")?,
1512                    collection: row_text(row, "collection")?,
1513                    entity_id: row_text(row, "entity_id").unwrap_or_default(),
1514                    base: row_json(row, "base_json"),
1515                    ours: row_json(row, "ours_json"),
1516                    theirs: row_json(row, "theirs_json"),
1517                    conflicting_paths: row_string_list(row, "conflicting_paths"),
1518                    merge_state_id: row_text(row, "merge_state_id").unwrap_or_default(),
1519                })
1520            })
1521            .collect();
1522        Ok(out)
1523    }
1524
1525    pub fn vcs_conflict_resolve(&self, conflict_id: &str, _resolved: JsonValue) -> RedDBResult<()> {
1526        let store_arc = self.inner.db.store();
1527        let store: &UnifiedStore = &store_arc;
1528        let Some(manager) = store.get_collection(vc::CONFLICTS) else {
1529            return Err(RedDBError::NotFound(format!(
1530                "conflict `{conflict_id}` not found"
1531            )));
1532        };
1533        let cid = conflict_id.to_string();
1534        let mut deleted = 0usize;
1535        let matches = manager.query_all(|entity| {
1536            entity
1537                .data
1538                .as_row()
1539                .is_some_and(|row| row_text(row, "id").as_deref() == Some(&cid))
1540        });
1541        for entity in matches {
1542            store
1543                .delete(vc::CONFLICTS, entity.id)
1544                .map_err(|e| RedDBError::Internal(e.to_string()))?;
1545            deleted += 1;
1546        }
1547        // NOTE: Phase 4 will also apply `_resolved` to the user
1548        // collection under the current working set before deleting
1549        // the conflict row. Here we only remove the marker.
1550        if deleted == 0 {
1551            return Err(RedDBError::NotFound(format!(
1552                "conflict `{conflict_id}` not found"
1553            )));
1554        }
1555        Ok(())
1556    }
1557
1558    pub fn vcs_resolve_as_of(&self, spec: AsOfSpec) -> RedDBResult<Xid> {
1559        let store_arc = self.inner.db.store();
1560        let store: &UnifiedStore = &store_arc;
1561        match spec {
1562            AsOfSpec::Snapshot(x) => Ok(x),
1563            AsOfSpec::Commit(h) => {
1564                let c = load_commit(store, &h)
1565                    .ok_or_else(|| RedDBError::NotFound(format!("commit `{h}` not found")))?;
1566                Ok(c.root_xid)
1567            }
1568            AsOfSpec::Branch(name) => {
1569                let full = normalize_branch_name(&name);
1570                let r = load_ref(store, &full).ok_or_else(|| {
1571                    RedDBError::NotFound(format!("branch `{full}` does not exist"))
1572                })?;
1573                let c = load_commit(store, &r.target).ok_or_else(|| {
1574                    RedDBError::NotFound(format!("branch `{full}` points to missing commit"))
1575                })?;
1576                Ok(c.root_xid)
1577            }
1578            AsOfSpec::Tag(name) => {
1579                let full = normalize_tag_name(&name);
1580                let r = load_ref(store, &full)
1581                    .ok_or_else(|| RedDBError::NotFound(format!("tag `{full}` does not exist")))?;
1582                let c = load_commit(store, &r.target).ok_or_else(|| {
1583                    RedDBError::NotFound(format!("tag `{full}` points to missing commit"))
1584                })?;
1585                Ok(c.root_xid)
1586            }
1587            AsOfSpec::TimestampMs(ts) => {
1588                let manager = store
1589                    .get_collection(vc::COMMITS)
1590                    .ok_or_else(|| RedDBError::NotFound("no commits exist yet".to_string()))?;
1591                // Find the commit with the greatest timestamp_ms <= ts.
1592                let mut best: Option<(i64, Xid)> = None;
1593                let entities = manager.query_all(|_| true);
1594                for entity in entities {
1595                    if let Some(row) = entity.data.as_row() {
1596                        let t = row_i64(row, "timestamp_ms").unwrap_or(0);
1597                        if t <= ts {
1598                            let xid = row_u64(row, "root_xid").unwrap_or(0);
1599                            match &best {
1600                                Some((bt, _)) if *bt >= t => {}
1601                                _ => best = Some((t, xid)),
1602                            }
1603                        }
1604                    }
1605                }
1606                best.map(|(_, x)| x)
1607                    .ok_or_else(|| RedDBError::NotFound(format!("no commit at or before ts={ts}")))
1608            }
1609        }
1610    }
1611
1612    pub fn vcs_set_versioned(&self, collection: &str, enabled: bool) -> RedDBResult<()> {
1613        let store_arc = self.inner.db.store();
1614        let store: &UnifiedStore = &store_arc;
1615        set_versioned_flag(store, collection, enabled)
1616    }
1617
1618    pub fn vcs_list_versioned(&self) -> RedDBResult<Vec<String>> {
1619        let store_arc = self.inner.db.store();
1620        let store: &UnifiedStore = &store_arc;
1621        let mut list = versioned_collections(store);
1622        list.sort();
1623        Ok(list)
1624    }
1625
1626    pub fn vcs_is_versioned(&self, collection: &str) -> RedDBResult<bool> {
1627        let store_arc = self.inner.db.store();
1628        let store: &UnifiedStore = &store_arc;
1629        Ok(is_versioned(store, collection))
1630    }
1631
1632    pub fn vcs_resolve_commitish(&self, spec: &str) -> RedDBResult<CommitHash> {
1633        let store_arc = self.inner.db.store();
1634        let store: &UnifiedStore = &store_arc;
1635        if spec.is_empty() {
1636            return Err(RedDBError::InvalidConfig("empty commitish".to_string()));
1637        }
1638
1639        // 1. Exact commit hash.
1640        if spec.len() == 64
1641            && spec.chars().all(|c| c.is_ascii_hexdigit())
1642            && load_commit(store, spec).is_some()
1643        {
1644            return Ok(spec.to_string());
1645        }
1646
1647        // 2. Full ref name (refs/heads/..., refs/tags/...).
1648        if spec.starts_with("refs/") {
1649            if let Some(hash) = resolve_ref_chain(store, spec) {
1650                return Ok(hash);
1651            }
1652        }
1653
1654        // 3. Short branch / tag name.
1655        let normalized_branch = normalize_branch_name(spec);
1656        if let Some(hash) = resolve_ref_chain(store, &normalized_branch) {
1657            return Ok(hash);
1658        }
1659        let normalized_tag = normalize_tag_name(spec);
1660        if let Some(hash) = resolve_ref_chain(store, &normalized_tag) {
1661            return Ok(hash);
1662        }
1663
1664        // 4. Short commit hash prefix (≥ 4 chars, unique match).
1665        if let Some(hash) = resolve_short_commit(store, spec) {
1666            return Ok(hash);
1667        }
1668
1669        Err(RedDBError::NotFound(format!(
1670            "commitish `{spec}` did not resolve to any ref or commit"
1671        )))
1672    }
1673}
1674
1675/// Detect entities changed on both sides of a 3-way merge and write
1676/// a `red_conflicts` row for each. Returns the in-memory `Conflict`
1677/// list in the same shape `vcs_conflicts_list` would later read back.
1678///
1679/// Phase 5: metadata-level only — identifiers + which sides moved.
1680/// Phase 6 will stage the merged body (base/ours/theirs payloads +
1681/// JSON 3-way merge result) so `vcs_conflict_resolve` can apply the
1682/// resolved value back to the user collection.
1683fn materialize_merge_conflicts(
1684    rt: &RedDBRuntime,
1685    store: &UnifiedStore,
1686    base_hash: &str,
1687    ours_hash: &str,
1688    theirs_hash: &str,
1689    merge_state_id: &str,
1690) -> RedDBResult<Vec<Conflict>> {
1691    use crate::application::merge_json::three_way_merge;
1692    use crate::application::vcs::AsOfSpec;
1693
1694    let base_xid = rt.vcs_resolve_as_of(AsOfSpec::Commit(base_hash.to_string()))?;
1695    let ours_xid = rt.vcs_resolve_as_of(AsOfSpec::Commit(ours_hash.to_string()))?;
1696    let theirs_xid = rt.vcs_resolve_as_of(AsOfSpec::Commit(theirs_hash.to_string()))?;
1697
1698    let sm = &rt.inner.snapshot_manager;
1699    let base_snap = sm.snapshot(base_xid);
1700    let ours_snap = sm.snapshot(ours_xid);
1701    let theirs_snap = sm.snapshot(theirs_xid);
1702
1703    // Walk every VCS-opted-in user collection, materialising
1704    // per-(entity_id) visible JSON bodies at each of the three
1705    // snapshots. Collections that never opted in are skipped — by
1706    // definition they have no history semantics worth conflicting
1707    // over.
1708    let mut conflicts: Vec<Conflict> = Vec::new();
1709    for coll in store.list_collections() {
1710        if coll.starts_with("red_") {
1711            continue;
1712        }
1713        if !is_versioned(store, &coll) {
1714            continue;
1715        }
1716        let Some(manager) = store.get_collection(&coll) else {
1717            continue;
1718        };
1719        let mut at_base: HashMap<u64, JsonValue> = HashMap::new();
1720        let mut at_ours: HashMap<u64, JsonValue> = HashMap::new();
1721        let mut at_theirs: HashMap<u64, JsonValue> = HashMap::new();
1722        for entity in manager.query_all(|_| true) {
1723            let xmin = entity.xmin;
1724            let xmax = entity.xmax;
1725            if sm.is_aborted(xmin) {
1726                continue;
1727            }
1728            let eid = entity.id.raw();
1729            let body = crate::presentation::entity_json::compact_entity_json(&entity);
1730            if base_snap.sees(xmin, xmax) {
1731                at_base.insert(eid, body.clone());
1732            }
1733            if ours_snap.sees(xmin, xmax) {
1734                at_ours.insert(eid, body.clone());
1735            }
1736            if theirs_snap.sees(xmin, xmax) {
1737                at_theirs.insert(eid, body);
1738            }
1739        }
1740
1741        let mut all_ids: std::collections::BTreeSet<u64> = std::collections::BTreeSet::new();
1742        all_ids.extend(at_base.keys().copied());
1743        all_ids.extend(at_ours.keys().copied());
1744        all_ids.extend(at_theirs.keys().copied());
1745
1746        for eid in all_ids {
1747            let b = at_base.get(&eid).cloned().unwrap_or(JsonValue::Null);
1748            let o = at_ours.get(&eid).cloned().unwrap_or(JsonValue::Null);
1749            let t = at_theirs.get(&eid).cloned().unwrap_or(JsonValue::Null);
1750            let ours_changed = b != o;
1751            let theirs_changed = b != t;
1752            if !(ours_changed && theirs_changed) {
1753                continue;
1754            }
1755            if o == t {
1756                continue;
1757            }
1758            let merge = three_way_merge(&b, &o, &t);
1759            if merge.is_clean() {
1760                // Both sides touched different paths — no conflict
1761                // to record; Phase 6.2 will stage the merged body
1762                // into the workset for automatic apply.
1763                continue;
1764            }
1765            let conflict_id = format!("{}:{}/{}", merge_state_id, coll, eid);
1766            let paths: Vec<String> = merge
1767                .conflicts
1768                .iter()
1769                .map(|c| {
1770                    if c.path.is_empty() {
1771                        "*".to_string()
1772                    } else {
1773                        c.path.clone()
1774                    }
1775                })
1776                .collect();
1777
1778            let mut fields: HashMap<String, Value> = HashMap::new();
1779            fields.insert("id".to_string(), Value::text(conflict_id.as_str()));
1780            fields.insert("collection".to_string(), Value::text(coll.as_str()));
1781            fields.insert(
1782                "entity_id".to_string(),
1783                Value::text(eid.to_string().as_str()),
1784            );
1785            fields.insert("merge_state_id".to_string(), Value::text(merge_state_id));
1786            fields.insert(
1787                "conflicting_paths".to_string(),
1788                Value::Array(paths.iter().map(|p| Value::text(p.as_str())).collect()),
1789            );
1790            // Persist the three bodies as Value::Json blobs so
1791            // vcs_conflicts_list can hydrate them back into the
1792            // Conflict struct for presentation.
1793            if let Ok(bytes) = crate::json::to_vec(&b) {
1794                fields.insert("base_json".to_string(), Value::Json(bytes));
1795            }
1796            if let Ok(bytes) = crate::json::to_vec(&o) {
1797                fields.insert("ours_json".to_string(), Value::Json(bytes));
1798            }
1799            if let Ok(bytes) = crate::json::to_vec(&t) {
1800                fields.insert("theirs_json".to_string(), Value::Json(bytes));
1801            }
1802            insert_meta_row(store, vc::CONFLICTS, fields)?;
1803            conflicts.push(Conflict {
1804                id: conflict_id,
1805                collection: coll.clone(),
1806                entity_id: eid.to_string(),
1807                base: b,
1808                ours: o,
1809                theirs: t,
1810                conflicting_paths: paths,
1811                merge_state_id: merge_state_id.to_string(),
1812            });
1813        }
1814    }
1815    Ok(conflicts)
1816}
1817
1818// Collapse Add+Remove pairs sharing the same (collection, entity_id)
1819// into a single `Modified` entry. Called by `vcs_diff` after the
1820// naive pass so the caller sees one row per change, not two.
1821fn coalesce_modifications(
1822    entries: Vec<DiffEntry>,
1823    added: &mut usize,
1824    removed: &mut usize,
1825    modified: &mut usize,
1826) -> Vec<DiffEntry> {
1827    let mut by_key: HashMap<(String, String), Vec<DiffEntry>> = HashMap::new();
1828    for e in entries {
1829        by_key
1830            .entry((e.collection.clone(), e.entity_id.clone()))
1831            .or_default()
1832            .push(e);
1833    }
1834    let mut out: Vec<DiffEntry> = Vec::new();
1835    for ((coll, eid), group) in by_key {
1836        if group.len() >= 2 {
1837            let mut before = JsonValue::Null;
1838            let mut after = JsonValue::Null;
1839            for item in group {
1840                match item.change {
1841                    DiffChange::Removed { before: b } => before = b,
1842                    DiffChange::Added { after: a } => after = a,
1843                    DiffChange::Modified { .. } => {}
1844                }
1845            }
1846            *added = added.saturating_sub(1);
1847            *removed = removed.saturating_sub(1);
1848            *modified += 1;
1849            out.push(DiffEntry {
1850                collection: coll,
1851                entity_id: eid,
1852                change: DiffChange::Modified { before, after },
1853            });
1854        } else {
1855            out.extend(group);
1856        }
1857    }
1858    out.sort_by(|a, b| {
1859        a.collection
1860            .cmp(&b.collection)
1861            .then(a.entity_id.cmp(&b.entity_id))
1862    });
1863    out
1864}
1865
// Import guard: naming `BTreeSet` in a signature keeps the file-level
// import referenced. The function does nothing and is never called, hence
// the `dead_code` allowance.
#[allow(dead_code)]
fn _touch_types(_unused: BTreeSet<()>) {}