1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use tracing::warn;
18use uni_common::Properties;
19use uni_common::Result;
20use uni_common::Value;
21use uni_common::core::id::{UniId, Vid};
22
23use crate::host::{ForkPromoteSink, ForkQueryHost};
24use crate::types::{
25 DiffEdge, DiffVertex, EdgeDiff, ForkDiff, PromotePattern, PromoteReport, PropertyChange,
26 VertexDiff, VertexPropertyChange,
27};
28
29pub async fn compute_diff<Q: ForkQueryHost + ?Sized>(a: &Q, b: &Q) -> Result<ForkDiff> {
39 let mut diff = ForkDiff::default();
40
41 let labels_a: HashSet<String> = a.schema().schema().labels.keys().cloned().collect();
42 let labels_b: HashSet<String> = b.schema().schema().labels.keys().cloned().collect();
43 let labels_union: Vec<&String> = labels_a.union(&labels_b).collect();
44
45 for label in labels_union {
46 let rows_a = scan_label_nodes(a, label).await?;
47 let rows_b = scan_label_nodes(b, label).await?;
48 diff_label(label, rows_a, rows_b, &mut diff.vertices);
49 }
50
51 let edges_a: HashSet<String> = a.schema().schema().edge_types.keys().cloned().collect();
52 let edges_b: HashSet<String> = b.schema().schema().edge_types.keys().cloned().collect();
53 let edges_union: Vec<&String> = edges_a.union(&edges_b).collect();
54
55 for edge_type in edges_union {
56 let rows_a = scan_edge_type(a, edge_type).await?;
57 let rows_b = scan_edge_type(b, edge_type).await?;
58 diff_edge_type(edge_type, rows_a, rows_b, &mut diff.edges);
59 }
60
61 Ok(diff)
62}
63
/// Vertices of a single label, keyed by the UID that `scan_label_nodes`
/// computes for each returned node.
type VertexBucket = HashMap<UniId, VertexRow>;
/// Edges of a single edge type, keyed by the UID that `scan_edge_type`
/// computes for each returned edge.
type EdgeBucket = HashMap<UniId, EdgeRow>;
72
/// One scanned vertex, as stored in a [`VertexBucket`].
#[derive(Debug, Clone)]
struct VertexRow {
    /// Label the vertex was scanned under.
    label: String,
    /// `vid` reported by the host the vertex was read from.
    vid: Vid,
    /// Copy of the node's properties as returned by the scan query.
    properties: Properties,
}
79
/// One scanned edge, as stored in an [`EdgeBucket`].
#[derive(Debug, Clone)]
struct EdgeRow {
    /// UID computed for the source node (from its first label and properties).
    src_uid: UniId,
    /// UID computed for the destination node (from its first label and properties).
    dst_uid: UniId,
    /// Copy of the edge's properties as returned by the scan query.
    properties: Properties,
}
86
87async fn scan_label_nodes<Q: ForkQueryHost + ?Sized>(s: &Q, label: &str) -> Result<VertexBucket> {
88 use uni_store::storage::vertex::VertexDataset;
89 let cypher = format!("MATCH (n:`{}`) RETURN n", escape_backticks(label));
90 let result = s.query(&cypher).await?;
91 let mut bucket = VertexBucket::new();
92 for row in result.rows() {
93 let Some(Value::Node(node)) = row.value("n") else {
94 continue;
95 };
96 let uid = VertexDataset::compute_vertex_uid(label, None, &node.properties);
99 bucket.insert(
100 uid,
101 VertexRow {
102 label: label.to_string(),
103 vid: node.vid,
104 properties: node.properties.clone(),
105 },
106 );
107 }
108 Ok(bucket)
109}
110
111async fn scan_edge_type<Q: ForkQueryHost + ?Sized>(s: &Q, edge_type: &str) -> Result<EdgeBucket> {
112 use uni_store::storage::main_edge::MainEdgeDataset;
113 use uni_store::storage::vertex::VertexDataset;
114 let cypher = format!(
115 "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
116 escape_backticks(edge_type)
117 );
118 let result = s.query(&cypher).await?;
119 let mut bucket = EdgeBucket::new();
120 for row in result.rows() {
121 let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
122 (row.value("r"), row.value("a"), row.value("b"))
123 else {
124 continue;
125 };
126 let a_label = a.labels.first().cloned().unwrap_or_default();
127 let b_label = b.labels.first().cloned().unwrap_or_default();
128 let src_uid = VertexDataset::compute_vertex_uid(&a_label, None, &a.properties);
129 let dst_uid = VertexDataset::compute_vertex_uid(&b_label, None, &b.properties);
130 let edge_uid =
131 MainEdgeDataset::compute_edge_uid(&src_uid, &dst_uid, edge_type, &edge.properties);
132 bucket.insert(
133 edge_uid,
134 EdgeRow {
135 src_uid,
136 dst_uid,
137 properties: edge.properties.clone(),
138 },
139 );
140 }
141 Ok(bucket)
142}
143
144fn partition_added_deleted<R, A, D>(
149 mut a: HashMap<UniId, R>,
150 mut b: HashMap<UniId, R>,
151 mut mk_added: A,
152 mut mk_deleted: D,
153) -> Vec<(UniId, R, R)>
154where
155 A: FnMut(UniId, R),
156 D: FnMut(UniId, R),
157{
158 let keys_a: HashSet<UniId> = a.keys().copied().collect();
159 let keys_b: HashSet<UniId> = b.keys().copied().collect();
160
161 let mut common = Vec::new();
162 for uid in &keys_b {
163 if !keys_a.contains(uid) {
164 mk_added(*uid, b.remove(uid).expect("key from keys_b"));
165 }
166 }
167 for uid in &keys_a {
168 match keys_b.contains(uid) {
169 true => {
170 let row_a = a.remove(uid).expect("key from keys_a");
171 let row_b = b.remove(uid).expect("shared key in b");
172 common.push((*uid, row_a, row_b));
173 }
174 false => mk_deleted(*uid, a.remove(uid).expect("key from keys_a")),
175 }
176 }
177 common
178}
179
180fn diff_label(label: &str, a: VertexBucket, b: VertexBucket, out: &mut VertexDiff) {
181 let common = partition_added_deleted(
182 a,
183 b,
184 |uid, row| {
185 out.added.push(DiffVertex {
186 label: row.label,
187 uid,
188 vid: Some(row.vid),
189 properties: row.properties,
190 });
191 },
192 |uid, row| {
193 out.deleted.push(DiffVertex {
194 label: row.label,
195 uid,
196 vid: Some(row.vid),
197 properties: row.properties,
198 });
199 },
200 );
201 for (uid, row_a, row_b) in common {
202 let changes = property_changes(&row_a.properties, &row_b.properties);
203 if !changes.is_empty() {
204 out.changed.push(VertexPropertyChange {
205 label: label.to_string(),
206 uid,
207 changes,
208 });
209 }
210 }
211}
212
213fn diff_edge_type(edge_type: &str, a: EdgeBucket, b: EdgeBucket, out: &mut EdgeDiff) {
214 partition_added_deleted(
224 a,
225 b,
226 |edge_uid, row| {
227 out.added.push(DiffEdge {
228 edge_type: edge_type.to_string(),
229 edge_uid,
230 src_uid: row.src_uid,
231 dst_uid: row.dst_uid,
232 properties: row.properties,
233 });
234 },
235 |edge_uid, row| {
236 out.deleted.push(DiffEdge {
237 edge_type: edge_type.to_string(),
238 edge_uid,
239 src_uid: row.src_uid,
240 dst_uid: row.dst_uid,
241 properties: row.properties,
242 });
243 },
244 );
245}
246
247fn property_changes(a: &Properties, b: &Properties) -> Vec<PropertyChange> {
248 let mut changes = Vec::new();
249 let keys: HashSet<&String> = a.keys().chain(b.keys()).collect();
250 let mut sorted: Vec<&String> = keys.into_iter().collect();
251 sorted.sort();
252 for k in sorted {
253 let va = a.get(k);
254 let vb = b.get(k);
255 if va != vb {
256 changes.push(PropertyChange {
257 key: k.clone(),
258 before: va.cloned(),
259 after: vb.cloned(),
260 });
261 }
262 }
263 changes
264}
265
/// Doubles every backtick in `s` so the result can be placed between
/// backticks in a generated Cypher query.
fn escape_backticks(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '`' {
            escaped.push_str("``");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
269
270async fn batch_resolve_primary_vids<Q: ForkQueryHost + ?Sized>(
287 primary: &Q,
288 primary_storage: &Arc<uni_store::storage::manager::StorageManager>,
289 label: &str,
290 uids: &[UniId],
291) -> HashMap<UniId, Vid> {
292 let mut out: HashMap<UniId, Vid> = HashMap::new();
299 if uids.is_empty() {
300 return out;
301 }
302 let candidates_per_uid: HashMap<UniId, Vec<Vid>> = match primary_storage.uid_index(label).ok() {
308 Some(uix) => match resolve_all_candidate_vids(&uix, uids).await {
309 Ok(m) => m,
310 Err(_) => return out,
311 },
312 None => return out,
313 };
314 if candidates_per_uid.is_empty() {
315 return out;
316 }
317 let vid_set: HashSet<u64> = candidates_per_uid
321 .values()
322 .flat_map(|vs| vs.iter().map(|v| v.as_u64()))
323 .collect();
324 let vid_list: Vec<String> = vid_set.iter().map(|v| v.to_string()).collect();
325 let cypher = format!(
326 "MATCH (n:`{}`) WHERE id(n) IN [{}] RETURN id(n) AS vid",
327 escape_backticks(label),
328 vid_list.join(", ")
329 );
330 let rs = match primary.query(&cypher).await {
331 Ok(rs) => rs,
332 Err(_) => return out,
333 };
334 let primary_vids: HashSet<u64> = rs
335 .rows()
336 .iter()
337 .filter_map(|row| row.get::<i64>("vid").ok())
338 .map(|v| v as u64)
339 .collect();
340 for (uid, vids) in candidates_per_uid {
341 if let Some(vid) = vids
344 .into_iter()
345 .find(|v| primary_vids.contains(&v.as_u64()))
346 {
347 out.insert(uid, vid);
348 }
349 }
350 out
351}
352
353async fn resolve_all_candidate_vids(
358 uix: &uni_store::storage::index::UidIndex,
359 uids: &[UniId],
360) -> uni_common::Result<HashMap<UniId, Vec<Vid>>> {
361 use arrow_array::Array;
362 use futures::TryStreamExt;
363
364 fn internal<E>(e: E) -> uni_common::UniError
368 where
369 E: std::error::Error + Send + Sync + 'static,
370 {
371 uni_common::UniError::Internal(anyhow::anyhow!(e))
372 }
373
374 let ds = uix.open().await.map_err(uni_common::UniError::Internal)?;
375 let hex_values: Vec<String> = uids.iter().map(uid_to_hex).collect();
376 let filter = format!(
377 "_uid_hex IN ({})",
378 hex_values
379 .iter()
380 .map(|h| format!("'{}'", h))
381 .collect::<Vec<_>>()
382 .join(", ")
383 );
384 let mut stream = ds
385 .scan()
386 .filter(&filter)
387 .map_err(internal)?
388 .project(&["_uid_hex", "_vid"])
389 .map_err(internal)?
390 .try_into_stream()
391 .await
392 .map_err(internal)?;
393
394 let hex_to_uid: HashMap<String, UniId> =
395 uids.iter().map(|uid| (uid_to_hex(uid), *uid)).collect();
396 let mut out: HashMap<UniId, Vec<Vid>> = HashMap::new();
397 while let Some(batch) = stream.try_next().await.map_err(internal)? {
398 let uid_hex_col = batch
399 .column_by_name("_uid_hex")
400 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>())
401 .ok_or_else(|| {
402 uni_common::UniError::Internal(anyhow::anyhow!("Missing _uid_hex column"))
403 })?;
404 let vid_col = batch
405 .column_by_name("_vid")
406 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
407 .ok_or_else(|| {
408 uni_common::UniError::Internal(anyhow::anyhow!("Missing _vid column"))
409 })?;
410 for i in 0..batch.num_rows() {
411 if uid_hex_col.is_null(i) {
412 continue;
413 }
414 let hex = uid_hex_col.value(i);
415 if let Some(&uid) = hex_to_uid.get(hex) {
416 out.entry(uid)
417 .or_default()
418 .push(Vid::from(vid_col.value(i)));
419 }
420 }
421 }
422 Ok(out)
423}
424
425fn uid_to_hex(uid: &UniId) -> String {
426 uid.as_bytes()
427 .iter()
428 .map(|b| format!("{:02x}", b))
429 .collect()
430}
431
432pub async fn run_promote<Q, S>(
447 fork: &Q,
448 primary: &Q,
449 primary_tx: &S,
450 patterns: &[PromotePattern],
451) -> Result<PromoteReport>
452where
453 Q: ForkQueryHost + ?Sized,
454 S: ForkPromoteSink + ?Sized,
455{
456 use uni_store::storage::vertex::VertexDataset;
457
458 let mut report = PromoteReport {
459 per_pattern_inserted: vec![0usize; patterns.len()],
460 ..Default::default()
461 };
462
463 let primary_storage = primary.storage();
464 let mut any_edge_pattern = false;
465 let mut just_inserted: HashMap<(String, UniId), Vid> = HashMap::new();
471
472 for (idx, pattern) in patterns.iter().enumerate() {
473 match pattern {
474 PromotePattern::Vertex {
475 label,
476 where_clause,
477 } => {
478 let cypher = match where_clause {
479 Some(w) => format!(
480 "MATCH (n:`{}`) WHERE {} RETURN n",
481 escape_backticks(label),
482 w
483 ),
484 None => format!("MATCH (n:`{}`) RETURN n", escape_backticks(label)),
485 };
486
487 let result = fork.query(&cypher).await?;
488 if result.rows().is_empty() {
489 continue;
490 }
491
492 let mut candidates: Vec<(UniId, Properties)> =
495 Vec::with_capacity(result.rows().len());
496 for row in result.rows() {
497 let Some(Value::Node(node)) = row.value("n") else {
498 continue;
499 };
500 let uid = VertexDataset::compute_vertex_uid(label, None, &node.properties);
501 if just_inserted.contains_key(&(label.clone(), uid)) {
502 report.vertices_skipped_uid_conflict += 1;
503 continue;
504 }
505 candidates.push((uid, node.properties.clone()));
506 }
507
508 let uids_to_check: Vec<UniId> = candidates.iter().map(|(u, _)| *u).collect();
512 let on_primary =
513 batch_resolve_primary_vids(primary, &primary_storage, label, &uids_to_check)
514 .await;
515
516 let mut to_insert: Vec<Properties> = Vec::with_capacity(candidates.len());
517 let mut insert_uids: Vec<UniId> = Vec::with_capacity(candidates.len());
518 for (uid, props) in candidates {
519 if on_primary.contains_key(&uid) {
520 report.vertices_skipped_uid_conflict += 1;
521 } else {
522 to_insert.push(props);
523 insert_uids.push(uid);
524 }
525 }
526
527 if !to_insert.is_empty() {
528 let n = to_insert.len();
529 let vids = primary_tx.bulk_insert_vertices(label, to_insert).await?;
530 for (uid, vid) in insert_uids.into_iter().zip(vids) {
531 just_inserted.insert((label.clone(), uid), vid);
532 }
533 report.vertices_inserted += n;
534 report.per_pattern_inserted[idx] = n;
535 }
536 }
537 PromotePattern::Edge {
538 edge_type,
539 where_clause,
540 } => {
541 any_edge_pattern = true;
542 let cypher = match where_clause {
543 Some(w) => format!(
544 "MATCH (a)-[r:`{}`]->(b) WHERE {} RETURN a, r, b",
545 escape_backticks(edge_type),
546 w
547 ),
548 None => format!(
549 "MATCH (a)-[r:`{}`]->(b) RETURN a, r, b",
550 escape_backticks(edge_type)
551 ),
552 };
553
554 let result = fork.query(&cypher).await?;
555 if result.rows().is_empty() {
556 continue;
557 }
558
559 use uni_store::storage::main_edge::MainEdgeDataset;
560
561 struct ForkEdgeRow {
565 a_label: String,
566 b_label: String,
567 src_uid: UniId,
568 dst_uid: UniId,
569 edge_uid: UniId,
570 edge_props: Properties,
571 }
572 let mut fork_edges: Vec<ForkEdgeRow> = Vec::with_capacity(result.rows().len());
573 for row in result.rows() {
574 let (Some(Value::Edge(edge)), Some(Value::Node(a)), Some(Value::Node(b))) =
575 (row.value("r"), row.value("a"), row.value("b"))
576 else {
577 continue;
578 };
579 let a_label = match a.labels.first() {
580 Some(l) => l.clone(),
581 None => continue,
582 };
583 let b_label = match b.labels.first() {
584 Some(l) => l.clone(),
585 None => continue,
586 };
587 let src_uid = VertexDataset::compute_vertex_uid(&a_label, None, &a.properties);
588 let dst_uid = VertexDataset::compute_vertex_uid(&b_label, None, &b.properties);
589 let edge_uid = MainEdgeDataset::compute_edge_uid(
590 &src_uid,
591 &dst_uid,
592 edge_type,
593 &edge.properties,
594 );
595 fork_edges.push(ForkEdgeRow {
596 a_label,
597 b_label,
598 src_uid,
599 dst_uid,
600 edge_uid,
601 edge_props: edge.properties.clone(),
602 });
603 }
604
605 let mut to_resolve: HashMap<String, HashSet<UniId>> = HashMap::new();
608 for fe in &fork_edges {
609 if !just_inserted.contains_key(&(fe.a_label.clone(), fe.src_uid)) {
610 to_resolve
611 .entry(fe.a_label.clone())
612 .or_default()
613 .insert(fe.src_uid);
614 }
615 if !just_inserted.contains_key(&(fe.b_label.clone(), fe.dst_uid)) {
616 to_resolve
617 .entry(fe.b_label.clone())
618 .or_default()
619 .insert(fe.dst_uid);
620 }
621 }
622 let mut endpoint_resolved: HashMap<(String, UniId), Vid> = HashMap::new();
623 for (lbl, uid_set) in to_resolve {
624 let uid_vec: Vec<UniId> = uid_set.into_iter().collect();
625 let resolved =
626 batch_resolve_primary_vids(primary, &primary_storage, &lbl, &uid_vec).await;
627 for (uid, vid) in resolved {
628 endpoint_resolved.insert((lbl.clone(), uid), vid);
629 }
630 }
631 for ((lbl, uid), vid) in just_inserted.iter() {
633 endpoint_resolved.insert((lbl.clone(), *uid), *vid);
634 }
635
636 let mut resolved_pairs: HashSet<(Vid, Vid)> = HashSet::new();
640 for fe in &fork_edges {
641 let s = endpoint_resolved.get(&(fe.a_label.clone(), fe.src_uid));
642 let d = endpoint_resolved.get(&(fe.b_label.clone(), fe.dst_uid));
643 if let (Some(s), Some(d)) = (s, d) {
644 resolved_pairs.insert((*s, *d));
645 }
646 }
647 let mut primary_edge_uids: HashSet<UniId> = HashSet::new();
648 if !resolved_pairs.is_empty() {
649 let src_vids: HashSet<u64> =
650 resolved_pairs.iter().map(|(s, _)| s.as_u64()).collect();
651 let dst_vids: HashSet<u64> =
652 resolved_pairs.iter().map(|(_, d)| d.as_u64()).collect();
653 let src_list: Vec<String> = src_vids.iter().map(|v| v.to_string()).collect();
654 let dst_list: Vec<String> = dst_vids.iter().map(|v| v.to_string()).collect();
655 let dedup_cypher = format!(
656 "MATCH (a)-[r:`{}`]->(b) \
657 WHERE id(a) IN [{}] AND id(b) IN [{}] \
658 RETURN a, r, b",
659 escape_backticks(edge_type),
660 src_list.join(", "),
661 dst_list.join(", "),
662 );
663 if let Ok(rs) = primary.query(&dedup_cypher).await {
664 for row in rs.rows() {
665 let (
666 Some(Value::Edge(existing)),
667 Some(Value::Node(ea)),
668 Some(Value::Node(eb)),
669 ) = (row.value("r"), row.value("a"), row.value("b"))
670 else {
671 continue;
672 };
673 let ea_label = ea.labels.first().cloned().unwrap_or_default();
674 let eb_label = eb.labels.first().cloned().unwrap_or_default();
675 let esrc =
676 VertexDataset::compute_vertex_uid(&ea_label, None, &ea.properties);
677 let edst =
678 VertexDataset::compute_vertex_uid(&eb_label, None, &eb.properties);
679 let euid = MainEdgeDataset::compute_edge_uid(
680 &esrc,
681 &edst,
682 edge_type,
683 &existing.properties,
684 );
685 primary_edge_uids.insert(euid);
686 }
687 }
688 }
689
690 let mut edges_to_insert: Vec<(Vid, Vid, Properties)> =
694 Vec::with_capacity(fork_edges.len());
695 let mut pattern_inserted = 0usize;
696 for fe in fork_edges {
697 let src_vid = endpoint_resolved
698 .get(&(fe.a_label.clone(), fe.src_uid))
699 .copied();
700 let dst_vid = endpoint_resolved
701 .get(&(fe.b_label.clone(), fe.dst_uid))
702 .copied();
703 let (src_vid, dst_vid) = match (src_vid, dst_vid) {
704 (Some(s), Some(d)) => (s, d),
705 _ => {
706 report.edges_skipped_no_endpoint += 1;
707 continue;
708 }
709 };
710 if primary_edge_uids.contains(&fe.edge_uid) {
711 report.edges_skipped_duplicate += 1;
712 continue;
713 }
714 edges_to_insert.push((src_vid, dst_vid, fe.edge_props));
715 pattern_inserted += 1;
716 }
717 if !edges_to_insert.is_empty() {
718 let n = edges_to_insert.len();
719 primary_tx
720 .bulk_insert_edges(edge_type, edges_to_insert)
721 .await?;
722 report.edges_inserted += n;
723 }
724 report.per_pattern_inserted[idx] = pattern_inserted;
725 }
726 }
727 }
728
729 if !any_edge_pattern {
732 let mut edge_seen = 0usize;
733 for et in fork.schema().schema().edge_types.keys() {
734 let cypher = format!(
735 "MATCH ()-[r:`{}`]->() RETURN count(r) AS c",
736 escape_backticks(et)
737 );
738 if let Ok(rs) = fork.query(&cypher).await
739 && let Some(row) = rs.rows().first()
740 && let Ok(c) = row.get::<i64>("c")
741 {
742 edge_seen += c as usize;
743 }
744 }
745 if edge_seen > 0 {
746 report.edges_skipped = edge_seen;
747 warn!(
748 target: "uni::promote",
749 edges_skipped = edge_seen,
750 "promote_from_fork: fork contains {} edges; pass \
751 PromotePattern::edge_type(...) to promote them",
752 edge_seen
753 );
754 }
755 }
756
757 Ok(report)
758}