1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use tracing::warn;
18use uni_common::Properties;
19use uni_common::Result;
20use uni_common::Value;
21use uni_common::core::id::{UniId, Vid};
22
23use crate::host::{ForkPromoteSink, ForkQueryHost};
24use crate::types::{
25 ConflictPolicy, DiffEdge, DiffVertex, EdgeDiff, ForkDiff, PromoteBaseline, PromoteOptions,
26 PromotePattern, PromoteReport, PropertyChange, VertexDiff, VertexPropertyChange,
27};
28
29pub async fn compute_diff<Q: ForkQueryHost + ?Sized>(a: &Q, b: &Q) -> Result<ForkDiff> {
39 let mut diff = ForkDiff::default();
40
41 let ext_a = a.storage().get_vertex_ext_ids().await.unwrap_or_default();
44 let ext_b = b.storage().get_vertex_ext_ids().await.unwrap_or_default();
45
46 let labels_a: HashSet<String> = a.schema().schema().labels.keys().cloned().collect();
47 let labels_b: HashSet<String> = b.schema().schema().labels.keys().cloned().collect();
48 let labels_union: Vec<&String> = labels_a.union(&labels_b).collect();
49
50 for label in labels_union {
51 let rows_a = scan_label_nodes(a, label, &ext_a).await?;
52 let rows_b = scan_label_nodes(b, label, &ext_b).await?;
53 diff_label(label, rows_a, rows_b, &mut diff.vertices);
54 }
55
56 let edges_a: HashSet<String> = a.schema().schema().edge_types.keys().cloned().collect();
57 let edges_b: HashSet<String> = b.schema().schema().edge_types.keys().cloned().collect();
58 let edges_union: Vec<&String> = edges_a.union(&edges_b).collect();
59
60 for edge_type in edges_union {
61 let rows_a = scan_edge_type(a, edge_type, &ext_a).await?;
62 let rows_b = scan_edge_type(b, edge_type, &ext_b).await?;
63 diff_edge_type(edge_type, rows_a, rows_b, &mut diff.edges);
64 }
65
66 Ok(diff)
67}
68
69fn ext_id_for(map: &HashMap<Vid, String>, vid: Vid) -> Option<&str> {
82 map.get(&vid).map(String::as_str)
83}
84
/// Vertex rows of one label, keyed by the vertex content UID.
type VertexBucket = HashMap<UniId, VertexRow>;
/// Edge rows of one edge type, keyed by the edge content UID.
type EdgeBucket = HashMap<UniId, EdgeRow>;
93
/// One scanned vertex, as captured by `scan_label_nodes`.
#[derive(Debug, Clone)]
struct VertexRow {
    /// Label the vertex was scanned under.
    label: String,
    /// Host-local vertex id of the scanned node.
    vid: Vid,
    /// Copy of the node's properties at scan time.
    properties: Properties,
}
100
/// One scanned edge, as captured by `scan_edge_type`.
#[derive(Debug, Clone)]
struct EdgeRow {
    /// Content UID computed for the source endpoint.
    src_uid: UniId,
    /// Content UID computed for the destination endpoint.
    dst_uid: UniId,
    /// Copy of the edge's properties at scan time.
    properties: Properties,
}
107
108async fn scan_label_nodes<Q: ForkQueryHost + ?Sized>(
109 s: &Q,
110 label: &str,
111 ext_ids: &HashMap<Vid, String>,
112) -> Result<VertexBucket> {
113 use uni_store::storage::vertex::VertexDataset;
114 let cypher = format!("MATCH (n:`{}`) RETURN n", escape_backticks(label));
115 let result = s.query(&cypher).await?;
116 let mut bucket = VertexBucket::new();
117 for row in result.rows() {
118 let Some(Value::Node(node)) = row.value("n") else {
119 continue;
120 };
121 let uid = VertexDataset::compute_vertex_uid(
125 label,
126 ext_id_for(ext_ids, node.vid),
127 &node.properties,
128 );
129 if bucket
130 .insert(
131 uid,
132 VertexRow {
133 label: label.to_string(),
134 vid: node.vid,
135 properties: node.properties.clone(),
136 },
137 )
138 .is_some()
139 {
140 warn!(
144 label,
145 vid = node.vid.as_u64(),
146 "fork diff: vertex content-UID collision; a row is being shadowed"
147 );
148 }
149 }
150 Ok(bucket)
151}
152
153async fn scan_edge_type<Q: ForkQueryHost + ?Sized>(
154 s: &Q,
155 edge_type: &str,
156 ext_ids: &HashMap<Vid, String>,
157) -> Result<EdgeBucket> {
158 use uni_store::storage::main_edge::MainEdgeDataset;
159 use uni_store::storage::vertex::VertexDataset;
160 let cypher = format!(
161 "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
162 escape_backticks(edge_type)
163 );
164 let result = s.query(&cypher).await?;
165 let mut bucket = EdgeBucket::new();
166 for row in result.rows() {
167 let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
168 (row.value("r"), row.value("a"), row.value("b"))
169 else {
170 continue;
171 };
172 let a_label = a.labels.first().cloned().unwrap_or_default();
173 let b_label = b.labels.first().cloned().unwrap_or_default();
174 let src_uid =
175 VertexDataset::compute_vertex_uid(&a_label, ext_id_for(ext_ids, a.vid), &a.properties);
176 let dst_uid =
177 VertexDataset::compute_vertex_uid(&b_label, ext_id_for(ext_ids, b.vid), &b.properties);
178 let edge_uid =
179 MainEdgeDataset::compute_edge_uid(&src_uid, &dst_uid, edge_type, &edge.properties);
180 if bucket
181 .insert(
182 edge_uid,
183 EdgeRow {
184 src_uid,
185 dst_uid,
186 properties: edge.properties.clone(),
187 },
188 )
189 .is_some()
190 {
191 warn!(
192 edge_type,
193 "fork diff: edge content-UID collision; a row is being shadowed"
194 );
195 }
196 }
197 Ok(bucket)
198}
199
200fn partition_added_deleted<R, A, D>(
205 mut a: HashMap<UniId, R>,
206 mut b: HashMap<UniId, R>,
207 mut mk_added: A,
208 mut mk_deleted: D,
209) -> Vec<(UniId, R, R)>
210where
211 A: FnMut(UniId, R),
212 D: FnMut(UniId, R),
213{
214 let keys_a: HashSet<UniId> = a.keys().copied().collect();
215 let keys_b: HashSet<UniId> = b.keys().copied().collect();
216
217 let mut common = Vec::new();
218 for uid in &keys_b {
219 if !keys_a.contains(uid) {
220 mk_added(*uid, b.remove(uid).expect("key from keys_b"));
221 }
222 }
223 for uid in &keys_a {
224 match keys_b.contains(uid) {
225 true => {
226 let row_a = a.remove(uid).expect("key from keys_a");
227 let row_b = b.remove(uid).expect("shared key in b");
228 common.push((*uid, row_a, row_b));
229 }
230 false => mk_deleted(*uid, a.remove(uid).expect("key from keys_a")),
231 }
232 }
233 common
234}
235
236fn diff_label(label: &str, a: VertexBucket, b: VertexBucket, out: &mut VertexDiff) {
237 let common = partition_added_deleted(
238 a,
239 b,
240 |uid, row| {
241 out.added.push(DiffVertex {
242 label: row.label,
243 uid,
244 vid: Some(row.vid),
245 properties: row.properties,
246 });
247 },
248 |uid, row| {
249 out.deleted.push(DiffVertex {
250 label: row.label,
251 uid,
252 vid: Some(row.vid),
253 properties: row.properties,
254 });
255 },
256 );
257 for (uid, row_a, row_b) in common {
258 let changes = property_changes(&row_a.properties, &row_b.properties);
259 if !changes.is_empty() {
260 out.changed.push(VertexPropertyChange {
261 label: label.to_string(),
262 uid,
263 changes,
264 });
265 }
266 }
267}
268
269fn diff_edge_type(edge_type: &str, a: EdgeBucket, b: EdgeBucket, out: &mut EdgeDiff) {
270 partition_added_deleted(
280 a,
281 b,
282 |edge_uid, row| {
283 out.added.push(DiffEdge {
284 edge_type: edge_type.to_string(),
285 edge_uid,
286 src_uid: row.src_uid,
287 dst_uid: row.dst_uid,
288 properties: row.properties,
289 });
290 },
291 |edge_uid, row| {
292 out.deleted.push(DiffEdge {
293 edge_type: edge_type.to_string(),
294 edge_uid,
295 src_uid: row.src_uid,
296 dst_uid: row.dst_uid,
297 properties: row.properties,
298 });
299 },
300 );
301}
302
303fn property_changes(a: &Properties, b: &Properties) -> Vec<PropertyChange> {
304 let mut changes = Vec::new();
305 let keys: HashSet<&String> = a.keys().chain(b.keys()).collect();
306 let mut sorted: Vec<&String> = keys.into_iter().collect();
307 sorted.sort();
308 for k in sorted {
309 let va = a.get(k);
310 let vb = b.get(k);
311 if va != vb {
312 changes.push(PropertyChange {
313 key: k.clone(),
314 before: va.cloned(),
315 after: vb.cloned(),
316 });
317 }
318 }
319 changes
320}
321
/// Doubles every backtick in `s` so the result can be placed inside a
/// backtick-quoted identifier.
fn escape_backticks(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '`' {
            // Emit the extra backtick that escapes the one pushed below.
            escaped.push('`');
        }
        escaped.push(ch);
    }
    escaped
}
325
326async fn batch_resolve_primary_vids<Q: ForkQueryHost + ?Sized>(
343 primary: &Q,
344 primary_storage: &Arc<uni_store::storage::manager::StorageManager>,
345 label: &str,
346 uids: &[UniId],
347) -> (HashMap<UniId, Vid>, bool) {
348 let mut out: HashMap<UniId, Vid> = HashMap::new();
357 if uids.is_empty() {
358 return (out, false);
359 }
360 let candidates_per_uid: HashMap<UniId, Vec<Vid>> = match primary_storage.uid_index(label).ok() {
366 Some(uix) => match resolve_all_candidate_vids(&uix, uids).await {
367 Ok(m) => m,
368 Err(_) => return (out, true),
369 },
370 None => return (out, true),
371 };
372 if candidates_per_uid.is_empty() {
373 return (out, false);
374 }
375 let vid_set: HashSet<u64> = candidates_per_uid
379 .values()
380 .flat_map(|vs| vs.iter().map(|v| v.as_u64()))
381 .collect();
382 let vid_list: Vec<String> = vid_set.iter().map(|v| v.to_string()).collect();
383 let cypher = format!(
384 "MATCH (n:`{}`) WHERE id(n) IN [{}] RETURN id(n) AS vid",
385 escape_backticks(label),
386 vid_list.join(", ")
387 );
388 let rs = match primary.query(&cypher).await {
389 Ok(rs) => rs,
390 Err(_) => return (out, true),
391 };
392 let primary_vids: HashSet<u64> = rs
393 .rows()
394 .iter()
395 .filter_map(|row| row.get::<i64>("vid").ok())
396 .map(|v| v as u64)
397 .collect();
398 for (uid, vids) in candidates_per_uid {
399 if let Some(vid) = vids
402 .into_iter()
403 .find(|v| primary_vids.contains(&v.as_u64()))
404 {
405 out.insert(uid, vid);
406 }
407 }
408 (out, false)
409}
410
411async fn batch_resolve_primary_by_ext_id<Q: ForkQueryHost + ?Sized>(
425 primary: &Q,
426 primary_ext_ids: &HashMap<Vid, String>,
427 label: &str,
428 ext_ids: &HashSet<String>,
429) -> HashMap<String, (Vid, Properties)> {
430 let mut out: HashMap<String, (Vid, Properties)> = HashMap::new();
431 if ext_ids.is_empty() {
432 return out;
433 }
434 let mut ext_to_vid: HashMap<String, Vid> = HashMap::new();
438 for (vid, eid) in primary_ext_ids {
439 if ext_ids.contains(eid) {
440 ext_to_vid.insert(eid.clone(), *vid);
441 }
442 }
443 if ext_to_vid.is_empty() {
444 return out;
445 }
446 let vid_list: Vec<String> = ext_to_vid
447 .values()
448 .map(|v| v.as_u64().to_string())
449 .collect();
450 let cypher = format!(
451 "MATCH (n:`{}`) WHERE id(n) IN [{}] RETURN id(n) AS vid, n AS node",
452 escape_backticks(label),
453 vid_list.join(", ")
454 );
455 let Ok(rs) = primary.query(&cypher).await else {
456 return out;
457 };
458 let mut vid_to_props: HashMap<u64, Properties> = HashMap::new();
459 for row in rs.rows() {
460 if let Ok(vid) = row.get::<i64>("vid")
461 && let Some(Value::Node(node)) = row.value("node")
462 {
463 vid_to_props.insert(vid as u64, node.properties.clone());
464 }
465 }
466 for (eid, vid) in ext_to_vid {
467 if let Some(props) = vid_to_props.get(&vid.as_u64()) {
468 out.insert(eid, (vid, props.clone()));
469 }
470 }
471 out
472}
473
474async fn resolve_all_candidate_vids(
479 uix: &uni_store::storage::index::UidIndex,
480 uids: &[UniId],
481) -> uni_common::Result<HashMap<UniId, Vec<Vid>>> {
482 use arrow_array::Array;
483 use futures::TryStreamExt;
484
485 fn internal<E>(e: E) -> uni_common::UniError
489 where
490 E: std::error::Error + Send + Sync + 'static,
491 {
492 uni_common::UniError::Internal(anyhow::anyhow!(e))
493 }
494
495 let ds = uix.open().await.map_err(uni_common::UniError::Internal)?;
496 let hex_values: Vec<String> = uids.iter().map(uid_to_hex).collect();
497 let filter = format!(
498 "_uid_hex IN ({})",
499 hex_values
500 .iter()
501 .map(|h| format!("'{}'", h))
502 .collect::<Vec<_>>()
503 .join(", ")
504 );
505 let mut stream = ds
506 .scan()
507 .filter(&filter)
508 .map_err(internal)?
509 .project(&["_uid_hex", "_vid"])
510 .map_err(internal)?
511 .try_into_stream()
512 .await
513 .map_err(internal)?;
514
515 let hex_to_uid: HashMap<String, UniId> =
516 uids.iter().map(|uid| (uid_to_hex(uid), *uid)).collect();
517 let mut out: HashMap<UniId, Vec<Vid>> = HashMap::new();
518 while let Some(batch) = stream.try_next().await.map_err(internal)? {
519 let uid_hex_col = batch
520 .column_by_name("_uid_hex")
521 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>())
522 .ok_or_else(|| {
523 uni_common::UniError::Internal(anyhow::anyhow!("Missing _uid_hex column"))
524 })?;
525 let vid_col = batch
526 .column_by_name("_vid")
527 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
528 .ok_or_else(|| {
529 uni_common::UniError::Internal(anyhow::anyhow!("Missing _vid column"))
530 })?;
531 for i in 0..batch.num_rows() {
532 if uid_hex_col.is_null(i) {
533 continue;
534 }
535 let hex = uid_hex_col.value(i);
536 if let Some(&uid) = hex_to_uid.get(hex) {
537 out.entry(uid)
538 .or_default()
539 .push(Vid::from(vid_col.value(i)));
540 }
541 }
542 }
543 Ok(out)
544}
545
546fn uid_to_hex(uid: &UniId) -> String {
547 uid.as_bytes()
548 .iter()
549 .map(|b| format!("{:02x}", b))
550 .collect()
551}
552
/// Promotes the rows selected by `patterns` from `fork` into the primary
/// graph, writing through `primary_tx`, and returns a report of what was done.
///
/// Patterns are processed in order:
///
/// * **Vertex patterns** query `fork` for the label (optionally filtered by
///   the pattern's `where_clause`). With `options.upsert`, rows whose external
///   id already exists on `primary` are compared against primary and, when
///   available, against `baseline`, and are updated, skipped or counted as
///   conflicting. All other rows are looked up on primary by content UID and
///   bulk-inserted when not found.
/// * **Edge patterns** query `fork` for the edge type, map both endpoints to
///   primary vids (through vertices inserted earlier in this call or through
///   the UID index), skip edges whose content UID already exists on primary
///   and bulk-insert the rest.
///
/// When `options.delete_promotion` is set and a `baseline` is supplied,
/// vertices that are in the baseline but no longer in the fork are deleted
/// from primary by external id. When no edge pattern was given, the fork's
/// edges are only counted and reported as skipped.
///
/// # Errors
///
/// Propagates errors from the pattern queries against `fork` and from every
/// write through `primary_tx`. Failures of the lookups against `primary`
/// (external-id maps, UID resolution, edge dedup query) are tolerated.
pub async fn run_promote<Q, S>(
    fork: &Q,
    primary: &Q,
    primary_tx: &S,
    patterns: &[PromotePattern],
    options: &PromoteOptions,
    baseline: Option<&PromoteBaseline>,
) -> Result<PromoteReport>
where
    Q: ForkQueryHost + ?Sized,
    S: ForkPromoteSink + ?Sized,
{
    use uni_store::storage::vertex::VertexDataset;

    // One insert counter slot per pattern, indexed like `patterns`.
    let mut report = PromoteReport {
        per_pattern_inserted: vec![0usize; patterns.len()],
        ..Default::default()
    };

    let primary_storage = primary.storage();
    // Best-effort: a failed external-id load degrades to an empty map.
    let fork_ext_ids = fork
        .storage()
        .get_vertex_ext_ids()
        .await
        .unwrap_or_default();
    let primary_ext_ids = primary_storage
        .get_vertex_ext_ids()
        .await
        .unwrap_or_default();
    let mut any_edge_pattern = false;
    // (label, content UID) -> primary vid of every vertex inserted so far in
    // this call; later patterns use it to skip re-inserts and to resolve edge
    // endpoints.
    let mut just_inserted: HashMap<(String, UniId), Vid> = HashMap::new();

    for (idx, pattern) in patterns.iter().enumerate() {
        match pattern {
            PromotePattern::Vertex {
                label,
                where_clause,
            } => {
                // NOTE(review): `where_clause` is spliced into the query text
                // verbatim; confirm it only ever comes from trusted callers.
                let cypher = match where_clause {
                    Some(w) => format!(
                        "MATCH (n:`{}`) WHERE {} RETURN n",
                        escape_backticks(label),
                        w
                    ),
                    None => format!("MATCH (n:`{}`) RETURN n", escape_backticks(label)),
                };

                let result = fork.query(&cypher).await?;
                if result.rows().is_empty() {
                    continue;
                }

                // (content UID, properties, external id) of each fork row that
                // was not already inserted by an earlier pattern.
                let mut candidates: Vec<(UniId, Properties, Option<String>)> =
                    Vec::with_capacity(result.rows().len());
                for row in result.rows() {
                    let Some(Value::Node(node)) = row.value("n") else {
                        continue;
                    };
                    let ext_id = ext_id_for(&fork_ext_ids, node.vid).map(str::to_string);
                    let uid = VertexDataset::compute_vertex_uid(
                        label,
                        ext_id.as_deref(),
                        &node.properties,
                    );
                    if just_inserted.contains_key(&(label.clone(), uid)) {
                        report.vertices_skipped_uid_conflict += 1;
                        continue;
                    }
                    candidates.push((uid, node.properties.clone(), ext_id));
                }

                // Upsert mode only: find primary vertices with the same
                // external id.
                // NOTE(review): empty external ids are not filtered here,
                // unlike in the delete-promotion pass below; confirm whether
                // the ext-id map can contain empty strings.
                let ext_resolved: HashMap<String, (Vid, Properties)> = if options.upsert {
                    let ext_ids: HashSet<String> = candidates
                        .iter()
                        .filter_map(|(_, _, e)| e.clone())
                        .collect();
                    batch_resolve_primary_by_ext_id(primary, &primary_ext_ids, label, &ext_ids)
                        .await
                } else {
                    HashMap::new()
                };

                let label_baseline = baseline.and_then(|b| b.ext.get(label));

                // Rows that did not match a primary vertex by external id fall
                // through to the UID-based insert path.
                let mut uid_candidates: Vec<(UniId, Properties)> =
                    Vec::with_capacity(candidates.len());
                for (uid, props, ext_id) in candidates {
                    let resolved = ext_id
                        .as_ref()
                        .and_then(|e| ext_resolved.get(e).map(|r| (e.clone(), r)));
                    let Some((eid, (pvid, pprops))) = resolved else {
                        uid_candidates.push((uid, props));
                        continue;
                    };
                    match label_baseline.and_then(|m| m.get(&eid)) {
                        Some(b) => {
                            if props == *pprops {
                                // Fork and primary already agree.
                                report.vertices_skipped_no_op += 1;
                            } else if props == *b {
                                // Fork row equals the baseline: nothing to
                                // promote.
                                report.vertices_skipped_no_op += 1;
                            } else if *pprops != *b {
                                // Fork and primary both differ from the
                                // baseline and from each other.
                                report.vertices_conflicting += 1;
                                if options.on_conflict == ConflictPolicy::Overwrite {
                                    primary_tx
                                        .update_vertex_properties(label, *pvid, props)
                                        .await?;
                                    report.vertices_updated += 1;
                                }
                            } else {
                                // Primary still equals the baseline; only the
                                // fork changed.
                                primary_tx
                                    .update_vertex_properties(label, *pvid, props)
                                    .await?;
                                report.vertices_updated += 1;
                            }
                        }
                        None => {
                            // No baseline entry: plain compare-and-update.
                            if props == *pprops {
                                report.vertices_skipped_no_op += 1;
                            } else {
                                primary_tx
                                    .update_vertex_properties(label, *pvid, props)
                                    .await?;
                                report.vertices_updated += 1;
                            }
                        }
                    }
                }

                // `degraded` means presence on primary could not be checked.
                let uids_to_check: Vec<UniId> = uid_candidates.iter().map(|(u, _)| *u).collect();
                let (on_primary, degraded) =
                    batch_resolve_primary_vids(primary, &primary_storage, label, &uids_to_check)
                        .await;

                // `to_insert` and `insert_uids` are kept index-aligned.
                let mut to_insert: Vec<Properties> = Vec::with_capacity(uid_candidates.len());
                let mut insert_uids: Vec<UniId> = Vec::with_capacity(uid_candidates.len());
                for (uid, props) in uid_candidates {
                    if on_primary.contains_key(&uid) {
                        report.vertices_skipped_uid_conflict += 1;
                    } else {
                        to_insert.push(props);
                        insert_uids.push(uid);
                    }
                }

                if !to_insert.is_empty() {
                    let n = to_insert.len();
                    let vids = primary_tx.bulk_insert_vertices(label, to_insert).await?;
                    // NOTE(review): assumes the returned vids are in input
                    // order; confirm against the sink contract.
                    for (uid, vid) in insert_uids.into_iter().zip(vids) {
                        just_inserted.insert((label.clone(), uid), vid);
                    }
                    report.vertices_inserted += n;
                    report.per_pattern_inserted[idx] = n;
                    if degraded {
                        report.vertices_inserted_unverified += n;
                        warn!(
                            label = %label,
                            count = n,
                            "promote inserted vertices whose primary presence could not be \
                             confirmed (resolve degraded); they may be duplicates"
                        );
                    }
                }
            }
            PromotePattern::Edge {
                edge_type,
                where_clause,
            } => {
                any_edge_pattern = true;
                // NOTE(review): `where_clause` is spliced in verbatim here too.
                let cypher = match where_clause {
                    Some(w) => format!(
                        "MATCH (a)-[r:`{}`]->(b) WHERE {} RETURN a, r, b",
                        escape_backticks(edge_type),
                        w
                    ),
                    None => format!(
                        "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
                        escape_backticks(edge_type)
                    ),
                };

                let result = fork.query(&cypher).await?;
                if result.rows().is_empty() {
                    continue;
                }

                use uni_store::storage::main_edge::MainEdgeDataset;

                // One matched fork edge with its precomputed content UIDs.
                struct ForkEdgeRow {
                    a_label: String,
                    b_label: String,
                    src_uid: UniId,
                    dst_uid: UniId,
                    edge_uid: UniId,
                    edge_props: Properties,
                }
                let mut fork_edges: Vec<ForkEdgeRow> = Vec::with_capacity(result.rows().len());
                for row in result.rows() {
                    let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
                        (row.value("r"), row.value("a"), row.value("b"))
                    else {
                        continue;
                    };
                    // Edges with an unlabeled endpoint are dropped without
                    // being counted in the report.
                    let a_label = match a.labels.first() {
                        Some(l) => l.clone(),
                        None => continue,
                    };
                    let b_label = match b.labels.first() {
                        Some(l) => l.clone(),
                        None => continue,
                    };
                    let src_uid = VertexDataset::compute_vertex_uid(
                        &a_label,
                        ext_id_for(&fork_ext_ids, a.vid),
                        &a.properties,
                    );
                    let dst_uid = VertexDataset::compute_vertex_uid(
                        &b_label,
                        ext_id_for(&fork_ext_ids, b.vid),
                        &b.properties,
                    );
                    let edge_uid = MainEdgeDataset::compute_edge_uid(
                        &src_uid,
                        &dst_uid,
                        edge_type,
                        &edge.properties,
                    );
                    fork_edges.push(ForkEdgeRow {
                        a_label,
                        b_label,
                        src_uid,
                        dst_uid,
                        edge_uid,
                        edge_props: edge.properties.clone(),
                    });
                }

                // Endpoint UIDs grouped by label, excluding the ones already
                // known from inserts made earlier in this call.
                let mut to_resolve: HashMap<String, HashSet<UniId>> = HashMap::new();
                for fe in &fork_edges {
                    if !just_inserted.contains_key(&(fe.a_label.clone(), fe.src_uid)) {
                        to_resolve
                            .entry(fe.a_label.clone())
                            .or_default()
                            .insert(fe.src_uid);
                    }
                    if !just_inserted.contains_key(&(fe.b_label.clone(), fe.dst_uid)) {
                        to_resolve
                            .entry(fe.b_label.clone())
                            .or_default()
                            .insert(fe.dst_uid);
                    }
                }
                let mut endpoint_resolved: HashMap<(String, UniId), Vid> = HashMap::new();
                for (lbl, uid_set) in to_resolve {
                    let uid_vec: Vec<UniId> = uid_set.into_iter().collect();
                    // NOTE(review): the degraded flag is ignored, so a failed
                    // lookup is indistinguishable from a missing endpoint.
                    let (resolved, _degraded) =
                        batch_resolve_primary_vids(primary, &primary_storage, &lbl, &uid_vec).await;
                    for (uid, vid) in resolved {
                        endpoint_resolved.insert((lbl.clone(), uid), vid);
                    }
                }
                // Vertices inserted in this call take part as endpoints too.
                for ((lbl, uid), vid) in just_inserted.iter() {
                    endpoint_resolved.insert((lbl.clone(), *uid), *vid);
                }

                // (src vid, dst vid) pairs for which both endpoints resolved.
                let mut resolved_pairs: HashSet<(Vid, Vid)> = HashSet::new();
                for fe in &fork_edges {
                    let s = endpoint_resolved.get(&(fe.a_label.clone(), fe.src_uid));
                    let d = endpoint_resolved.get(&(fe.b_label.clone(), fe.dst_uid));
                    if let (Some(s), Some(d)) = (s, d) {
                        resolved_pairs.insert((*s, *d));
                    }
                }
                // Content UIDs of edges that already exist on primary between
                // the resolved endpoints.
                let mut primary_edge_uids: HashSet<UniId> = HashSet::new();
                if !resolved_pairs.is_empty() {
                    let src_vids: HashSet<u64> =
                        resolved_pairs.iter().map(|(s, _)| s.as_u64()).collect();
                    let dst_vids: HashSet<u64> =
                        resolved_pairs.iter().map(|(_, d)| d.as_u64()).collect();
                    let src_list: Vec<String> = src_vids.iter().map(|v| v.to_string()).collect();
                    let dst_list: Vec<String> = dst_vids.iter().map(|v| v.to_string()).collect();
                    // The two IN lists match every src/dst combination, which
                    // is a superset of the pairs; the UID check below does the
                    // exact filtering.
                    let dedup_cypher = format!(
                        "MATCH (a)-[r:`{}`]->(b) \
                         WHERE id(a) IN [{}] AND id(b) IN [{}] \
                         RETURN a, r, b",
                        escape_backticks(edge_type),
                        src_list.join(", "),
                        dst_list.join(", "),
                    );
                    // NOTE(review): a failed dedup query is ignored, in which
                    // case no edge is treated as a duplicate.
                    if let Ok(rs) = primary.query(&dedup_cypher).await {
                        for row in rs.rows() {
                            let (
                                Some(Value::Edge(existing)),
                                Some(Value::Node(ea)),
                                Some(Value::Node(eb)),
                            ) = (row.value("r"), row.value("a"), row.value("b"))
                            else {
                                continue;
                            };
                            let ea_label = ea.labels.first().cloned().unwrap_or_default();
                            let eb_label = eb.labels.first().cloned().unwrap_or_default();
                            let esrc = VertexDataset::compute_vertex_uid(
                                &ea_label,
                                ext_id_for(&primary_ext_ids, ea.vid),
                                &ea.properties,
                            );
                            let edst = VertexDataset::compute_vertex_uid(
                                &eb_label,
                                ext_id_for(&primary_ext_ids, eb.vid),
                                &eb.properties,
                            );
                            let euid = MainEdgeDataset::compute_edge_uid(
                                &esrc,
                                &edst,
                                edge_type,
                                &existing.properties,
                            );
                            primary_edge_uids.insert(euid);
                        }
                    }
                }

                let mut edges_to_insert: Vec<(Vid, Vid, Properties)> =
                    Vec::with_capacity(fork_edges.len());
                let mut pattern_inserted = 0usize;
                for fe in fork_edges {
                    let src_vid = endpoint_resolved
                        .get(&(fe.a_label.clone(), fe.src_uid))
                        .copied();
                    let dst_vid = endpoint_resolved
                        .get(&(fe.b_label.clone(), fe.dst_uid))
                        .copied();
                    let (src_vid, dst_vid) = match (src_vid, dst_vid) {
                        (Some(s), Some(d)) => (s, d),
                        _ => {
                            // At least one endpoint is unknown on primary.
                            report.edges_skipped_no_endpoint += 1;
                            continue;
                        }
                    };
                    if primary_edge_uids.contains(&fe.edge_uid) {
                        report.edges_skipped_duplicate += 1;
                        continue;
                    }
                    edges_to_insert.push((src_vid, dst_vid, fe.edge_props));
                    pattern_inserted += 1;
                }
                if !edges_to_insert.is_empty() {
                    let n = edges_to_insert.len();
                    primary_tx
                        .bulk_insert_edges(edge_type, edges_to_insert)
                        .await?;
                    report.edges_inserted += n;
                }
                report.per_pattern_inserted[idx] = pattern_inserted;
            }
        }
    }

    // Delete promotion: needs both the option and a baseline to compare with.
    if options.delete_promotion
        && let Some(baseline) = baseline
    {
        // Distinct labels of the vertex patterns, in sorted order.
        let mut del_labels: Vec<&str> = patterns
            .iter()
            .filter(|p| !p.is_edge())
            .map(|p| p.label_name())
            .collect();
        del_labels.sort_unstable();
        del_labels.dedup();

        for label in del_labels {
            // Current fork state of the whole label (pattern filters are not
            // applied here).
            let cypher = format!("MATCH (n:`{}`) RETURN n", escape_backticks(label));
            let result = fork.query(&cypher).await?;
            let mut fork_now_ext: HashSet<String> = HashSet::new();
            let mut fork_now_noext: HashSet<UniId> = HashSet::new();
            for row in result.rows() {
                if let Some(Value::Node(node)) = row.value("n") {
                    match ext_id_for(&fork_ext_ids, node.vid) {
                        Some(eid) if !eid.is_empty() => {
                            fork_now_ext.insert(eid.to_string());
                        }
                        // Missing or empty external id: track by content UID.
                        _ => {
                            fork_now_noext.insert(VertexDataset::compute_vertex_uid(
                                label,
                                None,
                                &node.properties,
                            ));
                        }
                    }
                }
            }

            if let Some(base_ext) = baseline.ext.get(label) {
                // External ids that are in the baseline but gone from the fork.
                let deleted_ext: HashSet<String> = base_ext
                    .keys()
                    .filter(|eid| !fork_now_ext.contains(*eid))
                    .cloned()
                    .collect();
                if !deleted_ext.is_empty() {
                    let resolved = batch_resolve_primary_by_ext_id(
                        primary,
                        &primary_ext_ids,
                        label,
                        &deleted_ext,
                    )
                    .await;
                    for (_eid, (pvid, _props)) in resolved {
                        primary_tx.delete_vertex(label, pvid).await?;
                        report.vertices_deleted += 1;
                    }
                }
            }

            if let Some(base_noext) = baseline.no_ext.get(label) {
                // Baseline vertices without an external id that are gone from
                // the fork are only counted; nothing is deleted for them.
                let gone = base_noext
                    .iter()
                    .filter(|u| !fork_now_noext.contains(*u))
                    .count();
                report.vertices_skipped_no_ext_id_for_delete += gone;
            }
        }
    }

    // No edge pattern was given: report how many fork edges were left behind.
    if !any_edge_pattern {
        let mut edge_seen = 0usize;
        for et in fork.schema().schema().edge_types.keys() {
            let cypher = format!(
                "MATCH ()-[r:`{}`]->() RETURN count(r) AS c",
                escape_backticks(et)
            );
            // Best-effort count: a failed query contributes nothing.
            if let Ok(rs) = fork.query(&cypher).await
                && let Some(row) = rs.rows().first()
                && let Ok(c) = row.get::<i64>("c")
            {
                edge_seen += c as usize;
            }
        }
        if edge_seen > 0 {
            report.edges_skipped = edge_seen;
            warn!(
                target: "uni::promote",
                edges_skipped = edge_seen,
                "promote_from_fork: fork contains {} edges; pass \
                 PromotePattern::edge_type(...) to promote them",
                edge_seen
            );
        }
    }

    Ok(report)
}