Skip to main content

uni_fork/
diff.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Fork diff & promote engine (Phase 6+).
5//!
6//! `compute_diff` computes the structural delta between two views; `run_promote`
7//! scans a fork for matched rows and bulk-inserts them onto primary. Both are
8//! generic over the [`ForkQueryHost`] / [`ForkPromoteSink`] host traits that
9//! uni-db implements for its `Session`/`Transaction` types.
10
11// ============================================================================
12// Diff engine
13// ============================================================================
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use tracing::warn;
18use uni_common::Properties;
19use uni_common::Result;
20use uni_common::Value;
21use uni_common::core::id::{UniId, Vid};
22
23use crate::host::{ForkPromoteSink, ForkQueryHost};
24use crate::types::{
25    ConflictPolicy, DiffEdge, DiffVertex, EdgeDiff, ForkDiff, PromoteBaseline, PromoteOptions,
26    PromotePattern, PromoteReport, PropertyChange, VertexDiff, VertexPropertyChange,
27};
28
29/// Compute the structural delta between two views.
30///
31/// Both `a` and `b` may be primary or forked sessions. The convention is
32/// *forward*: returned `ForkDiff.vertices.added` is rows present in `b`
33/// but not `a`; `deleted` is rows in `a` but not `b`.
34///
35/// Identity is content-addressed UID for vertices and `(src_uid,
36/// dst_uid)` for edges, scoped by edge type — so two unrelated forks
37/// with overlapping VIDs but distinct content pair correctly.
38pub async fn compute_diff<Q: ForkQueryHost + ?Sized>(a: &Q, b: &Q) -> Result<ForkDiff> {
39    let mut diff = ForkDiff::default();
40
41    // vid → ext_id per side: ext_id is folded into the content UID but stripped
42    // from query results, so look it up from storage (review H4).
43    let ext_a = a.storage().get_vertex_ext_ids().await.unwrap_or_default();
44    let ext_b = b.storage().get_vertex_ext_ids().await.unwrap_or_default();
45
46    let labels_a: HashSet<String> = a.schema().schema().labels.keys().cloned().collect();
47    let labels_b: HashSet<String> = b.schema().schema().labels.keys().cloned().collect();
48    let labels_union: Vec<&String> = labels_a.union(&labels_b).collect();
49
50    for label in labels_union {
51        let rows_a = scan_label_nodes(a, label, &ext_a).await?;
52        let rows_b = scan_label_nodes(b, label, &ext_b).await?;
53        diff_label(label, rows_a, rows_b, &mut diff.vertices);
54    }
55
56    let edges_a: HashSet<String> = a.schema().schema().edge_types.keys().cloned().collect();
57    let edges_b: HashSet<String> = b.schema().schema().edge_types.keys().cloned().collect();
58    let edges_union: Vec<&String> = edges_a.union(&edges_b).collect();
59
60    for edge_type in edges_union {
61        let rows_a = scan_edge_type(a, edge_type, &ext_a).await?;
62        let rows_b = scan_edge_type(b, edge_type, &ext_b).await?;
63        diff_edge_type(edge_type, rows_a, rows_b, &mut diff.edges);
64    }
65
66    Ok(diff)
67}
68
69/// A vertex's `ext_id` for content-UID computation, from a `vid → ext_id` map
70/// sourced from storage (`StorageManager::get_vertex_ext_ids`).
71///
72/// `ext_id` is folded into the storage `_uid` but is stripped from query
73/// results, so the diff can't recover it by re-hashing query rows — two
74/// vertices differing only by `ext_id` would collapse to one identity (review
75/// H4). We fold it back into the *recomputed* UID (not the storage `_uid`,
76/// which diverges from a recompute and breaks L0/flushed consistency). Vertices
77/// without an `ext_id` are absent from the map → `None`, i.e. unchanged
78/// behavior. Limitation: covers flushed rows; a vertex created fork-local and
79/// not yet flushed is absent from the map (its `ext_id` collapse only matters
80/// for promote, and flushing the fork closes it).
81fn ext_id_for(map: &HashMap<Vid, String>, vid: Vid) -> Option<&str> {
82    map.get(&vid).map(String::as_str)
83}
84
85/// One bucketed vertex row keyed by content UID.
86type VertexBucket = HashMap<UniId, VertexRow>;
87/// One bucketed edge row keyed by content-addressed edge UID
88/// (`compute_edge_uid(src_uid, dst_uid, type, properties)`). Two
89/// parallel edges between the same endpoints with different property
90/// bags hash to different keys and therefore appear as distinct
91/// entries — that's the Phase 7d multi-edge semantics.
92type EdgeBucket = HashMap<UniId, EdgeRow>;
93
94#[derive(Debug, Clone)]
95struct VertexRow {
96    label: String,
97    vid: Vid,
98    properties: Properties,
99}
100
101#[derive(Debug, Clone)]
102struct EdgeRow {
103    src_uid: UniId,
104    dst_uid: UniId,
105    properties: Properties,
106}
107
108async fn scan_label_nodes<Q: ForkQueryHost + ?Sized>(
109    s: &Q,
110    label: &str,
111    ext_ids: &HashMap<Vid, String>,
112) -> Result<VertexBucket> {
113    use uni_store::storage::vertex::VertexDataset;
114    let cypher = format!("MATCH (n:`{}`) RETURN n", escape_backticks(label));
115    let result = s.query(&cypher).await?;
116    let mut bucket = VertexBucket::new();
117    for row in result.rows() {
118        let Some(Value::Node(node)) = row.value("n") else {
119            continue;
120        };
121        // The MATCH already filters to nodes carrying `label`, so the bucketed
122        // row's label is always `label`. Fold the stored `ext_id` into the UID
123        // so ext_id-distinct vertices don't collapse (review H4).
124        let uid = VertexDataset::compute_vertex_uid(
125            label,
126            ext_id_for(ext_ids, node.vid),
127            &node.properties,
128        );
129        if bucket
130            .insert(
131                uid,
132                VertexRow {
133                    label: label.to_string(),
134                    vid: node.vid,
135                    properties: node.properties.clone(),
136                },
137            )
138            .is_some()
139        {
140            // Two distinct vertices hashed to the same content UID — one will be
141            // dropped from the diff. Observable signal for residual identity
142            // collisions (review H4).
143            warn!(
144                label,
145                vid = node.vid.as_u64(),
146                "fork diff: vertex content-UID collision; a row is being shadowed"
147            );
148        }
149    }
150    Ok(bucket)
151}
152
153async fn scan_edge_type<Q: ForkQueryHost + ?Sized>(
154    s: &Q,
155    edge_type: &str,
156    ext_ids: &HashMap<Vid, String>,
157) -> Result<EdgeBucket> {
158    use uni_store::storage::main_edge::MainEdgeDataset;
159    use uni_store::storage::vertex::VertexDataset;
160    let cypher = format!(
161        "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
162        escape_backticks(edge_type)
163    );
164    let result = s.query(&cypher).await?;
165    let mut bucket = EdgeBucket::new();
166    for row in result.rows() {
167        let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
168            (row.value("r"), row.value("a"), row.value("b"))
169        else {
170            continue;
171        };
172        let a_label = a.labels.first().cloned().unwrap_or_default();
173        let b_label = b.labels.first().cloned().unwrap_or_default();
174        let src_uid =
175            VertexDataset::compute_vertex_uid(&a_label, ext_id_for(ext_ids, a.vid), &a.properties);
176        let dst_uid =
177            VertexDataset::compute_vertex_uid(&b_label, ext_id_for(ext_ids, b.vid), &b.properties);
178        let edge_uid =
179            MainEdgeDataset::compute_edge_uid(&src_uid, &dst_uid, edge_type, &edge.properties);
180        if bucket
181            .insert(
182                edge_uid,
183                EdgeRow {
184                    src_uid,
185                    dst_uid,
186                    properties: edge.properties.clone(),
187                },
188            )
189            .is_some()
190        {
191            warn!(
192                edge_type,
193                "fork diff: edge content-UID collision; a row is being shadowed"
194            );
195        }
196    }
197    Ok(bucket)
198}
199
/// Split two keyed buckets into *added* rows (key present in `b`, not `a`)
/// and *deleted* rows (key present in `a`, not `b`). Each one-sided row is
/// moved into the matching callback, and every `mk_added` call happens
/// before any `mk_deleted` call.
///
/// Returns the rows whose key appears in both buckets as
/// `(key, row_a, row_b)`, so the caller can diff their properties. The key
/// type is generic (any `Eq + Hash`, e.g. `UniId`).
fn partition_added_deleted<K, R, A, D>(
    a: HashMap<K, R>,
    mut b: HashMap<K, R>,
    mut mk_added: A,
    mut mk_deleted: D,
) -> Vec<(K, R, R)>
where
    K: Eq + std::hash::Hash,
    A: FnMut(K, R),
    D: FnMut(K, R),
{
    // One pass over `a`: a shared row is pulled straight out of `b`, so
    // whatever remains in `b` afterwards is exactly the added set. This
    // needs no auxiliary key sets and has no `expect` panic paths.
    let mut common = Vec::new();
    let mut deleted = Vec::new();
    for (key, row_a) in a {
        match b.remove(&key) {
            Some(row_b) => common.push((key, row_a, row_b)),
            None => deleted.push((key, row_a)),
        }
    }
    // Keep the established callback order: all additions, then deletions.
    for (key, row_b) in b {
        mk_added(key, row_b);
    }
    for (key, row_a) in deleted {
        mk_deleted(key, row_a);
    }
    common
}
235
236fn diff_label(label: &str, a: VertexBucket, b: VertexBucket, out: &mut VertexDiff) {
237    let common = partition_added_deleted(
238        a,
239        b,
240        |uid, row| {
241            out.added.push(DiffVertex {
242                label: row.label,
243                uid,
244                vid: Some(row.vid),
245                properties: row.properties,
246            });
247        },
248        |uid, row| {
249            out.deleted.push(DiffVertex {
250                label: row.label,
251                uid,
252                vid: Some(row.vid),
253                properties: row.properties,
254            });
255        },
256    );
257    for (uid, row_a, row_b) in common {
258        let changes = property_changes(&row_a.properties, &row_b.properties);
259        if !changes.is_empty() {
260            out.changed.push(VertexPropertyChange {
261                label: label.to_string(),
262                uid,
263                changes,
264            });
265        }
266    }
267}
268
269fn diff_edge_type(edge_type: &str, a: EdgeBucket, b: EdgeBucket, out: &mut EdgeDiff) {
270    // Note: under content-addressed identity, two edges with the same
271    // edge_uid have, by construction, identical (src, dst, type,
272    // properties) — so the shared (intersection) rows cannot contain a
273    // property difference. The `changed` branch is intentionally
274    // unreachable under multi-edge semantics; property mutations surface
275    // as added+deleted of distinct edge UIDs. `EdgePropertyChange` remains
276    // in the public API for forward compatibility with a future identity
277    // model that anchors on a stable edge id. We therefore discard the
278    // common rows.
279    partition_added_deleted(
280        a,
281        b,
282        |edge_uid, row| {
283            out.added.push(DiffEdge {
284                edge_type: edge_type.to_string(),
285                edge_uid,
286                src_uid: row.src_uid,
287                dst_uid: row.dst_uid,
288                properties: row.properties,
289            });
290        },
291        |edge_uid, row| {
292            out.deleted.push(DiffEdge {
293                edge_type: edge_type.to_string(),
294                edge_uid,
295                src_uid: row.src_uid,
296                dst_uid: row.dst_uid,
297                properties: row.properties,
298            });
299        },
300    );
301}
302
303fn property_changes(a: &Properties, b: &Properties) -> Vec<PropertyChange> {
304    let mut changes = Vec::new();
305    let keys: HashSet<&String> = a.keys().chain(b.keys()).collect();
306    let mut sorted: Vec<&String> = keys.into_iter().collect();
307    sorted.sort();
308    for k in sorted {
309        let va = a.get(k);
310        let vb = b.get(k);
311        if va != vb {
312            changes.push(PropertyChange {
313                key: k.clone(),
314                before: va.cloned(),
315                after: vb.cloned(),
316            });
317        }
318    }
319    changes
320}
321
/// Double every backtick in `s` so it can be embedded inside a
/// backtick-quoted Cypher identifier.
fn escape_backticks(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '`' {
            escaped.push_str("``");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
325
326/// Resolve a set of UIDs to their primary VIDs in two queries
327/// regardless of the input size.
328///
329/// Returns a `HashMap<UniId, Vid>` containing only those UIDs that
330/// successfully resolve to a *primary* VID (i.e., a candidate VID
331/// from the shared `UidIndex` is actually present in primary's view
332/// of the label's vertex table). UIDs absent from the result map
333/// either had no candidate registered or all candidates pointed at
334/// fork-only rows.
335///
336/// Two queries per call regardless of `uids.len()`: one IN-filter
337/// scan of `UidIndex`'s dataset (collecting **all** registered VIDs
338/// per UID — `UidIndex::resolve_uids` collapses to one VID per UID
339/// which loses fork/primary disambiguation), and one primary Cypher
340/// MATCH with an `id(n) IN [...]` predicate to confirm which
341/// candidates live on primary.
342async fn batch_resolve_primary_vids<Q: ForkQueryHost + ?Sized>(
343    primary: &Q,
344    primary_storage: &Arc<uni_store::storage::manager::StorageManager>,
345    label: &str,
346    uids: &[UniId],
347) -> (HashMap<UniId, Vid>, bool) {
348    // NOTE: every error path below degrades to whatever has been
349    // resolved so far (an empty or partial map) rather than
350    // propagating. This is deliberate: `run_promote` treats an
351    // unresolved UID as "not present on primary" and inserts it, so a
352    // transient resolve failure must not abort the promote. The returned
353    // `degraded` flag (M5) tells the caller that "absent" was inferred
354    // from a failed resolve, so the resulting inserts are unverified and
355    // may be duplicates — surfaced as `vertices_inserted_unverified`.
356    let mut out: HashMap<UniId, Vid> = HashMap::new();
357    if uids.is_empty() {
358        return (out, false);
359    }
360    // Collect *all* candidate VIDs per UID by scanning the shared
361    // UidIndex with an IN filter. The shared index is not
362    // branch-isolated, so a single UID may have a fork-only VID and
363    // a primary VID both registered — we keep both and let the
364    // primary Cypher MATCH below decide which is real.
365    let candidates_per_uid: HashMap<UniId, Vec<Vid>> = match primary_storage.uid_index(label).ok() {
366        Some(uix) => match resolve_all_candidate_vids(&uix, uids).await {
367            Ok(m) => m,
368            Err(_) => return (out, true),
369        },
370        None => return (out, true),
371    };
372    if candidates_per_uid.is_empty() {
373        return (out, false);
374    }
375    // Single Cypher with IN clause over every candidate VID across
376    // every UID. Primary's branched backend filters out fork-only
377    // VIDs naturally — they have no row in the primary view.
378    let vid_set: HashSet<u64> = candidates_per_uid
379        .values()
380        .flat_map(|vs| vs.iter().map(|v| v.as_u64()))
381        .collect();
382    let vid_list: Vec<String> = vid_set.iter().map(|v| v.to_string()).collect();
383    let cypher = format!(
384        "MATCH (n:`{}`) WHERE id(n) IN [{}] RETURN id(n) AS vid",
385        escape_backticks(label),
386        vid_list.join(", ")
387    );
388    let rs = match primary.query(&cypher).await {
389        Ok(rs) => rs,
390        Err(_) => return (out, true),
391    };
392    let primary_vids: HashSet<u64> = rs
393        .rows()
394        .iter()
395        .filter_map(|row| row.get::<i64>("vid").ok())
396        .map(|v| v as u64)
397        .collect();
398    for (uid, vids) in candidates_per_uid {
399        // If *any* candidate VID for this UID lives on primary, the
400        // UID exists on primary. Pick the first such VID.
401        if let Some(vid) = vids
402            .into_iter()
403            .find(|v| primary_vids.contains(&v.as_u64()))
404        {
405            out.insert(uid, vid);
406        }
407    }
408    (out, false)
409}
410
411/// Resolve fork candidate vertices to existing primary VIDs by their
412/// stable `(label, ext_id)` identity, returning each match's current
413/// primary properties for the upsert equality check.
414///
415/// Unlike [`batch_resolve_primary_vids`] (which keys by mutable
416/// content-UID and so cannot recognize an *edited* vertex), this keys by
417/// the immutable `ext_id`, so a fork edit resolves to the same primary
418/// vertex instead of looking like a brand-new row. Fork rows whose
419/// `ext_id` is absent are not returned here and fall back to the
420/// content-UID path.
421///
422/// A failed primary round-trip degrades to an empty map (treated as "not
423/// present" → insert), matching the deliberate non-aborting contract.
424async fn batch_resolve_primary_by_ext_id<Q: ForkQueryHost + ?Sized>(
425    primary: &Q,
426    primary_ext_ids: &HashMap<Vid, String>,
427    label: &str,
428    ext_ids: &HashSet<String>,
429) -> HashMap<String, (Vid, Properties)> {
430    let mut out: HashMap<String, (Vid, Properties)> = HashMap::new();
431    if ext_ids.is_empty() {
432        return out;
433    }
434    // Invert primary's vid→ext_id map for just the candidate ext_ids.
435    // `get_vertex_ext_ids` is not label-scoped, so the Cypher below
436    // confirms the label (and fetches current props).
437    let mut ext_to_vid: HashMap<String, Vid> = HashMap::new();
438    for (vid, eid) in primary_ext_ids {
439        if ext_ids.contains(eid) {
440            ext_to_vid.insert(eid.clone(), *vid);
441        }
442    }
443    if ext_to_vid.is_empty() {
444        return out;
445    }
446    let vid_list: Vec<String> = ext_to_vid
447        .values()
448        .map(|v| v.as_u64().to_string())
449        .collect();
450    let cypher = format!(
451        "MATCH (n:`{}`) WHERE id(n) IN [{}] RETURN id(n) AS vid, n AS node",
452        escape_backticks(label),
453        vid_list.join(", ")
454    );
455    let Ok(rs) = primary.query(&cypher).await else {
456        return out;
457    };
458    let mut vid_to_props: HashMap<u64, Properties> = HashMap::new();
459    for row in rs.rows() {
460        if let Ok(vid) = row.get::<i64>("vid")
461            && let Some(Value::Node(node)) = row.value("node")
462        {
463            vid_to_props.insert(vid as u64, node.properties.clone());
464        }
465    }
466    for (eid, vid) in ext_to_vid {
467        if let Some(props) = vid_to_props.get(&vid.as_u64()) {
468            out.insert(eid, (vid, props.clone()));
469        }
470    }
471    out
472}
473
474/// Scan `UidIndex`'s underlying dataset with an `_uid_hex IN (...)`
475/// filter and collect **every** VID registered for each UID — unlike
476/// `UidIndex::resolve_uids`, which collapses to one VID per UID via
477/// HashMap overwrite (losing fork-vs-primary disambiguation).
478async fn resolve_all_candidate_vids(
479    uix: &uni_store::storage::index::UidIndex,
480    uids: &[UniId],
481) -> uni_common::Result<HashMap<UniId, Vec<Vid>>> {
482    use arrow_array::Array;
483    use futures::TryStreamExt;
484
485    // Lance/DataFusion errors all wrap uniformly as `Internal`; the
486    // generic bound lets one helper cover the scan-builder and stream
487    // error types alike.
488    fn internal<E>(e: E) -> uni_common::UniError
489    where
490        E: std::error::Error + Send + Sync + 'static,
491    {
492        uni_common::UniError::Internal(anyhow::anyhow!(e))
493    }
494
495    let ds = uix.open().await.map_err(uni_common::UniError::Internal)?;
496    let hex_values: Vec<String> = uids.iter().map(uid_to_hex).collect();
497    let filter = format!(
498        "_uid_hex IN ({})",
499        hex_values
500            .iter()
501            .map(|h| format!("'{}'", h))
502            .collect::<Vec<_>>()
503            .join(", ")
504    );
505    let mut stream = ds
506        .scan()
507        .filter(&filter)
508        .map_err(internal)?
509        .project(&["_uid_hex", "_vid"])
510        .map_err(internal)?
511        .try_into_stream()
512        .await
513        .map_err(internal)?;
514
515    let hex_to_uid: HashMap<String, UniId> =
516        uids.iter().map(|uid| (uid_to_hex(uid), *uid)).collect();
517    let mut out: HashMap<UniId, Vec<Vid>> = HashMap::new();
518    while let Some(batch) = stream.try_next().await.map_err(internal)? {
519        let uid_hex_col = batch
520            .column_by_name("_uid_hex")
521            .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>())
522            .ok_or_else(|| {
523                uni_common::UniError::Internal(anyhow::anyhow!("Missing _uid_hex column"))
524            })?;
525        let vid_col = batch
526            .column_by_name("_vid")
527            .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
528            .ok_or_else(|| {
529                uni_common::UniError::Internal(anyhow::anyhow!("Missing _vid column"))
530            })?;
531        for i in 0..batch.num_rows() {
532            if uid_hex_col.is_null(i) {
533                continue;
534            }
535            let hex = uid_hex_col.value(i);
536            if let Some(&uid) = hex_to_uid.get(hex) {
537                out.entry(uid)
538                    .or_default()
539                    .push(Vid::from(vid_col.value(i)));
540            }
541        }
542    }
543    Ok(out)
544}
545
546fn uid_to_hex(uid: &UniId) -> String {
547    uid.as_bytes()
548        .iter()
549        .map(|b| format!("{:02x}", b))
550        .collect()
551}
552
553// ============================================================================
554// Promote engine
555// ============================================================================
556
557/// Scan a fork session for matches per pattern, then bulk-insert the
558/// matched vertices on primary (deduplicated by content-derived UID)
559/// and edges (deduplicated by `(src_uid, dst_uid, edge_type)`).
560///
561/// Edges whose endpoints don't exist on primary by UID are skipped and
562/// counted in `edges_skipped_no_endpoint` — promote the missing
563/// vertices first via a vertex pattern, then re-run.
564///
565/// If the call contains no edge patterns, incidental edges on the fork
566/// are counted in `edges_skipped` and a tracing warning is emitted.
567pub async fn run_promote<Q, S>(
568    fork: &Q,
569    primary: &Q,
570    primary_tx: &S,
571    patterns: &[PromotePattern],
572    options: &PromoteOptions,
573    baseline: Option<&PromoteBaseline>,
574) -> Result<PromoteReport>
575where
576    Q: ForkQueryHost + ?Sized,
577    S: ForkPromoteSink + ?Sized,
578{
579    use uni_store::storage::vertex::VertexDataset;
580
581    let mut report = PromoteReport {
582        per_pattern_inserted: vec![0usize; patterns.len()],
583        ..Default::default()
584    };
585
586    let primary_storage = primary.storage();
587    // vid → ext_id maps so promote keys candidates by the same ext_id-aware
588    // content UID, distinguishing ext_id-distinct rows (review H4).
589    let fork_ext_ids = fork
590        .storage()
591        .get_vertex_ext_ids()
592        .await
593        .unwrap_or_default();
594    let primary_ext_ids = primary_storage
595        .get_vertex_ext_ids()
596        .await
597        .unwrap_or_default();
598    let mut any_edge_pattern = false;
599    // Cache of vertices just promoted inside this call. Edge patterns
600    // check this before falling back to primary's UidIndex + Cypher
601    // verify — pending tx_l0 writes aren't visible to a primary
602    // Cypher round-trip until commit, so without this cache an edge
603    // pattern in the same call wouldn't see endpoints we just added.
604    let mut just_inserted: HashMap<(String, UniId), Vid> = HashMap::new();
605
606    for (idx, pattern) in patterns.iter().enumerate() {
607        match pattern {
608            PromotePattern::Vertex {
609                label,
610                where_clause,
611            } => {
612                let cypher = match where_clause {
613                    Some(w) => format!(
614                        "MATCH (n:`{}`) WHERE {} RETURN n",
615                        escape_backticks(label),
616                        w
617                    ),
618                    None => format!("MATCH (n:`{}`) RETURN n", escape_backticks(label)),
619                };
620
621                let result = fork.query(&cypher).await?;
622                if result.rows().is_empty() {
623                    continue;
624                }
625
626                // First pass: extract (uid, props, ext_id) for every fork
627                // row, skipping rows already in the within-call cache.
628                let mut candidates: Vec<(UniId, Properties, Option<String>)> =
629                    Vec::with_capacity(result.rows().len());
630                for row in result.rows() {
631                    let Some(Value::Node(node)) = row.value("n") else {
632                        continue;
633                    };
634                    let ext_id = ext_id_for(&fork_ext_ids, node.vid).map(str::to_string);
635                    let uid = VertexDataset::compute_vertex_uid(
636                        label,
637                        ext_id.as_deref(),
638                        &node.properties,
639                    );
640                    if just_inserted.contains_key(&(label.clone(), uid)) {
641                        report.vertices_skipped_uid_conflict += 1;
642                        continue;
643                    }
644                    candidates.push((uid, node.properties.clone(), ext_id));
645                }
646
647                // M4 upsert: resolve ext_id-bearing candidates against
648                // primary by their stable `(label, ext_id)` identity so a
649                // fork EDIT updates the existing vertex instead of inserting
650                // a twin. Only consulted when `options.upsert`.
651                let ext_resolved: HashMap<String, (Vid, Properties)> = if options.upsert {
652                    let ext_ids: HashSet<String> = candidates
653                        .iter()
654                        .filter_map(|(_, _, e)| e.clone())
655                        .collect();
656                    batch_resolve_primary_by_ext_id(primary, &primary_ext_ids, label, &ext_ids)
657                        .await
658                } else {
659                    HashMap::new()
660                };
661
662                // Per-label fork-point baseline (merge mode only).
663                let label_baseline = baseline.and_then(|b| b.ext.get(label));
664
665                // Partition: ext_id matches become in-place upserts; every
666                // other candidate flows through the content-UID
667                // insert-or-skip path (unchanged legacy behavior).
668                let mut uid_candidates: Vec<(UniId, Properties)> =
669                    Vec::with_capacity(candidates.len());
670                for (uid, props, ext_id) in candidates {
671                    let resolved = ext_id
672                        .as_ref()
673                        .and_then(|e| ext_resolved.get(e).map(|r| (e.clone(), r)));
674                    let Some((eid, (pvid, pprops))) = resolved else {
675                        uid_candidates.push((uid, props));
676                        continue;
677                    };
678                    match label_baseline.and_then(|m| m.get(&eid)) {
679                        // Baseline-aware merge (with_merge): reconcile the
680                        // fork value `props` against primary-now `pprops` and
681                        // the fork-point baseline `b`.
682                        Some(b) => {
683                            if props == *pprops {
684                                // Already converged — keeps re-promote
685                                // idempotent. Must be checked first.
686                                report.vertices_skipped_no_op += 1;
687                            } else if props == *b {
688                                // Fork left this vertex untouched since the
689                                // fork point — never revert primary's edit.
690                                report.vertices_skipped_no_op += 1;
691                            } else if *pprops != *b {
692                                // Both sides moved off baseline → conflict.
693                                report.vertices_conflicting += 1;
694                                if options.on_conflict == ConflictPolicy::Overwrite {
695                                    primary_tx
696                                        .update_vertex_properties(label, *pvid, props)
697                                        .await?;
698                                    report.vertices_updated += 1;
699                                }
700                            } else {
701                                // Only the fork changed → clean fast-forward.
702                                primary_tx
703                                    .update_vertex_properties(label, *pvid, props)
704                                    .await?;
705                                report.vertices_updated += 1;
706                            }
707                        }
708                        // No baseline for this ext_id: fork-wins upsert.
709                        None => {
710                            if props == *pprops {
711                                report.vertices_skipped_no_op += 1;
712                            } else {
713                                primary_tx
714                                    .update_vertex_properties(label, *pvid, props)
715                                    .await?;
716                                report.vertices_updated += 1;
717                            }
718                        }
719                    }
720                }
721
722                // Batch-resolve the remaining candidates by content-UID.
723                // Two queries total per pattern (UidIndex.resolve_uids +
724                // Cypher IN-clause verify) instead of 2N. `degraded` (M5)
725                // signals the resolve could not confirm presence.
726                let uids_to_check: Vec<UniId> = uid_candidates.iter().map(|(u, _)| *u).collect();
727                let (on_primary, degraded) =
728                    batch_resolve_primary_vids(primary, &primary_storage, label, &uids_to_check)
729                        .await;
730
731                let mut to_insert: Vec<Properties> = Vec::with_capacity(uid_candidates.len());
732                let mut insert_uids: Vec<UniId> = Vec::with_capacity(uid_candidates.len());
733                for (uid, props) in uid_candidates {
734                    if on_primary.contains_key(&uid) {
735                        report.vertices_skipped_uid_conflict += 1;
736                    } else {
737                        to_insert.push(props);
738                        insert_uids.push(uid);
739                    }
740                }
741
742                if !to_insert.is_empty() {
743                    let n = to_insert.len();
744                    let vids = primary_tx.bulk_insert_vertices(label, to_insert).await?;
745                    for (uid, vid) in insert_uids.into_iter().zip(vids) {
746                        just_inserted.insert((label.clone(), uid), vid);
747                    }
748                    report.vertices_inserted += n;
749                    report.per_pattern_inserted[idx] = n;
750                    // M5: presence could not be confirmed for this batch, so
751                    // some of these inserts may be duplicates of existing
752                    // primary rows. Surface it instead of silently dup'ing.
753                    if degraded {
754                        report.vertices_inserted_unverified += n;
755                        warn!(
756                            label = %label,
757                            count = n,
758                            "promote inserted vertices whose primary presence could not be \
759                             confirmed (resolve degraded); they may be duplicates"
760                        );
761                    }
762                }
763            }
764            PromotePattern::Edge {
765                edge_type,
766                where_clause,
767            } => {
768                any_edge_pattern = true;
769                let cypher = match where_clause {
770                    Some(w) => format!(
771                        "MATCH (a)-[r:`{}`]->(b) WHERE {} RETURN a, r, b",
772                        escape_backticks(edge_type),
773                        w
774                    ),
775                    None => format!(
776                        "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
777                        escape_backticks(edge_type)
778                    ),
779                };
780
781                let result = fork.query(&cypher).await?;
782                if result.rows().is_empty() {
783                    continue;
784                }
785
786                use uni_store::storage::main_edge::MainEdgeDataset;
787
788                // First pass: extract every fork edge into a typed
789                // record so we can batch-resolve endpoints and
790                // pre-fetch primary parallel edges in one shot each.
791                struct ForkEdgeRow {
792                    a_label: String,
793                    b_label: String,
794                    src_uid: UniId,
795                    dst_uid: UniId,
796                    edge_uid: UniId,
797                    edge_props: Properties,
798                }
799                let mut fork_edges: Vec<ForkEdgeRow> = Vec::with_capacity(result.rows().len());
800                for row in result.rows() {
801                    let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
802                        (row.value("r"), row.value("a"), row.value("b"))
803                    else {
804                        continue;
805                    };
806                    let a_label = match a.labels.first() {
807                        Some(l) => l.clone(),
808                        None => continue,
809                    };
810                    let b_label = match b.labels.first() {
811                        Some(l) => l.clone(),
812                        None => continue,
813                    };
814                    let src_uid = VertexDataset::compute_vertex_uid(
815                        &a_label,
816                        ext_id_for(&fork_ext_ids, a.vid),
817                        &a.properties,
818                    );
819                    let dst_uid = VertexDataset::compute_vertex_uid(
820                        &b_label,
821                        ext_id_for(&fork_ext_ids, b.vid),
822                        &b.properties,
823                    );
824                    let edge_uid = MainEdgeDataset::compute_edge_uid(
825                        &src_uid,
826                        &dst_uid,
827                        edge_type,
828                        &edge.properties,
829                    );
830                    fork_edges.push(ForkEdgeRow {
831                        a_label,
832                        b_label,
833                        src_uid,
834                        dst_uid,
835                        edge_uid,
836                        edge_props: edge.properties.clone(),
837                    });
838                }
839
840                // Group endpoints by label so we can batch-resolve
841                // each label's UIDs in a single round-trip.
842                let mut to_resolve: HashMap<String, HashSet<UniId>> = HashMap::new();
843                for fe in &fork_edges {
844                    if !just_inserted.contains_key(&(fe.a_label.clone(), fe.src_uid)) {
845                        to_resolve
846                            .entry(fe.a_label.clone())
847                            .or_default()
848                            .insert(fe.src_uid);
849                    }
850                    if !just_inserted.contains_key(&(fe.b_label.clone(), fe.dst_uid)) {
851                        to_resolve
852                            .entry(fe.b_label.clone())
853                            .or_default()
854                            .insert(fe.dst_uid);
855                    }
856                }
857                let mut endpoint_resolved: HashMap<(String, UniId), Vid> = HashMap::new();
858                for (lbl, uid_set) in to_resolve {
859                    let uid_vec: Vec<UniId> = uid_set.into_iter().collect();
860                    let (resolved, _degraded) =
861                        batch_resolve_primary_vids(primary, &primary_storage, &lbl, &uid_vec).await;
862                    for (uid, vid) in resolved {
863                        endpoint_resolved.insert((lbl.clone(), uid), vid);
864                    }
865                }
866                // Seed with just_inserted cache hits.
867                for ((lbl, uid), vid) in just_inserted.iter() {
868                    endpoint_resolved.insert((lbl.clone(), *uid), *vid);
869                }
870
871                // Pre-fetch primary's parallel edges for dedup: one
872                // query covering every (src_vid, dst_vid) pair across
873                // all resolved fork edges. Hash by computed edge UID.
874                let mut resolved_pairs: HashSet<(Vid, Vid)> = HashSet::new();
875                for fe in &fork_edges {
876                    let s = endpoint_resolved.get(&(fe.a_label.clone(), fe.src_uid));
877                    let d = endpoint_resolved.get(&(fe.b_label.clone(), fe.dst_uid));
878                    if let (Some(s), Some(d)) = (s, d) {
879                        resolved_pairs.insert((*s, *d));
880                    }
881                }
882                let mut primary_edge_uids: HashSet<UniId> = HashSet::new();
883                if !resolved_pairs.is_empty() {
884                    let src_vids: HashSet<u64> =
885                        resolved_pairs.iter().map(|(s, _)| s.as_u64()).collect();
886                    let dst_vids: HashSet<u64> =
887                        resolved_pairs.iter().map(|(_, d)| d.as_u64()).collect();
888                    let src_list: Vec<String> = src_vids.iter().map(|v| v.to_string()).collect();
889                    let dst_list: Vec<String> = dst_vids.iter().map(|v| v.to_string()).collect();
890                    let dedup_cypher = format!(
891                        "MATCH (a)-[r:`{}`]->(b) \
892                         WHERE id(a) IN [{}] AND id(b) IN [{}] \
893                         RETURN a, r, b",
894                        escape_backticks(edge_type),
895                        src_list.join(", "),
896                        dst_list.join(", "),
897                    );
898                    if let Ok(rs) = primary.query(&dedup_cypher).await {
899                        for row in rs.rows() {
900                            let (
901                                Some(Value::Edge(existing)),
902                                Some(Value::Node(ea)),
903                                Some(Value::Node(eb)),
904                            ) = (row.value("r"), row.value("a"), row.value("b"))
905                            else {
906                                continue;
907                            };
908                            let ea_label = ea.labels.first().cloned().unwrap_or_default();
909                            let eb_label = eb.labels.first().cloned().unwrap_or_default();
910                            let esrc = VertexDataset::compute_vertex_uid(
911                                &ea_label,
912                                ext_id_for(&primary_ext_ids, ea.vid),
913                                &ea.properties,
914                            );
915                            let edst = VertexDataset::compute_vertex_uid(
916                                &eb_label,
917                                ext_id_for(&primary_ext_ids, eb.vid),
918                                &eb.properties,
919                            );
920                            let euid = MainEdgeDataset::compute_edge_uid(
921                                &esrc,
922                                &edst,
923                                edge_type,
924                                &existing.properties,
925                            );
926                            primary_edge_uids.insert(euid);
927                        }
928                    }
929                }
930
931                // Second pass: classify each fork edge against the
932                // resolved endpoints and primary edge-UID set. Edges
933                // are accumulated and bulk-inserted in one call.
934                let mut edges_to_insert: Vec<(Vid, Vid, Properties)> =
935                    Vec::with_capacity(fork_edges.len());
936                let mut pattern_inserted = 0usize;
937                for fe in fork_edges {
938                    let src_vid = endpoint_resolved
939                        .get(&(fe.a_label.clone(), fe.src_uid))
940                        .copied();
941                    let dst_vid = endpoint_resolved
942                        .get(&(fe.b_label.clone(), fe.dst_uid))
943                        .copied();
944                    let (src_vid, dst_vid) = match (src_vid, dst_vid) {
945                        (Some(s), Some(d)) => (s, d),
946                        _ => {
947                            report.edges_skipped_no_endpoint += 1;
948                            continue;
949                        }
950                    };
951                    if primary_edge_uids.contains(&fe.edge_uid) {
952                        report.edges_skipped_duplicate += 1;
953                        continue;
954                    }
955                    edges_to_insert.push((src_vid, dst_vid, fe.edge_props));
956                    pattern_inserted += 1;
957                }
958                if !edges_to_insert.is_empty() {
959                    let n = edges_to_insert.len();
960                    primary_tx
961                        .bulk_insert_edges(edge_type, edges_to_insert)
962                        .await?;
963                    report.edges_inserted += n;
964                }
965                report.per_pattern_inserted[idx] = pattern_inserted;
966            }
967        }
968    }
969
970    // Delete-promotion (M4): a vertex present at the fork point but removed
971    // on the fork is deleted on primary. Opt-in and ext_id-keyed. We scan
972    // the FULL fork label (ignoring per-pattern where-clauses, which select
973    // which present rows to *promote*, not which to keep), so a filtered-out
974    // but still-present fork row is never read as a deletion. A row primary
975    // added after the fork point is absent from the baseline and so is never
976    // a delete candidate — the anti-spurious-delete guarantee. Runs after
977    // the pattern loop so vertex deletes are issued last in tx order.
978    if options.delete_promotion
979        && let Some(baseline) = baseline
980    {
981        let mut del_labels: Vec<&str> = patterns
982            .iter()
983            .filter(|p| !p.is_edge())
984            .map(|p| p.label_name())
985            .collect();
986        del_labels.sort_unstable();
987        del_labels.dedup();
988
989        for label in del_labels {
990            let cypher = format!("MATCH (n:`{}`) RETURN n", escape_backticks(label));
991            let result = fork.query(&cypher).await?;
992            let mut fork_now_ext: HashSet<String> = HashSet::new();
993            let mut fork_now_noext: HashSet<UniId> = HashSet::new();
994            for row in result.rows() {
995                if let Some(Value::Node(node)) = row.value("n") {
996                    match ext_id_for(&fork_ext_ids, node.vid) {
997                        Some(eid) if !eid.is_empty() => {
998                            fork_now_ext.insert(eid.to_string());
999                        }
1000                        _ => {
1001                            fork_now_noext.insert(VertexDataset::compute_vertex_uid(
1002                                label,
1003                                None,
1004                                &node.properties,
1005                            ));
1006                        }
1007                    }
1008                }
1009            }
1010
1011            // ext_id rows present at the fork point, absent on the fork now.
1012            if let Some(base_ext) = baseline.ext.get(label) {
1013                let deleted_ext: HashSet<String> = base_ext
1014                    .keys()
1015                    .filter(|eid| !fork_now_ext.contains(*eid))
1016                    .cloned()
1017                    .collect();
1018                if !deleted_ext.is_empty() {
1019                    // Resolve against primary NOW; delete only those still
1020                    // present (idempotent if primary already removed them).
1021                    let resolved = batch_resolve_primary_by_ext_id(
1022                        primary,
1023                        &primary_ext_ids,
1024                        label,
1025                        &deleted_ext,
1026                    )
1027                    .await;
1028                    for (_eid, (pvid, _props)) in resolved {
1029                        primary_tx.delete_vertex(label, pvid).await?;
1030                        report.vertices_deleted += 1;
1031                    }
1032                }
1033            }
1034
1035            // Non-ext_id fork-point rows that vanished can't be safely
1036            // delete-promoted (no stable identity); surface the count.
1037            if let Some(base_noext) = baseline.no_ext.get(label) {
1038                let gone = base_noext
1039                    .iter()
1040                    .filter(|u| !fork_now_noext.contains(*u))
1041                    .count();
1042                report.vertices_skipped_no_ext_id_for_delete += gone;
1043            }
1044        }
1045    }
1046
1047    // When the call contains no edge patterns, surface incidental edges
1048    // on the fork so callers see they exist (and weren't promoted).
1049    if !any_edge_pattern {
1050        let mut edge_seen = 0usize;
1051        for et in fork.schema().schema().edge_types.keys() {
1052            let cypher = format!(
1053                "MATCH ()-[r:`{}`]->() RETURN count(r) AS c",
1054                escape_backticks(et)
1055            );
1056            if let Ok(rs) = fork.query(&cypher).await
1057                && let Some(row) = rs.rows().first()
1058                && let Ok(c) = row.get::<i64>("c")
1059            {
1060                edge_seen += c as usize;
1061            }
1062        }
1063        if edge_seen > 0 {
1064            report.edges_skipped = edge_seen;
1065            warn!(
1066                target: "uni::promote",
1067                edges_skipped = edge_seen,
1068                "promote_from_fork: fork contains {} edges; pass \
1069                 PromotePattern::edge_type(...) to promote them",
1070                edge_seen
1071            );
1072        }
1073    }
1074
1075    Ok(report)
1076}