1use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::{self, CrdtDecodeMode};
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use futures::TryStreamExt;
13use lancedb::query::{ExecutableQuery, QueryBase, Select};
14use lru::LruCache;
15use metrics;
16use std::collections::HashMap;
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use tokio::sync::Mutex;
20use tracing::{debug, instrument, warn};
21use uni_common::Properties;
22use uni_common::Value;
23use uni_common::core::id::{Eid, Vid};
24use uni_common::core::schema::{DataType, SchemaManager};
25use uni_crdt::Crdt;
26
/// Read-side property access layer for vertices and edges.
///
/// Resolves property values by overlaying L0 (in-memory) buffers on top of
/// LanceDB-backed storage, merging CRDT-typed properties where the schema
/// declares them, and optionally memoizing single-property lookups in
/// per-kind LRU caches.
pub struct PropertyManager {
    // Underlying LanceDB-backed storage (delta + main datasets).
    storage: Arc<StorageManager>,
    // Source of label / edge-type property schemas (incl. CRDT typing).
    schema_manager: Arc<SchemaManager>,
    // LRU of (vertex id, property name) -> value; None when caching disabled.
    vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
    // LRU of (edge id, property name) -> value; None when caching disabled.
    edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
    // Configured LRU capacity; 0 means caching is disabled entirely.
    cache_capacity: usize,
}
35
36impl PropertyManager {
37 pub fn new(
38 storage: Arc<StorageManager>,
39 schema_manager: Arc<SchemaManager>,
40 capacity: usize,
41 ) -> Self {
42 let (vertex_cache, edge_cache) = if capacity == 0 {
44 (None, None)
45 } else {
46 let cap = NonZeroUsize::new(capacity).unwrap();
47 (
48 Some(Mutex::new(LruCache::new(cap))),
49 Some(Mutex::new(LruCache::new(cap))),
50 )
51 };
52
53 Self {
54 storage,
55 schema_manager,
56 vertex_cache,
57 edge_cache,
58 cache_capacity: capacity,
59 }
60 }
61
    /// Returns the configured LRU cache capacity (0 when caching is disabled).
    pub fn cache_size(&self) -> usize {
        self.cache_capacity
    }
65
66 pub fn caching_enabled(&self) -> bool {
68 self.cache_capacity > 0
69 }
70
71 pub async fn clear_cache(&self) {
74 if let Some(ref cache) = self.vertex_cache {
75 cache.lock().await.clear();
76 }
77 if let Some(ref cache) = self.edge_cache {
78 cache.lock().await.clear();
79 }
80 }
81
82 pub async fn invalidate_vertex(&self, _vid: Vid) {
84 if let Some(ref cache) = self.vertex_cache {
85 let mut cache = cache.lock().await;
86 cache.clear();
90 }
91 }
92
93 pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
95 if let Some(ref cache) = self.edge_cache {
96 let mut cache = cache.lock().await;
97 cache.clear();
99 }
100 }
101
    /// Resolves a single edge property, consulting (in priority order) the
    /// L0 delete state, L0 property overlays, the LRU cache, and finally
    /// durable storage.
    ///
    /// Returns `Value::Null` for deleted edges and for missing properties.
    #[instrument(skip(self, ctx), level = "trace")]
    pub async fn get_edge_prop(
        &self,
        eid: uni_common::core::id::Eid,
        prop: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<Value> {
        // A deletion visible in L0 wins over everything else.
        if l0_visibility::is_edge_deleted(eid, ctx) {
            return Ok(Value::Null);
        }

        // An L0 overlay value (unflushed write) wins over cache and storage
        // and is intentionally NOT cached — it is transient state.
        if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
            return Ok(val);
        }

        if let Some(ref cache) = self.edge_cache {
            let mut cache = cache.lock().await;
            if let Some(val) = cache.get(&(eid, prop.to_string())) {
                debug!(eid = ?eid, prop, "Cache HIT");
                metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
                return Ok(val.clone());
            } else {
                debug!(eid = ?eid, prop, "Cache MISS");
                metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
            }
        }

        // Fetch the full property map once so every property can be cached,
        // amortizing the storage round-trip across subsequent lookups.
        let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
        let val = all
            .as_ref()
            .and_then(|props| props.get(prop).cloned())
            .unwrap_or(Value::Null);

        if let Some(ref cache) = self.edge_cache {
            let mut cache = cache.lock().await;
            if let Some(ref props) = all {
                for (prop_name, prop_val) in props {
                    cache.put((eid, prop_name.clone()), prop_val.clone());
                }
            } else {
                // Negative-cache the miss so repeated lookups of a
                // nonexistent edge skip storage.
                cache.put((eid, prop.to_string()), Value::Null);
            }
        }

        Ok(val)
    }
154
    /// Returns the full property map of an edge, overlaying L0 state on top
    /// of durable storage (L0 wins on conflicts).
    ///
    /// Returns `Ok(None)` when the edge is deleted or unknown everywhere;
    /// returns `Ok(Some(empty))` for an edge that exists in L0 but carries
    /// no properties.
    pub async fn get_all_edge_props_with_ctx(
        &self,
        eid: uni_common::core::id::Eid,
        ctx: Option<&QueryContext>,
    ) -> Result<Option<Properties>> {
        if l0_visibility::is_edge_deleted(eid, ctx) {
            return Ok(None);
        }

        // Start from L0 values; they take precedence over storage.
        let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();

        let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;

        if final_props.is_empty() && storage_props.is_none() {
            // Nothing anywhere: distinguish "exists but empty" from
            // "unknown edge" via L0 existence.
            if l0_visibility::edge_exists_in_l0(eid, ctx) {
                return Ok(Some(Properties::new()));
            }
            return Ok(None);
        }

        // Storage only fills gaps; or_insert keeps L0 winners in place.
        if let Some(sp) = storage_props {
            for (k, v) in sp {
                final_props.entry(k).or_insert(v);
            }
        }

        Ok(Some(final_props))
    }
188
    /// Storage-only lookup of all edge properties, scanning every edge type
    /// (no type-name hint available to narrow the search).
    async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
        self.fetch_all_edge_props_from_storage_with_hint(eid, None)
            .await
    }
194
195 async fn fetch_all_edge_props_from_storage_with_hint(
196 &self,
197 eid: Eid,
198 type_name_hint: Option<&str>,
199 ) -> Result<Option<Properties>> {
200 let schema = self.schema_manager.schema();
201 let lancedb_store = self.storage.lancedb_store();
202
203 let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
205 vec![hint]
206 } else {
207 schema.edge_types.keys().map(|s| s.as_str()).collect()
209 };
210
211 for type_name in type_names {
212 let type_props = schema.properties.get(type_name);
213
214 let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
217 Ok(ds) => ds,
218 Err(_) => continue, };
220
221 let table = match delta_ds.open_lancedb(lancedb_store).await {
223 Ok(t) => t,
224 Err(_) => continue, };
226
227 use lancedb::query::{ExecutableQuery, QueryBase};
228
229 let base_filter = format!("eid = {}", eid.as_u64());
230
231 let filter_expr = self.storage.apply_version_filter(base_filter);
232
233 let query = table.query().only_if(filter_expr);
234 let stream = match query.execute().await {
235 Ok(s) => s,
236 Err(_) => continue,
237 };
238
239 let batches: Vec<arrow_array::RecordBatch> =
240 stream.try_collect().await.unwrap_or_default();
241
242 let mut rows: Vec<(u64, u8, Properties)> = Vec::new();
244
245 for batch in batches {
246 let op_col = match batch.column_by_name("op") {
247 Some(c) => c
248 .as_any()
249 .downcast_ref::<arrow_array::UInt8Array>()
250 .unwrap(),
251 None => continue,
252 };
253 let ver_col = match batch.column_by_name("_version") {
254 Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
255 None => continue,
256 };
257
258 for row in 0..batch.num_rows() {
259 let ver = ver_col.value(row);
260 let op = op_col.value(row);
261 let mut props = Properties::new();
262
263 if op != 1 {
264 if let Some(tp) = type_props {
266 for (p_name, p_meta) in tp {
267 if let Some(col) = batch.column_by_name(p_name)
268 && !col.is_null(row)
269 {
270 let val =
271 Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
272 props.insert(p_name.clone(), val);
273 }
274 }
275 }
276 }
277 rows.push((ver, op, props));
278 }
279 }
280
281 if rows.is_empty() {
282 continue;
283 }
284
285 rows.sort_by_key(|(ver, _, _)| *ver);
287
288 let mut merged_props: Properties = Properties::new();
292 let mut is_deleted = false;
293
294 for (_, op, props) in rows {
295 if op == 1 {
296 is_deleted = true;
298 merged_props.clear();
299 } else {
300 is_deleted = false;
301 for (p_name, p_val) in props {
302 let is_crdt = type_props
304 .and_then(|tp| tp.get(&p_name))
305 .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
306 .unwrap_or(false);
307
308 if is_crdt {
309 if let Some(existing) = merged_props.get(&p_name) {
311 if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
312 merged_props.insert(p_name, merged);
313 }
314 } else {
315 merged_props.insert(p_name, p_val);
316 }
317 } else {
318 merged_props.insert(p_name, p_val);
320 }
321 }
322 }
323 }
324
325 if is_deleted {
326 return Ok(None);
327 }
328
329 if !merged_props.is_empty() {
330 return Ok(Some(merged_props));
331 }
332 }
333
334 use crate::storage::main_edge::MainEdgeDataset;
336 if let Some(props) = MainEdgeDataset::find_props_by_eid(lancedb_store, eid).await? {
337 return Ok(Some(props));
338 }
339
340 Ok(None)
341 }
342
    /// Batch-loads the requested `properties` for `vids`, scanning each
    /// candidate label's table and then overlaying L0 state (pending-flush
    /// buffers, the live buffer, and — when no snapshot bound is active —
    /// the transaction buffer).
    ///
    /// Vertices deleted in storage or L0 are absent from the returned map.
    /// Per-label storage errors are logged and skipped, not propagated.
    pub async fn get_batch_vertex_props(
        &self,
        vids: &[Vid],
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let schema = self.schema_manager.schema();
        let mut result = HashMap::new();
        if vids.is_empty() {
            return Ok(result);
        }

        // Use the label index to narrow the scan; if any vid is unknown to
        // the index, fall back to scanning every label in the schema.
        let labels_to_scan: Vec<String> = {
            let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
            let mut all_resolved = true;
            for &vid in vids {
                if let Some(labels) = self.storage.get_labels_from_index(vid) {
                    needed.extend(labels);
                } else {
                    all_resolved = false;
                    break;
                }
            }
            if all_resolved {
                needed.into_iter().collect()
            } else {
                schema.labels.keys().cloned().collect()
            }
        };

        for label_name in &labels_to_scan {
            // Only request columns this label actually declares.
            let label_schema_props = schema.properties.get(label_name);
            let valid_props: Vec<&str> = properties
                .iter()
                .cloned()
                .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
                .collect();
            let ds = self.storage.vertex_dataset(label_name)?;
            let lancedb_store = self.storage.lancedb_store();
            let table = match ds.open_lancedb(lancedb_store).await {
                Ok(t) => t,
                Err(e) => {
                    // A missing table only means no vertices of this label
                    // were ever written; anything else is logged and skipped.
                    let err_msg = e.to_string();
                    if err_msg.contains("was not found")
                        || err_msg.contains("does not exist")
                        || err_msg.contains("not found")
                    {
                        continue;
                    }
                    warn!(
                        label = %label_name,
                        error = %e,
                        "failed to open LanceDB table for label, skipping"
                    );
                    continue;
                }
            };

            let vid_list = vids
                .iter()
                .map(|v| v.as_u64().to_string())
                .collect::<Vec<_>>()
                .join(",");
            let base_filter = format!("_vid IN ({})", vid_list);

            // Bound by the snapshot version when one is active.
            let final_filter = self.storage.apply_version_filter(base_filter);

            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
            columns.push("_vid".to_string());
            columns.push("_version".to_string());
            columns.push("_deleted".to_string());
            columns.extend(valid_props.iter().map(|s| s.to_string()));
            // Overflow column holds properties without dedicated columns.
            columns.push("overflow_json".to_string());

            let query = table
                .query()
                .only_if(final_filter)
                .select(Select::Columns(columns));

            let stream = match query.execute().await {
                Ok(s) => s,
                Err(e) => {
                    warn!(
                        label = %label_name,
                        error = %e,
                        "failed to execute query on label table, skipping"
                    );
                    continue;
                }
            };

            let batches: Vec<RecordBatch> = match stream.try_collect().await {
                Ok(b) => b,
                Err(e) => {
                    warn!(
                        label = %label_name,
                        error = %e,
                        "failed to collect query results for label, skipping"
                    );
                    continue;
                }
            };
            for batch in batches {
                let vid_col = match batch
                    .column_by_name("_vid")
                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let del_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|col| col.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let vid = Vid::from(vid_col.value(row));

                    // Deleted rows retract anything gathered so far; a later
                    // row may re-insert the vertex.
                    if del_col.value(row) {
                        result.remove(&vid);
                        continue;
                    }

                    let label_props = schema.properties.get(label_name);
                    let mut props =
                        Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
                    result.insert(vid, props);
                }
            }
        }

        // L0 overlays are applied oldest-first so newer buffers win:
        // pending-flush buffers, then the live buffer, then (only when no
        // snapshot bound is set) the current transaction's buffer.
        if let Some(ctx) = ctx {
            for pending_l0_arc in &ctx.pending_flush_l0s {
                let pending_l0 = pending_l0_arc.read();
                self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
            }

            let l0 = ctx.l0.read();
            self.overlay_l0_batch(vids, &l0, properties, &mut result);

            if self.storage.version_high_water_mark().is_none()
                && let Some(tx_l0_arc) = &ctx.transaction_l0
            {
                let tx_l0 = tx_l0_arc.read();
                self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
            }
        }

        Ok(result)
    }
515
    /// Overlays one L0 buffer's vertex writes onto `result`.
    ///
    /// Tombstoned vertices are removed outright. For surviving vertices,
    /// requested properties are copied in — CRDT-typed ones merged with any
    /// value already present — unless the L0 entry's version exceeds the
    /// storage high-water mark (not yet visible at this snapshot).
    fn overlay_l0_batch(
        &self,
        vids: &[Vid],
        l0: &L0Buffer,
        properties: &[&str],
        result: &mut HashMap<Vid, Properties>,
    ) {
        let schema = self.schema_manager.schema();
        for &vid in vids {
            if l0.vertex_tombstones.contains(&vid) {
                result.remove(&vid);
                continue;
            }
            if let Some(l0_props) = l0.vertex_properties.get(&vid) {
                // Skip entries written after the snapshot's high-water mark.
                let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if self
                    .storage
                    .version_high_water_mark()
                    .is_some_and(|hwm| entry_version > hwm)
                {
                    continue;
                }

                let entry = result.entry(vid).or_default();
                let labels = l0.get_vertex_labels(vid);

                // NOTE(review): unlike overlay_l0_edge_batch there is no
                // "_all_props" wildcard handling here — confirm vertex
                // callers never pass it.
                for (k, v) in l0_props {
                    if properties.contains(&k.as_str()) {
                        // CRDT iff any of the vertex's labels declares this
                        // property with a CRDT type.
                        let is_crdt = labels
                            .and_then(|label_list| {
                                label_list.iter().find_map(|ln| {
                                    schema
                                        .properties
                                        .get(ln)
                                        .and_then(|lp| lp.get(k))
                                        .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                                })
                            })
                            .is_some();

                        if is_crdt {
                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
                            // Best-effort merge; fall back to the L0 value.
                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
                        } else {
                            entry.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }
    }
572
    /// Batch-loads the requested `properties` for `eids` from each candidate
    /// edge type's forward delta table, then overlays L0 state.
    ///
    /// NOTE(review): the result map is keyed by `Vid::from(eid.as_u64())`
    /// (an eid reinterpreted as a vid) to reuse the vertex-shaped result
    /// type — confirm callers key their lookups the same way.
    pub async fn get_batch_edge_props(
        &self,
        eids: &[uni_common::core::id::Eid],
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let schema = self.schema_manager.schema();
        let mut result = HashMap::new();
        if eids.is_empty() {
            return Ok(result);
        }

        // Narrow the scan using L0's edge-type knowledge when every eid
        // resolves; otherwise scan all edge types in the schema.
        let types_to_scan: Vec<String> = {
            if let Some(ctx) = ctx {
                let mut needed: std::collections::HashSet<String> =
                    std::collections::HashSet::new();
                let mut all_resolved = true;
                for &eid in eids {
                    if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
                        needed.insert(etype.to_string());
                    } else {
                        all_resolved = false;
                        break;
                    }
                }
                if all_resolved {
                    needed.into_iter().collect()
                } else {
                    schema.edge_types.keys().cloned().collect()
                }
            } else {
                schema.edge_types.keys().cloned().collect()
            }
        };

        for type_name in &types_to_scan {
            // Only request columns this edge type actually declares.
            let type_props = schema.properties.get(type_name);
            let valid_props: Vec<&str> = properties
                .iter()
                .cloned()
                .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
                .collect();
            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
                Ok(ds) => ds,
                Err(_) => continue,
            };
            let lancedb_store = self.storage.lancedb_store();
            let table = match delta_ds.open_lancedb(lancedb_store).await {
                Ok(t) => t,
                Err(e) => {
                    // A missing table only means no edges of this type were
                    // ever written; anything else is logged and skipped.
                    let err_msg = e.to_string();
                    if err_msg.contains("was not found")
                        || err_msg.contains("does not exist")
                        || err_msg.contains("not found")
                    {
                        continue;
                    }
                    warn!(
                        edge_type = %type_name,
                        error = %e,
                        "failed to open LanceDB delta table for edge type, skipping"
                    );
                    continue;
                }
            };

            let eid_list = eids
                .iter()
                .map(|e| e.as_u64().to_string())
                .collect::<Vec<_>>()
                .join(",");
            let base_filter = format!("eid IN ({})", eid_list);

            // Bound by the snapshot version when one is active.
            let final_filter = self.storage.apply_version_filter(base_filter);

            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
            columns.push("eid".to_string());
            columns.push("_version".to_string());
            columns.push("op".to_string());
            columns.extend(valid_props.iter().map(|s| s.to_string()));
            // Overflow column holds properties without dedicated columns.
            columns.push("overflow_json".to_string());

            let query = table
                .query()
                .only_if(final_filter)
                .select(Select::Columns(columns));

            let stream = match query.execute().await {
                Ok(s) => s,
                Err(e) => {
                    warn!(
                        edge_type = %type_name,
                        error = %e,
                        "failed to execute query on edge delta table, skipping"
                    );
                    continue;
                }
            };

            let batches: Vec<RecordBatch> = match stream.try_collect().await {
                Ok(b) => b,
                Err(e) => {
                    warn!(
                        edge_type = %type_name,
                        error = %e,
                        "failed to collect query results for edge type, skipping"
                    );
                    continue;
                }
            };
            for batch in batches {
                let eid_col = match batch
                    .column_by_name("eid")
                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let op_col = match batch
                    .column_by_name("op")
                    .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let eid = uni_common::core::id::Eid::from(eid_col.value(row));

                    // Delete rows (op == 1) retract anything gathered so far.
                    if op_col.value(row) == 1 {
                        result.remove(&Vid::from(eid.as_u64()));
                        continue;
                    }

                    let mut props =
                        Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
                    result.insert(Vid::from(eid.as_u64()), props);
                }
            }
        }

        // Overlay L0 buffers oldest-first so newer writes win; the
        // transaction buffer only applies when no snapshot bound is set.
        if let Some(ctx) = ctx {
            for pending_l0_arc in &ctx.pending_flush_l0s {
                let pending_l0 = pending_l0_arc.read();
                self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
            }

            let l0 = ctx.l0.read();
            self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);

            if self.storage.version_high_water_mark().is_none()
                && let Some(tx_l0_arc) = &ctx.transaction_l0
            {
                let tx_l0 = tx_l0_arc.read();
                self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
            }
        }

        Ok(result)
    }
753
    /// Overlays one L0 buffer's edge writes onto `result` (keyed by
    /// `Vid::from(eid)`, matching `get_batch_edge_props`).
    ///
    /// Tombstoned edges are removed; surviving edges contribute requested
    /// properties (or all of them when "_all_props" is requested), with
    /// CRDT-typed values merged rather than overwritten. Entries newer than
    /// the storage high-water mark are skipped.
    fn overlay_l0_edge_batch(
        &self,
        eids: &[uni_common::core::id::Eid],
        l0: &L0Buffer,
        properties: &[&str],
        result: &mut HashMap<Vid, Properties>,
    ) {
        let schema = self.schema_manager.schema();
        for &eid in eids {
            let vid_key = Vid::from(eid.as_u64());
            if l0.tombstones.contains_key(&eid) {
                result.remove(&vid_key);
                continue;
            }
            if let Some(l0_props) = l0.edge_properties.get(&eid) {
                // Skip entries not yet visible at the snapshot bound.
                let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
                if self
                    .storage
                    .version_high_water_mark()
                    .is_some_and(|hwm| entry_version > hwm)
                {
                    continue;
                }

                let entry = result.entry(vid_key).or_default();
                let type_name = l0.get_edge_type(eid);

                // "_all_props" acts as a wildcard selecting every property.
                let include_all = properties.contains(&"_all_props");
                for (k, v) in l0_props {
                    if include_all || properties.contains(&k.as_str()) {
                        let is_crdt = type_name
                            .and_then(|tn| schema.properties.get(tn))
                            .and_then(|tp| tp.get(k))
                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                            .unwrap_or(false);

                        if is_crdt {
                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
                            // Best-effort merge; fall back to the L0 value.
                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
                        } else {
                            entry.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }
    }
804
    /// Columnar variant of batch vertex property loading.
    ///
    /// Currently a stub: it performs the row-oriented batch load (exercising
    /// the same resolution path) but always returns an error directing
    /// callers to `get_batch_vertex_props` until a true columnar path is
    /// implemented.
    pub async fn load_properties_columnar(
        &self,
        vids: &UInt64Array,
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<RecordBatch> {
        let mut vid_vec = Vec::with_capacity(vids.len());
        for i in 0..vids.len() {
            vid_vec.push(Vid::from(vids.value(i)));
        }

        // Result intentionally discarded; only the error below is returned.
        let _props_map = self
            .get_batch_vertex_props(&vid_vec, properties, ctx)
            .await?;

        Err(anyhow!(
            "Columnar property load not fully implemented yet - use batch load"
        ))
    }
874
    /// Resolves the labels of each vertex in `vids`.
    ///
    /// Labels are unioned from every visible L0 buffer first; vertices still
    /// unanswered are resolved from the in-memory label index, and only the
    /// remainder hits the main vertex dataset. Each vertex's label list is
    /// returned sorted and deduplicated.
    pub async fn get_batch_labels(
        &self,
        vids: &[Vid],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Vec<String>>> {
        let mut result = HashMap::new();
        if vids.is_empty() {
            return Ok(result);
        }

        if let Some(ctx) = ctx {
            // Union labels from pending-flush, live, and transaction buffers.
            let mut collect_labels = |l0: &L0Buffer| {
                for &vid in vids {
                    if let Some(labels) = l0.get_vertex_labels(vid) {
                        result
                            .entry(vid)
                            .or_default()
                            .extend(labels.iter().cloned());
                    }
                }
            };

            for l0_arc in &ctx.pending_flush_l0s {
                collect_labels(&l0_arc.read());
            }
            collect_labels(&ctx.l0.read());
            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
                collect_labels(&tx_l0_arc.read());
            }
        }

        let mut vids_needing_lancedb = Vec::new();

        // Order-preserving union without duplicates.
        fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
            for l in new_labels {
                if !existing.contains(&l) {
                    existing.push(l);
                }
            }
        }

        for &vid in vids {
            // L0 already answered this vertex.
            if result.contains_key(&vid) {
                continue;
            }

            if let Some(labels) = self.storage.get_labels_from_index(vid) {
                merge_labels(result.entry(vid).or_default(), labels);
            } else {
                vids_needing_lancedb.push(vid);
            }
        }

        if !vids_needing_lancedb.is_empty() {
            // Last resort: batch lookup in the main dataset, bounded by the
            // snapshot version when one is active.
            let lancedb_store = self.storage.lancedb_store();
            let version = self.storage.version_high_water_mark();
            let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
                lancedb_store,
                &vids_needing_lancedb,
                version,
            )
            .await?;

            for (vid, labels) in storage_labels {
                merge_labels(result.entry(vid).or_default(), labels);
            }
        }

        // Canonicalize for stable, duplicate-free output.
        for labels in result.values_mut() {
            labels.sort();
            labels.dedup();
        }

        Ok(result)
    }
956
957 pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
958 Ok(self
959 .get_all_vertex_props_with_ctx(vid, None)
960 .await?
961 .unwrap_or_default())
962 }
963
    /// Returns a vertex's full property map with L0 overlaid over storage
    /// (L0 wins on conflicts), or `None` when the vertex is deleted or has
    /// no record anywhere.
    ///
    /// When a context is supplied, CRDT-typed properties are additionally
    /// normalized to resolved values per the vertex's labels.
    pub async fn get_all_vertex_props_with_ctx(
        &self,
        vid: Vid,
        ctx: Option<&QueryContext>,
    ) -> Result<Option<Properties>> {
        if l0_visibility::is_vertex_deleted(vid, ctx) {
            return Ok(None);
        }

        let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);

        let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;

        // No trace of the vertex in either L0 or storage.
        if l0_props.is_none() && storage_props_opt.is_none() {
            return Ok(None);
        }

        let mut final_props = l0_props.unwrap_or_default();

        // Storage fills only the gaps; or_insert keeps L0 winners in place.
        if let Some(storage_props) = storage_props_opt {
            for (k, v) in storage_props {
                final_props.entry(k).or_insert(v);
            }
        }

        if let Some(ctx) = ctx {
            // Resolve CRDT payloads for every label the vertex carries.
            let labels = l0_visibility::get_vertex_labels(vid, ctx);
            for label in &labels {
                self.normalize_crdt_properties(&mut final_props, label)?;
            }
        }

        Ok(Some(final_props))
    }
1006
    /// Batch-loads ALL properties for `vids` restricted to a single `label`.
    ///
    /// Vertices fully answered by L0 skip storage; the rest are read from
    /// the label's table, reduced per vid to the highest-version live row
    /// (with CRDT-aware merging across rows via `merge_versioned_props`),
    /// and merged underneath any L0 values. Deleted vertices are omitted.
    pub async fn get_batch_vertex_props_for_label(
        &self,
        vids: &[Vid],
        label: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let mut result: HashMap<Vid, Properties> = HashMap::new();
        let mut need_storage: Vec<Vid> = Vec::new();

        // Partition: vids answered by L0 vs. vids needing a storage read.
        for &vid in vids {
            if l0_visibility::is_vertex_deleted(vid, ctx) {
                continue;
            }
            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
            if let Some(props) = l0_props {
                result.insert(vid, props);
            } else {
                need_storage.push(vid);
            }
        }

        if need_storage.is_empty() {
            // Still normalize CRDT payloads before returning.
            if ctx.is_some() {
                for props in result.values_mut() {
                    self.normalize_crdt_properties(props, label)?;
                }
            }
            return Ok(result);
        }

        let schema = self.schema_manager.schema();
        let label_props = schema.properties.get(label);

        let table = match self.storage.get_cached_table(label).await {
            Ok(t) => t,
            Err(e) => {
                // A missing table just means nothing was written for this
                // label; other errors propagate with context attached.
                let err_msg = e.to_string();
                if err_msg.contains("was not found")
                    || err_msg.contains("does not exist")
                    || err_msg.contains("not found")
                {
                    return Ok(result);
                }
                return Err(e.context(format!("failed to open cached table for label '{}'", label)));
            }
        };

        let mut prop_names: Vec<String> = Vec::new();
        if let Some(props) = label_props {
            prop_names = props.keys().cloned().collect();
        }

        let mut columns: Vec<String> = vec![
            "_vid".to_string(),
            "_deleted".to_string(),
            "_version".to_string(),
        ];
        columns.extend(prop_names.iter().cloned());
        columns.push("overflow_json".to_string());

        let vid_list: String = need_storage
            .iter()
            .map(|v| v.as_u64().to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let base_filter = format!("_vid IN ({})", vid_list);

        // Bound by the snapshot version when one is active.
        let filter_expr = self.storage.apply_version_filter(base_filter);

        let batches: Vec<RecordBatch> = table
            .query()
            .only_if(&filter_expr)
            .select(Select::Columns(columns.clone()))
            .execute()
            .await
            .map_err(|e| {
                anyhow::anyhow!("failed to execute query on label '{}' table: {}", label, e)
            })?
            .try_collect()
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    "failed to collect query results for label '{}': {}",
                    label,
                    e
                )
            })?;

        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

        // Per-vid reduction state: highest version seen so far, and the
        // properties accumulated at/around that version.
        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();

        for batch in batches {
            let vid_col = match batch
                .column_by_name("_vid")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };
            let deleted_col = match batch
                .column_by_name("_deleted")
                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
            {
                Some(c) => c,
                None => continue,
            };
            let version_col = match batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };

            for row in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row));
                let version = version_col.value(row);

                if deleted_col.value(row) {
                    // A delete at or above the best version retracts the
                    // accumulated properties for this vid.
                    if per_vid_best_version
                        .get(&vid)
                        .is_none_or(|&best| version >= best)
                    {
                        per_vid_best_version.insert(vid, version);
                        per_vid_props.remove(&vid);
                    }
                    continue;
                }

                let mut current_props =
                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                // Overflow values never shadow dedicated columns.
                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                    for (k, v) in overflow_props {
                        current_props.entry(k).or_insert(v);
                    }
                }

                // Fold this row into the per-vid reduction (version-aware,
                // CRDT-merging).
                let best = per_vid_best_version.get(&vid).copied();
                let mut best_opt = best;
                let mut merged = per_vid_props.remove(&vid);
                self.merge_versioned_props(
                    current_props,
                    version,
                    &mut best_opt,
                    &mut merged,
                    label_props,
                )?;
                if let Some(v) = best_opt {
                    per_vid_best_version.insert(vid, v);
                }
                if let Some(p) = merged {
                    per_vid_props.insert(vid, p);
                }
            }
        }

        // Storage fills only the gaps; L0 values already in `result` win.
        for (vid, storage_props) in per_vid_props {
            let entry = result.entry(vid).or_default();
            for (k, v) in storage_props {
                entry.entry(k).or_insert(v);
            }
        }

        if ctx.is_some() {
            for props in result.values_mut() {
                self.normalize_crdt_properties(props, label)?;
            }
        }

        Ok(result)
    }
1199
1200 fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1204 let schema = self.schema_manager.schema();
1205 let label_props = match schema.properties.get(label) {
1206 Some(p) => p,
1207 None => return Ok(()),
1208 };
1209
1210 for (prop_name, prop_meta) in label_props {
1211 if let DataType::Crdt(_) = prop_meta.r#type
1212 && let Some(val) = props.get_mut(prop_name)
1213 {
1214 *val = Value::from(Self::parse_crdt_value(val)?);
1215 }
1216 }
1217
1218 Ok(())
1219 }
1220
1221 fn extract_row_properties(
1223 batch: &RecordBatch,
1224 row: usize,
1225 prop_names: &[&str],
1226 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1227 ) -> Result<Properties> {
1228 let mut props = Properties::new();
1229 for name in prop_names {
1230 let col = match batch.column_by_name(name) {
1231 Some(col) => col,
1232 None => continue,
1233 };
1234 if col.is_null(row) {
1235 continue;
1236 }
1237 if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1238 let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1239 props.insert((*name).to_string(), val);
1240 }
1241 }
1242 Ok(props)
1243 }
1244
1245 fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
1250 use arrow_array::LargeBinaryArray;
1251
1252 let overflow_col = match batch.column_by_name("overflow_json") {
1253 Some(col) => col,
1254 None => return Ok(None), };
1256
1257 if overflow_col.is_null(row) {
1258 return Ok(None);
1259 }
1260
1261 let binary_array = overflow_col
1262 .as_any()
1263 .downcast_ref::<LargeBinaryArray>()
1264 .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;
1265
1266 let jsonb_bytes = binary_array.value(row);
1267
1268 let uni_val = uni_common::cypher_value_codec::decode(jsonb_bytes)
1270 .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
1271 let json_val: serde_json::Value = uni_val.into();
1272
1273 let overflow_props: Properties = serde_json::from_value(json_val)
1275 .map_err(|e| anyhow!("Failed to parse overflow properties: {}", e))?;
1276
1277 Ok(Some(overflow_props))
1278 }
1279
    /// Merges overflow-stored properties at `row` into `props`, restricted
    /// to the names in `properties`.
    ///
    /// Special case: when "overflow_json" itself is requested, the raw JSONB
    /// bytes are exposed under that key as a `Value::List` of ints. Decoded
    /// overflow values never overwrite entries already present in `props`
    /// (dedicated columns win via `or_insert`).
    fn merge_overflow_into_props(
        batch: &RecordBatch,
        row: usize,
        properties: &[&str],
        props: &mut Properties,
    ) -> Result<()> {
        use arrow_array::LargeBinaryArray;

        let overflow_col = match batch.column_by_name("overflow_json") {
            Some(col) if !col.is_null(row) => col,
            _ => return Ok(()),
        };

        // Raw-bytes passthrough for callers that want the encoded payload.
        if properties.contains(&"overflow_json")
            && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
        {
            let jsonb_bytes = binary_array.value(row);
            let bytes_list: Vec<Value> =
                jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
            props.insert("overflow_json".to_string(), Value::List(bytes_list));
        }

        if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
            for (k, v) in overflow_props {
                if properties.contains(&k.as_str()) {
                    props.entry(k).or_insert(v);
                }
            }
        }

        Ok(())
    }
1320
1321 fn merge_crdt_into(
1323 &self,
1324 target: &mut Properties,
1325 source: Properties,
1326 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1327 crdt_only: bool,
1328 ) -> Result<()> {
1329 for (k, v) in source {
1330 if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1331 if let DataType::Crdt(_) = prop_meta.r#type {
1332 let existing_v = target.entry(k).or_insert(Value::Null);
1333 *existing_v = self.merge_crdt_values(existing_v, &v)?;
1334 } else if !crdt_only {
1335 target.insert(k, v);
1336 }
1337 }
1338 }
1339 Ok(())
1340 }
1341
1342 fn merge_versioned_props(
1344 &self,
1345 current_props: Properties,
1346 version: u64,
1347 best_version: &mut Option<u64>,
1348 best_props: &mut Option<Properties>,
1349 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1350 ) -> Result<()> {
1351 if best_version.is_none_or(|best| version > best) {
1352 if let Some(mut existing_props) = best_props.take() {
1354 let mut merged = current_props;
1356 for (k, v) in merged.iter_mut() {
1357 if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
1358 && let DataType::Crdt(_) = prop_meta.r#type
1359 && let Some(existing_val) = existing_props.remove(k)
1360 {
1361 *v = self.merge_crdt_values(v, &existing_val)?;
1362 }
1363 }
1364 *best_props = Some(merged);
1365 } else {
1366 *best_props = Some(current_props);
1367 }
1368 *best_version = Some(version);
1369 } else if Some(version) == *best_version {
1370 if let Some(existing_props) = best_props.as_mut() {
1372 self.merge_crdt_into(existing_props, current_props, label_props, false)?;
1373 } else {
1374 *best_props = Some(current_props);
1375 }
1376 } else {
1377 if let Some(existing_props) = best_props.as_mut() {
1379 self.merge_crdt_into(existing_props, current_props, label_props, true)?;
1380 }
1381 }
1382 Ok(())
1383 }
1384
    /// Loads and merges every property for `vid` from persistent storage.
    ///
    /// Scans each candidate label table (from the label index when available,
    /// otherwise every label in the schema), selecting all schema-declared
    /// columns plus the `overflow_json` blob, and folds rows together via
    /// `merge_versioned_props` (last-writer-wins keyed on `_version`, with
    /// CRDT-typed properties merged across versions). If no label table
    /// yields properties, falls back to the main vertex dataset.
    ///
    /// Returns `Ok(None)` when nothing readable remains (e.g. the newest row
    /// is a tombstone). Per-table query errors are skipped — this is a
    /// best-effort scan.
    async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
        let schema = self.schema_manager.schema();
        // Winning version and merged properties across all scanned tables.
        let mut merged_props: Option<Properties> = None;
        let mut global_best_version: Option<u64> = None;

        // Prefer the label index; otherwise fall back to scanning every label.
        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect() };

        for label_name in &label_names {
            let label_props = schema.properties.get(label_name);

            let table = match self.storage.get_cached_table(label_name).await {
                Ok(t) => t,
                Err(_) => continue,
            };

            // All schema-declared property columns for this label.
            let mut prop_names: Vec<String> = Vec::new();
            if let Some(props) = label_props {
                prop_names = props.keys().cloned().collect();
            }

            // System columns first, then typed columns, then the overflow blob.
            let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
            columns.extend(prop_names.iter().cloned());
            columns.push("overflow_json".to_string());

            let base_filter = format!("_vid = {}", vid.as_u64());

            // Apply the storage manager's version filter on top of the vid match.
            let filter_expr = self.storage.apply_version_filter(base_filter);

            let batches: Vec<RecordBatch> = match table
                .query()
                .only_if(&filter_expr)
                .select(Select::Columns(columns.clone()))
                .execute()
                .await
            {
                Ok(stream) => match stream.try_collect().await {
                    Ok(b) => b,
                    Err(_) => continue,
                },
                Err(_) => continue,
            };

            let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

            for batch in batches {
                // Batches missing the system columns are skipped entirely.
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    // A tombstone at/after the current best version wipes the
                    // accumulated properties (a same-version delete wins).
                    if deleted_col.value(row) {
                        if global_best_version.is_none_or(|best| version >= best) {
                            global_best_version = Some(version);
                            merged_props = None;
                        }
                        continue;
                    }

                    let mut current_props =
                        Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                    // Overflow values never shadow typed columns from this row.
                    if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                        for (k, v) in overflow_props {
                            current_props.entry(k).or_insert(v);
                        }
                    }

                    // Fold this row into the global result by version.
                    self.merge_versioned_props(
                        current_props,
                        version,
                        &mut global_best_version,
                        &mut merged_props,
                        label_props,
                    )?;
                }
            }
        }

        // No label table produced properties: try the main vertex dataset.
        if merged_props.is_none()
            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
                self.storage.lancedb_store(),
                vid,
                self.storage.version_high_water_mark(),
            )
            .await?
        {
            return Ok(Some(main_props));
        }

        Ok(merged_props)
    }
1505
    /// Resolves a single vertex property without an L0/query context —
    /// convenience wrapper over [`Self::get_vertex_prop_with_ctx`] that
    /// consults only the cache and persistent storage.
    pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
        self.get_vertex_prop_with_ctx(vid, prop, None).await
    }
1509
    /// Resolves a single vertex property, consulting (in order) the L0 write
    /// buffers visible through `ctx`, the per-(vid, prop) LRU cache, and
    /// persistent storage.
    ///
    /// CRDT-typed properties take a dedicated path: L0 deltas are accumulated
    /// first and then merged with the cached/stored state, since CRDT state
    /// from different layers must be combined rather than shadowed.
    ///
    /// Returns `Value::Null` when the vertex is deleted (per `ctx`) or the
    /// property cannot be found.
    #[instrument(skip(self, ctx), level = "trace")]
    pub async fn get_vertex_prop_with_ctx(
        &self,
        vid: Vid,
        prop: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<Value> {
        // A vertex deleted in a visible L0 buffer has no readable properties.
        if l0_visibility::is_vertex_deleted(vid, ctx) {
            return Ok(Value::Null);
        }

        let schema = self.schema_manager.schema();
        // Labels known from L0; empty when there is no ctx or no L0 labels.
        let labels = ctx
            .map(|c| l0_visibility::get_vertex_labels(vid, c))
            .unwrap_or_default();

        // Decide whether `prop` is CRDT-typed: check the vertex's own labels
        // when known, otherwise any schema label that declares the property.
        let is_crdt = if !labels.is_empty() {
            labels.iter().any(|ln| {
                schema
                    .properties
                    .get(ln)
                    .and_then(|lp| lp.get(prop))
                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                    .unwrap_or(false)
            })
        } else {
            schema.properties.values().any(|label_props| {
                label_props
                    .get(prop)
                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                    .unwrap_or(false)
            })
        };

        // CRDT path: accumulate L0 deltas, then merge with cached/stored state.
        if is_crdt {
            let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
            return self.finalize_crdt_lookup(vid, prop, l0_val).await;
        }

        // Plain property: a visible L0 value wins outright.
        if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
            return Ok(val);
        }

        // Next, try the LRU cache (metrics track hit/miss rates).
        if let Some(ref cache) = self.vertex_cache {
            let mut cache = cache.lock().await;
            if let Some(val) = cache.get(&(vid, prop.to_string())) {
                debug!(vid = ?vid, prop, "Cache HIT");
                metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
                return Ok(val.clone());
            } else {
                debug!(vid = ?vid, prop, "Cache MISS");
                metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
                    .increment(1);
            }
        }

        // Finally, fall back to persistent storage and populate the cache.
        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;

        if let Some(ref cache) = self.vertex_cache {
            let mut cache = cache.lock().await;
            cache.put((vid, prop.to_string()), storage_val.clone());
        }

        Ok(storage_val)
    }
1586
1587 fn accumulate_crdt_from_l0(
1589 &self,
1590 vid: Vid,
1591 prop: &str,
1592 ctx: Option<&QueryContext>,
1593 ) -> Result<Value> {
1594 let mut merged = Value::Null;
1595 l0_visibility::visit_l0_buffers(ctx, |l0| {
1596 if let Some(props) = l0.vertex_properties.get(&vid)
1597 && let Some(val) = props.get(prop)
1598 {
1599 if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1601 merged = new_merged;
1602 }
1603 }
1604 false });
1606 Ok(merged)
1607 }
1608
1609 async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1611 let cached_val = if let Some(ref cache) = self.vertex_cache {
1613 let mut cache = cache.lock().await;
1614 cache.get(&(vid, prop.to_string())).cloned()
1615 } else {
1616 None
1617 };
1618
1619 if let Some(val) = cached_val {
1620 let merged = self.merge_crdt_values(&val, &l0_val)?;
1621 return Ok(merged);
1622 }
1623
1624 let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1626
1627 if let Some(ref cache) = self.vertex_cache {
1629 let mut cache = cache.lock().await;
1630 cache.put((vid, prop.to_string()), storage_val.clone());
1631 }
1632
1633 self.merge_crdt_values(&storage_val, &l0_val)
1635 }
1636
    /// Reads a single property for `vid` from persistent storage.
    ///
    /// Scans each candidate label table (label index when available, otherwise
    /// every label in the schema), selecting the typed column when the schema
    /// declares the property and always including `overflow_json` as a
    /// fallback source. Rows are combined with last-writer-wins on `_version`
    /// (ties favor the later-seen row; tombstones clear the value), while
    /// CRDT-typed values are merged across versions via `merge_prop_value`.
    ///
    /// Returns `Value::Null` when nothing readable is found. Per-table query
    /// failures are skipped — this is a best-effort scan.
    async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
        let schema = self.schema_manager.schema();
        // Winning version and value across all scanned tables/rows.
        let mut best_version: Option<u64> = None;
        let mut best_value: Option<Value> = None;

        // Prefer the label index; otherwise fall back to scanning every label.
        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect() };

        for label_name in &label_names {
            // Schema metadata for `prop` under this label, if declared.
            let prop_meta = schema
                .properties
                .get(label_name)
                .and_then(|props| props.get(prop));

            let table = match self.storage.get_cached_table(label_name).await {
                Ok(t) => t,
                Err(_) => continue,
            };

            let base_filter = format!("_vid = {}", vid.as_u64());

            // Apply the storage manager's version filter on top of the vid match.
            let filter_expr = self.storage.apply_version_filter(base_filter);

            let mut columns = vec![
                "_deleted".to_string(),
                "_version".to_string(),
                "overflow_json".to_string(),
            ];

            // Only select the typed column when the schema declares it for
            // this label; otherwise the property can only live in overflow.
            if prop_meta.is_some() {
                columns.push(prop.to_string());
            }

            let batches: Vec<RecordBatch> = match table
                .query()
                .only_if(&filter_expr)
                .select(Select::Columns(columns))
                .execute()
                .await
            {
                Ok(stream) => match stream.try_collect().await {
                    Ok(b) => b,
                    Err(_) => continue,
                },
                Err(_) => continue,
            };

            for batch in batches {
                // Batches missing the system columns are skipped entirely.
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    // A tombstone at/after the current best version clears the
                    // value (a same-version delete wins over a write).
                    if deleted_col.value(row) {
                        if best_version.is_none_or(|best| version >= best) {
                            best_version = Some(version);
                            best_value = None;
                        }
                        continue;
                    }

                    // Prefer the typed column; a null cell decodes to Null.
                    let mut val = None;
                    if let Some(meta) = prop_meta
                        && let Some(col) = batch.column_by_name(prop)
                    {
                        val = Some(if col.is_null(row) {
                            Value::Null
                        } else {
                            Self::value_from_column(col, &meta.r#type, row)?
                        });
                    }

                    // Fall back to the JSON overflow blob for undeclared props.
                    if val.is_none()
                        && let Some(overflow_props) =
                            Self::extract_overflow_properties(&batch, row)?
                        && let Some(overflow_val) = overflow_props.get(prop)
                    {
                        val = Some(overflow_val.clone());
                    }

                    if let Some(v) = val {
                        if let Some(meta) = prop_meta {
                            // Typed: CRDT merge or LWW depending on the type.
                            self.merge_prop_value(
                                v,
                                version,
                                &meta.r#type,
                                &mut best_version,
                                &mut best_value,
                            )?;
                        } else {
                            // Untyped (overflow-only): plain last-writer-wins.
                            if best_version.is_none_or(|best| version >= best) {
                                best_version = Some(version);
                                best_value = Some(v);
                            }
                        }
                    }
                }
            }
        }
        Ok(best_value.unwrap_or(Value::Null))
    }
1767
1768 pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1770 match data_type {
1775 DataType::DateTime | DataType::Timestamp | DataType::Date | DataType::Time => Ok(
1776 crate::storage::arrow_convert::arrow_to_value(col, row, Some(data_type)),
1777 ),
1778 _ => value_codec::value_from_column(col, data_type, row, CrdtDecodeMode::Strict)
1779 .map(Value::from),
1780 }
1781 }
1782
1783 pub(crate) fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1784 if a.is_null() {
1788 return Self::parse_crdt_value(b).map(Value::from);
1789 }
1790 if b.is_null() {
1791 return Self::parse_crdt_value(a).map(Value::from);
1792 }
1793
1794 let a_parsed = Self::parse_crdt_value(a)?;
1795 let b_parsed = Self::parse_crdt_value(b)?;
1796
1797 let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1798 let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1799 crdt_a
1800 .try_merge(&crdt_b)
1801 .map_err(|e| anyhow::anyhow!("{e}"))?;
1802 Ok(Value::from(serde_json::to_value(crdt_a)?))
1803 }
1804
1805 fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1808 if let Value::String(s) = val {
1809 serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1811 } else {
1812 Ok(serde_json::Value::from(val.clone()))
1814 }
1815 }
1816
1817 fn merge_prop_value(
1819 &self,
1820 val: Value,
1821 version: u64,
1822 data_type: &DataType,
1823 best_version: &mut Option<u64>,
1824 best_value: &mut Option<Value>,
1825 ) -> Result<()> {
1826 if let DataType::Crdt(_) = data_type {
1827 self.merge_crdt_prop_value(val, version, best_version, best_value)
1828 } else {
1829 if best_version.is_none_or(|best| version >= best) {
1831 *best_version = Some(version);
1832 *best_value = Some(val);
1833 }
1834 Ok(())
1835 }
1836 }
1837
1838 fn merge_crdt_prop_value(
1840 &self,
1841 val: Value,
1842 version: u64,
1843 best_version: &mut Option<u64>,
1844 best_value: &mut Option<Value>,
1845 ) -> Result<()> {
1846 if best_version.is_none_or(|best| version > best) {
1847 if let Some(existing) = best_value.take() {
1849 *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1850 } else {
1851 *best_value = Some(val);
1852 }
1853 *best_version = Some(version);
1854 } else if Some(version) == *best_version {
1855 let existing = best_value.get_or_insert(Value::Null);
1857 *existing = self.merge_crdt_values(existing, &val)?;
1858 } else {
1859 if let Some(existing) = best_value.as_mut() {
1861 *existing = self.merge_crdt_values(existing, &val)?;
1862 }
1863 }
1864 Ok(())
1865 }
1866}