Skip to main content

uni_fork/
diff.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Fork diff & promote engine (Phase 6+).
5//!
6//! `compute_diff` computes the structural delta between two views; `run_promote`
7//! scans a fork for matched rows and bulk-inserts them onto primary. Both are
8//! generic over the [`ForkQueryHost`] / [`ForkPromoteSink`] host traits that
9//! uni-db implements for its `Session`/`Transaction` types.
10
11// ============================================================================
12// Diff engine
13// ============================================================================
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use tracing::warn;
18use uni_common::Properties;
19use uni_common::Result;
20use uni_common::Value;
21use uni_common::core::id::{UniId, Vid};
22
23use crate::host::{ForkPromoteSink, ForkQueryHost};
24use crate::types::{
25    DiffEdge, DiffVertex, EdgeDiff, ForkDiff, PromotePattern, PromoteReport, PropertyChange,
26    VertexDiff, VertexPropertyChange,
27};
28
29/// Compute the structural delta between two views.
30///
31/// Both `a` and `b` may be primary or forked sessions. The convention is
32/// *forward*: returned `ForkDiff.vertices.added` is rows present in `b`
33/// but not `a`; `deleted` is rows in `a` but not `b`.
34///
35/// Identity is content-addressed UID for vertices and `(src_uid,
36/// dst_uid)` for edges, scoped by edge type — so two unrelated forks
37/// with overlapping VIDs but distinct content pair correctly.
38pub async fn compute_diff<Q: ForkQueryHost + ?Sized>(a: &Q, b: &Q) -> Result<ForkDiff> {
39    let mut diff = ForkDiff::default();
40
41    let labels_a: HashSet<String> = a.schema().schema().labels.keys().cloned().collect();
42    let labels_b: HashSet<String> = b.schema().schema().labels.keys().cloned().collect();
43    let labels_union: Vec<&String> = labels_a.union(&labels_b).collect();
44
45    for label in labels_union {
46        let rows_a = scan_label_nodes(a, label).await?;
47        let rows_b = scan_label_nodes(b, label).await?;
48        diff_label(label, rows_a, rows_b, &mut diff.vertices);
49    }
50
51    let edges_a: HashSet<String> = a.schema().schema().edge_types.keys().cloned().collect();
52    let edges_b: HashSet<String> = b.schema().schema().edge_types.keys().cloned().collect();
53    let edges_union: Vec<&String> = edges_a.union(&edges_b).collect();
54
55    for edge_type in edges_union {
56        let rows_a = scan_edge_type(a, edge_type).await?;
57        let rows_b = scan_edge_type(b, edge_type).await?;
58        diff_edge_type(edge_type, rows_a, rows_b, &mut diff.edges);
59    }
60
61    Ok(diff)
62}
63
/// Vertex rows from one label scan, keyed by content UID (see
/// `scan_label_nodes`, which builds the map).
type VertexBucket = HashMap<UniId, VertexRow>;
/// Edge rows from one edge-type scan, keyed by content-addressed edge
/// UID (`compute_edge_uid(src_uid, dst_uid, type, properties)`). Two
/// parallel edges between the same endpoints with different property
/// bags hash to different keys and therefore appear as distinct
/// entries — that's the Phase 7d multi-edge semantics.
type EdgeBucket = HashMap<UniId, EdgeRow>;
72
/// A vertex as captured by `scan_label_nodes`, before it is converted
/// into a `DiffVertex`.
#[derive(Debug, Clone)]
struct VertexRow {
    /// Label the scan was run for.
    label: String,
    /// VID the scanned view reported for this node.
    vid: Vid,
    /// Property bag returned with the node.
    properties: Properties,
}
79
/// An edge as captured by `scan_edge_type`, before it is converted into
/// a `DiffEdge`.
#[derive(Debug, Clone)]
struct EdgeRow {
    /// Content UID of the source node (computed from its first label and
    /// its properties).
    src_uid: UniId,
    /// Content UID of the destination node (computed the same way).
    dst_uid: UniId,
    /// Property bag returned with the edge.
    properties: Properties,
}
86
87async fn scan_label_nodes<Q: ForkQueryHost + ?Sized>(s: &Q, label: &str) -> Result<VertexBucket> {
88    use uni_store::storage::vertex::VertexDataset;
89    let cypher = format!("MATCH (n:`{}`) RETURN n", escape_backticks(label));
90    let result = s.query(&cypher).await?;
91    let mut bucket = VertexBucket::new();
92    for row in result.rows() {
93        let Some(Value::Node(node)) = row.value("n") else {
94            continue;
95        };
96        // The MATCH already filters to nodes carrying `label`, so the
97        // bucketed row's label is always `label`.
98        let uid = VertexDataset::compute_vertex_uid(label, None, &node.properties);
99        bucket.insert(
100            uid,
101            VertexRow {
102                label: label.to_string(),
103                vid: node.vid,
104                properties: node.properties.clone(),
105            },
106        );
107    }
108    Ok(bucket)
109}
110
111async fn scan_edge_type<Q: ForkQueryHost + ?Sized>(s: &Q, edge_type: &str) -> Result<EdgeBucket> {
112    use uni_store::storage::main_edge::MainEdgeDataset;
113    use uni_store::storage::vertex::VertexDataset;
114    let cypher = format!(
115        "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
116        escape_backticks(edge_type)
117    );
118    let result = s.query(&cypher).await?;
119    let mut bucket = EdgeBucket::new();
120    for row in result.rows() {
121        let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
122            (row.value("r"), row.value("a"), row.value("b"))
123        else {
124            continue;
125        };
126        let a_label = a.labels.first().cloned().unwrap_or_default();
127        let b_label = b.labels.first().cloned().unwrap_or_default();
128        let src_uid = VertexDataset::compute_vertex_uid(&a_label, None, &a.properties);
129        let dst_uid = VertexDataset::compute_vertex_uid(&b_label, None, &b.properties);
130        let edge_uid =
131            MainEdgeDataset::compute_edge_uid(&src_uid, &dst_uid, edge_type, &edge.properties);
132        bucket.insert(
133            edge_uid,
134            EdgeRow {
135                src_uid,
136                dst_uid,
137                properties: edge.properties.clone(),
138            },
139        );
140    }
141    Ok(bucket)
142}
143
144/// Split two content-keyed buckets into *added* (present in `b`, not `a`)
145/// and *deleted* (present in `a`, not `b`) rows, moving each row out of its
146/// owning map via the supplied builders. Returns the rows shared by both
147/// buckets (`(uid, row_a, row_b)`) so the caller can diff their properties.
148fn partition_added_deleted<R, A, D>(
149    mut a: HashMap<UniId, R>,
150    mut b: HashMap<UniId, R>,
151    mut mk_added: A,
152    mut mk_deleted: D,
153) -> Vec<(UniId, R, R)>
154where
155    A: FnMut(UniId, R),
156    D: FnMut(UniId, R),
157{
158    let keys_a: HashSet<UniId> = a.keys().copied().collect();
159    let keys_b: HashSet<UniId> = b.keys().copied().collect();
160
161    let mut common = Vec::new();
162    for uid in &keys_b {
163        if !keys_a.contains(uid) {
164            mk_added(*uid, b.remove(uid).expect("key from keys_b"));
165        }
166    }
167    for uid in &keys_a {
168        match keys_b.contains(uid) {
169            true => {
170                let row_a = a.remove(uid).expect("key from keys_a");
171                let row_b = b.remove(uid).expect("shared key in b");
172                common.push((*uid, row_a, row_b));
173            }
174            false => mk_deleted(*uid, a.remove(uid).expect("key from keys_a")),
175        }
176    }
177    common
178}
179
180fn diff_label(label: &str, a: VertexBucket, b: VertexBucket, out: &mut VertexDiff) {
181    let common = partition_added_deleted(
182        a,
183        b,
184        |uid, row| {
185            out.added.push(DiffVertex {
186                label: row.label,
187                uid,
188                vid: Some(row.vid),
189                properties: row.properties,
190            });
191        },
192        |uid, row| {
193            out.deleted.push(DiffVertex {
194                label: row.label,
195                uid,
196                vid: Some(row.vid),
197                properties: row.properties,
198            });
199        },
200    );
201    for (uid, row_a, row_b) in common {
202        let changes = property_changes(&row_a.properties, &row_b.properties);
203        if !changes.is_empty() {
204            out.changed.push(VertexPropertyChange {
205                label: label.to_string(),
206                uid,
207                changes,
208            });
209        }
210    }
211}
212
213fn diff_edge_type(edge_type: &str, a: EdgeBucket, b: EdgeBucket, out: &mut EdgeDiff) {
214    // Note: under content-addressed identity, two edges with the same
215    // edge_uid have, by construction, identical (src, dst, type,
216    // properties) — so the shared (intersection) rows cannot contain a
217    // property difference. The `changed` branch is intentionally
218    // unreachable under multi-edge semantics; property mutations surface
219    // as added+deleted of distinct edge UIDs. `EdgePropertyChange` remains
220    // in the public API for forward compatibility with a future identity
221    // model that anchors on a stable edge id. We therefore discard the
222    // common rows.
223    partition_added_deleted(
224        a,
225        b,
226        |edge_uid, row| {
227            out.added.push(DiffEdge {
228                edge_type: edge_type.to_string(),
229                edge_uid,
230                src_uid: row.src_uid,
231                dst_uid: row.dst_uid,
232                properties: row.properties,
233            });
234        },
235        |edge_uid, row| {
236            out.deleted.push(DiffEdge {
237                edge_type: edge_type.to_string(),
238                edge_uid,
239                src_uid: row.src_uid,
240                dst_uid: row.dst_uid,
241                properties: row.properties,
242            });
243        },
244    );
245}
246
247fn property_changes(a: &Properties, b: &Properties) -> Vec<PropertyChange> {
248    let mut changes = Vec::new();
249    let keys: HashSet<&String> = a.keys().chain(b.keys()).collect();
250    let mut sorted: Vec<&String> = keys.into_iter().collect();
251    sorted.sort();
252    for k in sorted {
253        let va = a.get(k);
254        let vb = b.get(k);
255        if va != vb {
256            changes.push(PropertyChange {
257                key: k.clone(),
258                before: va.cloned(),
259                after: vb.cloned(),
260            });
261        }
262    }
263    changes
264}
265
/// Double every backtick in `s` so the text can be embedded inside a
/// backtick-quoted Cypher identifier.
fn escape_backticks(s: &str) -> String {
    s.split('`').collect::<Vec<_>>().join("``")
}
269
270/// Resolve a set of UIDs to their primary VIDs in two queries
271/// regardless of the input size.
272///
273/// Returns a `HashMap<UniId, Vid>` containing only those UIDs that
274/// successfully resolve to a *primary* VID (i.e., a candidate VID
275/// from the shared `UidIndex` is actually present in primary's view
276/// of the label's vertex table). UIDs absent from the result map
277/// either had no candidate registered or all candidates pointed at
278/// fork-only rows.
279///
280/// Two queries per call regardless of `uids.len()`: one IN-filter
281/// scan of `UidIndex`'s dataset (collecting **all** registered VIDs
282/// per UID — `UidIndex::resolve_uids` collapses to one VID per UID
283/// which loses fork/primary disambiguation), and one primary Cypher
284/// MATCH with an `id(n) IN [...]` predicate to confirm which
285/// candidates live on primary.
286async fn batch_resolve_primary_vids<Q: ForkQueryHost + ?Sized>(
287    primary: &Q,
288    primary_storage: &Arc<uni_store::storage::manager::StorageManager>,
289    label: &str,
290    uids: &[UniId],
291) -> HashMap<UniId, Vid> {
292    // NOTE: every error path below degrades to whatever has been
293    // resolved so far (an empty or partial map) rather than
294    // propagating. This is deliberate: `run_promote` treats an
295    // unresolved UID as "not present on primary" and inserts it, so a
296    // transient resolve failure must not abort the promote. Changing
297    // this to propagate would alter promote semantics.
298    let mut out: HashMap<UniId, Vid> = HashMap::new();
299    if uids.is_empty() {
300        return out;
301    }
302    // Collect *all* candidate VIDs per UID by scanning the shared
303    // UidIndex with an IN filter. The shared index is not
304    // branch-isolated, so a single UID may have a fork-only VID and
305    // a primary VID both registered — we keep both and let the
306    // primary Cypher MATCH below decide which is real.
307    let candidates_per_uid: HashMap<UniId, Vec<Vid>> = match primary_storage.uid_index(label).ok() {
308        Some(uix) => match resolve_all_candidate_vids(&uix, uids).await {
309            Ok(m) => m,
310            Err(_) => return out,
311        },
312        None => return out,
313    };
314    if candidates_per_uid.is_empty() {
315        return out;
316    }
317    // Single Cypher with IN clause over every candidate VID across
318    // every UID. Primary's branched backend filters out fork-only
319    // VIDs naturally — they have no row in the primary view.
320    let vid_set: HashSet<u64> = candidates_per_uid
321        .values()
322        .flat_map(|vs| vs.iter().map(|v| v.as_u64()))
323        .collect();
324    let vid_list: Vec<String> = vid_set.iter().map(|v| v.to_string()).collect();
325    let cypher = format!(
326        "MATCH (n:`{}`) WHERE id(n) IN [{}] RETURN id(n) AS vid",
327        escape_backticks(label),
328        vid_list.join(", ")
329    );
330    let rs = match primary.query(&cypher).await {
331        Ok(rs) => rs,
332        Err(_) => return out,
333    };
334    let primary_vids: HashSet<u64> = rs
335        .rows()
336        .iter()
337        .filter_map(|row| row.get::<i64>("vid").ok())
338        .map(|v| v as u64)
339        .collect();
340    for (uid, vids) in candidates_per_uid {
341        // If *any* candidate VID for this UID lives on primary, the
342        // UID exists on primary. Pick the first such VID.
343        if let Some(vid) = vids
344            .into_iter()
345            .find(|v| primary_vids.contains(&v.as_u64()))
346        {
347            out.insert(uid, vid);
348        }
349    }
350    out
351}
352
353/// Scan `UidIndex`'s underlying dataset with an `_uid_hex IN (...)`
354/// filter and collect **every** VID registered for each UID — unlike
355/// `UidIndex::resolve_uids`, which collapses to one VID per UID via
356/// HashMap overwrite (losing fork-vs-primary disambiguation).
357async fn resolve_all_candidate_vids(
358    uix: &uni_store::storage::index::UidIndex,
359    uids: &[UniId],
360) -> uni_common::Result<HashMap<UniId, Vec<Vid>>> {
361    use arrow_array::Array;
362    use futures::TryStreamExt;
363
364    // Lance/DataFusion errors all wrap uniformly as `Internal`; the
365    // generic bound lets one helper cover the scan-builder and stream
366    // error types alike.
367    fn internal<E>(e: E) -> uni_common::UniError
368    where
369        E: std::error::Error + Send + Sync + 'static,
370    {
371        uni_common::UniError::Internal(anyhow::anyhow!(e))
372    }
373
374    let ds = uix.open().await.map_err(uni_common::UniError::Internal)?;
375    let hex_values: Vec<String> = uids.iter().map(uid_to_hex).collect();
376    let filter = format!(
377        "_uid_hex IN ({})",
378        hex_values
379            .iter()
380            .map(|h| format!("'{}'", h))
381            .collect::<Vec<_>>()
382            .join(", ")
383    );
384    let mut stream = ds
385        .scan()
386        .filter(&filter)
387        .map_err(internal)?
388        .project(&["_uid_hex", "_vid"])
389        .map_err(internal)?
390        .try_into_stream()
391        .await
392        .map_err(internal)?;
393
394    let hex_to_uid: HashMap<String, UniId> =
395        uids.iter().map(|uid| (uid_to_hex(uid), *uid)).collect();
396    let mut out: HashMap<UniId, Vec<Vid>> = HashMap::new();
397    while let Some(batch) = stream.try_next().await.map_err(internal)? {
398        let uid_hex_col = batch
399            .column_by_name("_uid_hex")
400            .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>())
401            .ok_or_else(|| {
402                uni_common::UniError::Internal(anyhow::anyhow!("Missing _uid_hex column"))
403            })?;
404        let vid_col = batch
405            .column_by_name("_vid")
406            .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
407            .ok_or_else(|| {
408                uni_common::UniError::Internal(anyhow::anyhow!("Missing _vid column"))
409            })?;
410        for i in 0..batch.num_rows() {
411            if uid_hex_col.is_null(i) {
412                continue;
413            }
414            let hex = uid_hex_col.value(i);
415            if let Some(&uid) = hex_to_uid.get(hex) {
416                out.entry(uid)
417                    .or_default()
418                    .push(Vid::from(vid_col.value(i)));
419            }
420        }
421    }
422    Ok(out)
423}
424
425fn uid_to_hex(uid: &UniId) -> String {
426    uid.as_bytes()
427        .iter()
428        .map(|b| format!("{:02x}", b))
429        .collect()
430}
431
432// ============================================================================
433// Promote engine
434// ============================================================================
435
436/// Scan a fork session for matches per pattern, then bulk-insert the
437/// matched vertices on primary (deduplicated by content-derived UID)
438/// and edges (deduplicated by `(src_uid, dst_uid, edge_type)`).
439///
440/// Edges whose endpoints don't exist on primary by UID are skipped and
441/// counted in `edges_skipped_no_endpoint` — promote the missing
442/// vertices first via a vertex pattern, then re-run.
443///
444/// If the call contains no edge patterns, incidental edges on the fork
445/// are counted in `edges_skipped` and a tracing warning is emitted.
446pub async fn run_promote<Q, S>(
447    fork: &Q,
448    primary: &Q,
449    primary_tx: &S,
450    patterns: &[PromotePattern],
451) -> Result<PromoteReport>
452where
453    Q: ForkQueryHost + ?Sized,
454    S: ForkPromoteSink + ?Sized,
455{
456    use uni_store::storage::vertex::VertexDataset;
457
458    let mut report = PromoteReport {
459        per_pattern_inserted: vec![0usize; patterns.len()],
460        ..Default::default()
461    };
462
463    let primary_storage = primary.storage();
464    let mut any_edge_pattern = false;
465    // Cache of vertices just promoted inside this call. Edge patterns
466    // check this before falling back to primary's UidIndex + Cypher
467    // verify — pending tx_l0 writes aren't visible to a primary
468    // Cypher round-trip until commit, so without this cache an edge
469    // pattern in the same call wouldn't see endpoints we just added.
470    let mut just_inserted: HashMap<(String, UniId), Vid> = HashMap::new();
471
472    for (idx, pattern) in patterns.iter().enumerate() {
473        match pattern {
474            PromotePattern::Vertex {
475                label,
476                where_clause,
477            } => {
478                let cypher = match where_clause {
479                    Some(w) => format!(
480                        "MATCH (n:`{}`) WHERE {} RETURN n",
481                        escape_backticks(label),
482                        w
483                    ),
484                    None => format!("MATCH (n:`{}`) RETURN n", escape_backticks(label)),
485                };
486
487                let result = fork.query(&cypher).await?;
488                if result.rows().is_empty() {
489                    continue;
490                }
491
492                // First pass: extract (uid, props) for every fork row,
493                // skipping rows already in the within-call cache.
494                let mut candidates: Vec<(UniId, Properties)> =
495                    Vec::with_capacity(result.rows().len());
496                for row in result.rows() {
497                    let Some(Value::Node(node)) = row.value("n") else {
498                        continue;
499                    };
500                    let uid = VertexDataset::compute_vertex_uid(label, None, &node.properties);
501                    if just_inserted.contains_key(&(label.clone(), uid)) {
502                        report.vertices_skipped_uid_conflict += 1;
503                        continue;
504                    }
505                    candidates.push((uid, node.properties.clone()));
506                }
507
508                // Batch-resolve every candidate UID against primary.
509                // Two queries total per pattern (UidIndex.resolve_uids
510                // + Cypher IN-clause verify) instead of 2N.
511                let uids_to_check: Vec<UniId> = candidates.iter().map(|(u, _)| *u).collect();
512                let on_primary =
513                    batch_resolve_primary_vids(primary, &primary_storage, label, &uids_to_check)
514                        .await;
515
516                let mut to_insert: Vec<Properties> = Vec::with_capacity(candidates.len());
517                let mut insert_uids: Vec<UniId> = Vec::with_capacity(candidates.len());
518                for (uid, props) in candidates {
519                    if on_primary.contains_key(&uid) {
520                        report.vertices_skipped_uid_conflict += 1;
521                    } else {
522                        to_insert.push(props);
523                        insert_uids.push(uid);
524                    }
525                }
526
527                if !to_insert.is_empty() {
528                    let n = to_insert.len();
529                    let vids = primary_tx.bulk_insert_vertices(label, to_insert).await?;
530                    for (uid, vid) in insert_uids.into_iter().zip(vids) {
531                        just_inserted.insert((label.clone(), uid), vid);
532                    }
533                    report.vertices_inserted += n;
534                    report.per_pattern_inserted[idx] = n;
535                }
536            }
537            PromotePattern::Edge {
538                edge_type,
539                where_clause,
540            } => {
541                any_edge_pattern = true;
542                let cypher = match where_clause {
543                    Some(w) => format!(
544                        "MATCH (a)-[r:`{}`]->(b) WHERE {} RETURN a, r, b",
545                        escape_backticks(edge_type),
546                        w
547                    ),
548                    None => format!(
549                        "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
550                        escape_backticks(edge_type)
551                    ),
552                };
553
554                let result = fork.query(&cypher).await?;
555                if result.rows().is_empty() {
556                    continue;
557                }
558
559                use uni_store::storage::main_edge::MainEdgeDataset;
560
561                // First pass: extract every fork edge into a typed
562                // record so we can batch-resolve endpoints and
563                // pre-fetch primary parallel edges in one shot each.
564                struct ForkEdgeRow {
565                    a_label: String,
566                    b_label: String,
567                    src_uid: UniId,
568                    dst_uid: UniId,
569                    edge_uid: UniId,
570                    edge_props: Properties,
571                }
572                let mut fork_edges: Vec<ForkEdgeRow> = Vec::with_capacity(result.rows().len());
573                for row in result.rows() {
574                    let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
575                        (row.value("r"), row.value("a"), row.value("b"))
576                    else {
577                        continue;
578                    };
579                    let a_label = match a.labels.first() {
580                        Some(l) => l.clone(),
581                        None => continue,
582                    };
583                    let b_label = match b.labels.first() {
584                        Some(l) => l.clone(),
585                        None => continue,
586                    };
587                    let src_uid = VertexDataset::compute_vertex_uid(&a_label, None, &a.properties);
588                    let dst_uid = VertexDataset::compute_vertex_uid(&b_label, None, &b.properties);
589                    let edge_uid = MainEdgeDataset::compute_edge_uid(
590                        &src_uid,
591                        &dst_uid,
592                        edge_type,
593                        &edge.properties,
594                    );
595                    fork_edges.push(ForkEdgeRow {
596                        a_label,
597                        b_label,
598                        src_uid,
599                        dst_uid,
600                        edge_uid,
601                        edge_props: edge.properties.clone(),
602                    });
603                }
604
605                // Group endpoints by label so we can batch-resolve
606                // each label's UIDs in a single round-trip.
607                let mut to_resolve: HashMap<String, HashSet<UniId>> = HashMap::new();
608                for fe in &fork_edges {
609                    if !just_inserted.contains_key(&(fe.a_label.clone(), fe.src_uid)) {
610                        to_resolve
611                            .entry(fe.a_label.clone())
612                            .or_default()
613                            .insert(fe.src_uid);
614                    }
615                    if !just_inserted.contains_key(&(fe.b_label.clone(), fe.dst_uid)) {
616                        to_resolve
617                            .entry(fe.b_label.clone())
618                            .or_default()
619                            .insert(fe.dst_uid);
620                    }
621                }
622                let mut endpoint_resolved: HashMap<(String, UniId), Vid> = HashMap::new();
623                for (lbl, uid_set) in to_resolve {
624                    let uid_vec: Vec<UniId> = uid_set.into_iter().collect();
625                    let resolved =
626                        batch_resolve_primary_vids(primary, &primary_storage, &lbl, &uid_vec).await;
627                    for (uid, vid) in resolved {
628                        endpoint_resolved.insert((lbl.clone(), uid), vid);
629                    }
630                }
631                // Seed with just_inserted cache hits.
632                for ((lbl, uid), vid) in just_inserted.iter() {
633                    endpoint_resolved.insert((lbl.clone(), *uid), *vid);
634                }
635
636                // Pre-fetch primary's parallel edges for dedup: one
637                // query covering every (src_vid, dst_vid) pair across
638                // all resolved fork edges. Hash by computed edge UID.
639                let mut resolved_pairs: HashSet<(Vid, Vid)> = HashSet::new();
640                for fe in &fork_edges {
641                    let s = endpoint_resolved.get(&(fe.a_label.clone(), fe.src_uid));
642                    let d = endpoint_resolved.get(&(fe.b_label.clone(), fe.dst_uid));
643                    if let (Some(s), Some(d)) = (s, d) {
644                        resolved_pairs.insert((*s, *d));
645                    }
646                }
647                let mut primary_edge_uids: HashSet<UniId> = HashSet::new();
648                if !resolved_pairs.is_empty() {
649                    let src_vids: HashSet<u64> =
650                        resolved_pairs.iter().map(|(s, _)| s.as_u64()).collect();
651                    let dst_vids: HashSet<u64> =
652                        resolved_pairs.iter().map(|(_, d)| d.as_u64()).collect();
653                    let src_list: Vec<String> = src_vids.iter().map(|v| v.to_string()).collect();
654                    let dst_list: Vec<String> = dst_vids.iter().map(|v| v.to_string()).collect();
655                    let dedup_cypher = format!(
656                        "MATCH (a)-[r:`{}`]->(b) \
657                         WHERE id(a) IN [{}] AND id(b) IN [{}] \
658                         RETURN a, r, b",
659                        escape_backticks(edge_type),
660                        src_list.join(", "),
661                        dst_list.join(", "),
662                    );
663                    if let Ok(rs) = primary.query(&dedup_cypher).await {
664                        for row in rs.rows() {
665                            let (
666                                Some(Value::Edge(existing)),
667                                Some(Value::Node(ea)),
668                                Some(Value::Node(eb)),
669                            ) = (row.value("r"), row.value("a"), row.value("b"))
670                            else {
671                                continue;
672                            };
673                            let ea_label = ea.labels.first().cloned().unwrap_or_default();
674                            let eb_label = eb.labels.first().cloned().unwrap_or_default();
675                            let esrc =
676                                VertexDataset::compute_vertex_uid(&ea_label, None, &ea.properties);
677                            let edst =
678                                VertexDataset::compute_vertex_uid(&eb_label, None, &eb.properties);
679                            let euid = MainEdgeDataset::compute_edge_uid(
680                                &esrc,
681                                &edst,
682                                edge_type,
683                                &existing.properties,
684                            );
685                            primary_edge_uids.insert(euid);
686                        }
687                    }
688                }
689
690                // Second pass: classify each fork edge against the
691                // resolved endpoints and primary edge-UID set. Edges
692                // are accumulated and bulk-inserted in one call.
693                let mut edges_to_insert: Vec<(Vid, Vid, Properties)> =
694                    Vec::with_capacity(fork_edges.len());
695                let mut pattern_inserted = 0usize;
696                for fe in fork_edges {
697                    let src_vid = endpoint_resolved
698                        .get(&(fe.a_label.clone(), fe.src_uid))
699                        .copied();
700                    let dst_vid = endpoint_resolved
701                        .get(&(fe.b_label.clone(), fe.dst_uid))
702                        .copied();
703                    let (src_vid, dst_vid) = match (src_vid, dst_vid) {
704                        (Some(s), Some(d)) => (s, d),
705                        _ => {
706                            report.edges_skipped_no_endpoint += 1;
707                            continue;
708                        }
709                    };
710                    if primary_edge_uids.contains(&fe.edge_uid) {
711                        report.edges_skipped_duplicate += 1;
712                        continue;
713                    }
714                    edges_to_insert.push((src_vid, dst_vid, fe.edge_props));
715                    pattern_inserted += 1;
716                }
717                if !edges_to_insert.is_empty() {
718                    let n = edges_to_insert.len();
719                    primary_tx
720                        .bulk_insert_edges(edge_type, edges_to_insert)
721                        .await?;
722                    report.edges_inserted += n;
723                }
724                report.per_pattern_inserted[idx] = pattern_inserted;
725            }
726        }
727    }
728
729    // When the call contains no edge patterns, surface incidental edges
730    // on the fork so callers see they exist (and weren't promoted).
731    if !any_edge_pattern {
732        let mut edge_seen = 0usize;
733        for et in fork.schema().schema().edge_types.keys() {
734            let cypher = format!(
735                "MATCH ()-[r:`{}`]->() RETURN count(r) AS c",
736                escape_backticks(et)
737            );
738            if let Ok(rs) = fork.query(&cypher).await
739                && let Some(row) = rs.rows().first()
740                && let Ok(c) = row.get::<i64>("c")
741            {
742                edge_seen += c as usize;
743            }
744        }
745        if edge_seen > 0 {
746            report.edges_skipped = edge_seen;
747            warn!(
748                target: "uni::promote",
749                edges_skipped = edge_seen,
750                "promote_from_fork: fork contains {} edges; pass \
751                 PromotePattern::edge_type(...) to promote them",
752                edge_seen
753            );
754        }
755    }
756
757    Ok(report)
758}