1use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::{self, CrdtDecodeMode};
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use futures::TryStreamExt;
13use lancedb::query::{ExecutableQuery, QueryBase, Select};
14use lru::LruCache;
15use metrics;
16use std::collections::HashMap;
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use tokio::sync::Mutex;
20use tracing::{debug, instrument};
21use uni_common::Properties;
22use uni_common::Value;
23use uni_common::core::id::{Eid, Vid};
24use uni_common::core::schema::{DataType, SchemaManager};
25use uni_crdt::Crdt;
26
/// Resolves vertex and edge property values by overlaying in-memory L0
/// buffers on top of LanceDB-backed storage, with optional LRU caching of
/// individual (id, property) lookups.
pub struct PropertyManager {
    // Access to LanceDB datasets, the label index, and version filtering.
    storage: Arc<StorageManager>,
    // Source of label / edge-type schemas and per-property metadata.
    schema_manager: Arc<SchemaManager>,
    // LRU of (vid, property name) -> value; None when caching is disabled.
    vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
    // LRU of (eid, property name) -> value; None when caching is disabled.
    edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
    // Configured capacity for each cache; 0 disables caching.
    cache_capacity: usize,
}
35
36impl PropertyManager {
37 pub fn new(
38 storage: Arc<StorageManager>,
39 schema_manager: Arc<SchemaManager>,
40 capacity: usize,
41 ) -> Self {
42 let (vertex_cache, edge_cache) = if capacity == 0 {
44 (None, None)
45 } else {
46 let cap = NonZeroUsize::new(capacity).unwrap();
47 (
48 Some(Mutex::new(LruCache::new(cap))),
49 Some(Mutex::new(LruCache::new(cap))),
50 )
51 };
52
53 Self {
54 storage,
55 schema_manager,
56 vertex_cache,
57 edge_cache,
58 cache_capacity: capacity,
59 }
60 }
61
    /// Returns the configured cache capacity (entries per cache), not the
    /// number of entries currently cached. 0 means caching is disabled.
    pub fn cache_size(&self) -> usize {
        self.cache_capacity
    }
65
66 pub fn caching_enabled(&self) -> bool {
68 self.cache_capacity > 0
69 }
70
71 pub async fn clear_cache(&self) {
74 if let Some(ref cache) = self.vertex_cache {
75 cache.lock().await.clear();
76 }
77 if let Some(ref cache) = self.edge_cache {
78 cache.lock().await.clear();
79 }
80 }
81
82 pub async fn invalidate_vertex(&self, _vid: Vid) {
84 if let Some(ref cache) = self.vertex_cache {
85 let mut cache = cache.lock().await;
86 cache.clear();
90 }
91 }
92
93 pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
95 if let Some(ref cache) = self.edge_cache {
96 let mut cache = cache.lock().await;
97 cache.clear();
99 }
100 }
101
102 #[instrument(skip(self, ctx), level = "trace")]
103 pub async fn get_edge_prop(
104 &self,
105 eid: uni_common::core::id::Eid,
106 prop: &str,
107 ctx: Option<&QueryContext>,
108 ) -> Result<Value> {
109 if l0_visibility::is_edge_deleted(eid, ctx) {
111 return Ok(Value::Null);
112 }
113
114 if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
116 return Ok(val);
117 }
118
119 if let Some(ref cache) = self.edge_cache {
121 let mut cache = cache.lock().await;
122 if let Some(val) = cache.get(&(eid, prop.to_string())) {
123 debug!(eid = ?eid, prop, "Cache HIT");
124 metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
125 return Ok(val.clone());
126 } else {
127 debug!(eid = ?eid, prop, "Cache MISS");
128 metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
129 }
130 }
131
132 let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
134 let val = all
135 .as_ref()
136 .and_then(|props| props.get(prop).cloned())
137 .unwrap_or(Value::Null);
138
139 if let Some(ref cache) = self.edge_cache {
141 let mut cache = cache.lock().await;
142 if let Some(ref props) = all {
143 for (prop_name, prop_val) in props {
144 cache.put((eid, prop_name.clone()), prop_val.clone());
145 }
146 } else {
147 cache.put((eid, prop.to_string()), Value::Null);
149 }
150 }
151
152 Ok(val)
153 }
154
155 pub async fn get_all_edge_props_with_ctx(
156 &self,
157 eid: uni_common::core::id::Eid,
158 ctx: Option<&QueryContext>,
159 ) -> Result<Option<Properties>> {
160 if l0_visibility::is_edge_deleted(eid, ctx) {
162 return Ok(None);
163 }
164
165 let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();
167
168 let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;
170
171 if final_props.is_empty() && storage_props.is_none() {
173 if l0_visibility::edge_exists_in_l0(eid, ctx) {
174 return Ok(Some(Properties::new()));
175 }
176 return Ok(None);
177 }
178
179 if let Some(sp) = storage_props {
181 for (k, v) in sp {
182 final_props.entry(k).or_insert(v);
183 }
184 }
185
186 Ok(Some(final_props))
187 }
188
    /// Storage-only edge property read. Without a type hint, every edge
    /// type's delta dataset is probed by the hinted variant.
    async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
        self.fetch_all_edge_props_from_storage_with_hint(eid, None)
            .await
    }
194
    /// Reconstructs an edge's current properties from the versioned "fwd"
    /// delta datasets, falling back to the compacted main edge dataset.
    ///
    /// `type_name_hint` restricts the scan to one edge type; otherwise every
    /// edge type is probed until one yields a live property set. Rows are
    /// replayed in `_version` order: `op == 1` rows are delete markers that
    /// wipe earlier state, CRDT-typed properties merge across versions, and
    /// plain properties are last-writer-wins.
    async fn fetch_all_edge_props_from_storage_with_hint(
        &self,
        eid: Eid,
        type_name_hint: Option<&str>,
    ) -> Result<Option<Properties>> {
        let schema = self.schema_manager.schema();
        let lancedb_store = self.storage.lancedb_store();

        let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
            vec![hint]
        } else {
            schema.edge_types.keys().map(|s| s.as_str()).collect()
        };

        for type_name in type_names {
            let type_props = schema.properties.get(type_name);

            // A missing dataset/table for a type is not an error here: try
            // the next candidate type instead.
            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
                Ok(ds) => ds,
                Err(_) => continue, };

            let table = match delta_ds.open_lancedb(lancedb_store).await {
                Ok(t) => t,
                Err(_) => continue, };

            use lancedb::query::{ExecutableQuery, QueryBase};

            let base_filter = format!("eid = {}", eid.as_u64());

            // Respect the storage-level version high-water mark, if set.
            let filter_expr = self.storage.apply_version_filter(base_filter);

            let query = table.query().only_if(filter_expr);
            let stream = match query.execute().await {
                Ok(s) => s,
                Err(_) => continue,
            };

            let batches: Vec<arrow_array::RecordBatch> =
                stream.try_collect().await.unwrap_or_default();

            // Collected as (version, op, decoded row properties).
            let mut rows: Vec<(u64, u8, Properties)> = Vec::new();

            for batch in batches {
                let op_col = match batch.column_by_name("op") {
                    Some(c) => c
                        .as_any()
                        .downcast_ref::<arrow_array::UInt8Array>()
                        .unwrap(),
                    None => continue,
                };
                let ver_col = match batch.column_by_name("_version") {
                    Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let ver = ver_col.value(row);
                    let op = op_col.value(row);
                    let mut props = Properties::new();

                    // op == 1 is a delete marker; only non-delete rows carry
                    // property values worth decoding.
                    if op != 1 {
                        if let Some(tp) = type_props {
                            for (p_name, p_meta) in tp {
                                if let Some(col) = batch.column_by_name(p_name)
                                    && !col.is_null(row)
                                {
                                    let val =
                                        Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
                                    props.insert(p_name.clone(), val);
                                }
                            }
                        }
                    }
                    rows.push((ver, op, props));
                }
            }

            if rows.is_empty() {
                continue;
            }

            // Replay in version order; the stable sort keeps batch order for
            // rows with equal versions.
            rows.sort_by_key(|(ver, _, _)| *ver);

            let mut merged_props: Properties = Properties::new();
            let mut is_deleted = false;

            for (_, op, props) in rows {
                if op == 1 {
                    // A delete wipes earlier state; a later write revives the edge.
                    is_deleted = true;
                    merged_props.clear();
                } else {
                    is_deleted = false;
                    for (p_name, p_val) in props {
                        let is_crdt = type_props
                            .and_then(|tp| tp.get(&p_name))
                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                            .unwrap_or(false);

                        // CRDT properties merge across versions; a failed
                        // merge keeps the existing value. Plain properties
                        // are simply overwritten.
                        if is_crdt {
                            if let Some(existing) = merged_props.get(&p_name) {
                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
                                    merged_props.insert(p_name, merged);
                                }
                            } else {
                                merged_props.insert(p_name, p_val);
                            }
                        } else {
                            merged_props.insert(p_name, p_val);
                        }
                    }
                }
            }

            if is_deleted {
                return Ok(None);
            }

            if !merged_props.is_empty() {
                return Ok(Some(merged_props));
            }
        }

        use crate::storage::main_edge::MainEdgeDataset;
        // Fall back to the compacted main dataset when no delta rows resolved
        // to a live, non-empty property set.
        if let Some(props) = MainEdgeDataset::find_props_by_eid(lancedb_store, eid).await? {
            return Ok(Some(props));
        }

        Ok(None)
    }
342
    /// Batch-loads the requested `properties` for `vids`: one storage scan
    /// per candidate label, then pending/active/transaction L0 overlays.
    /// Vertices that end up deleted are absent from the returned map.
    pub async fn get_batch_vertex_props(
        &self,
        vids: &[Vid],
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let schema = self.schema_manager.schema();
        let mut result = HashMap::new();
        if vids.is_empty() {
            return Ok(result);
        }

        // Narrow the scan to the labels the vids actually carry when the
        // label index can resolve every vid; otherwise scan all labels.
        let labels_to_scan: Vec<String> = {
            let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
            let mut all_resolved = true;
            for &vid in vids {
                if let Some(labels) = self.storage.get_labels_from_index(vid) {
                    needed.extend(labels);
                } else {
                    all_resolved = false;
                    break;
                }
            }
            if all_resolved {
                needed.into_iter().collect()
            } else {
                schema.labels.keys().cloned().collect() }
        };

        for label_name in &labels_to_scan {
            // Only request columns that exist in this label's schema.
            let label_schema_props = schema.properties.get(label_name);
            let valid_props: Vec<&str> = properties
                .iter()
                .cloned()
                .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
                .collect();
            let ds = self.storage.vertex_dataset(label_name)?;
            let lancedb_store = self.storage.lancedb_store();
            let table = match ds.open_lancedb(lancedb_store).await {
                Ok(t) => t,
                Err(_) => continue,
            };

            let vid_list = vids
                .iter()
                .map(|v| v.as_u64().to_string())
                .collect::<Vec<_>>()
                .join(",");
            let base_filter = format!("_vid IN ({})", vid_list);

            let final_filter = self.storage.apply_version_filter(base_filter);

            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
            columns.push("_vid".to_string());
            columns.push("_version".to_string());
            columns.push("_deleted".to_string());
            columns.extend(valid_props.iter().map(|s| s.to_string()));
            columns.push("overflow_json".to_string());

            let query = table
                .query()
                .only_if(final_filter)
                .select(Select::Columns(columns));

            let stream = match query.execute().await {
                Ok(s) => s,
                Err(_) => continue,
            };

            let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
            for batch in batches {
                let vid_col = match batch.column_by_name("_vid") {
                    Some(col) => col.as_any().downcast_ref::<UInt64Array>().unwrap(),
                    None => continue,
                };
                let del_col = match batch.column_by_name("_deleted") {
                    Some(col) => col.as_any().downcast_ref::<BooleanArray>().unwrap(),
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let vid = Vid::from(vid_col.value(row));

                    // A deleted row removes anything a previous label scan
                    // contributed for this vid.
                    if del_col.value(row) {
                        result.remove(&vid);
                        continue;
                    }

                    // NOTE(review): `_version` is selected but rows are not
                    // merged by version here — a later row simply overwrites
                    // an earlier one (unlike get_batch_vertex_props_for_label,
                    // which resolves versions per vid). Confirm callers of the
                    // batch path tolerate this.
                    let label_props = schema.properties.get(label_name);
                    let mut props =
                        Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
                    result.insert(vid, props);
                }
            }
        }

        // L0 overlays, oldest to newest: pending flush buffers, the active
        // buffer, then — only when not reading at a pinned version — the
        // transaction buffer.
        if let Some(ctx) = ctx {
            for pending_l0_arc in &ctx.pending_flush_l0s {
                let pending_l0 = pending_l0_arc.read();
                self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
            }

            let l0 = ctx.l0.read();
            self.overlay_l0_batch(vids, &l0, properties, &mut result);

            if self.storage.version_high_water_mark().is_none()
                && let Some(tx_l0_arc) = &ctx.transaction_l0
            {
                let tx_l0 = tx_l0_arc.read();
                self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
            }
        }

        Ok(result)
    }
478
    /// Applies one L0 buffer's vertex tombstones and property writes on top
    /// of `result`. Only properties listed in `properties` are copied; CRDT
    /// properties are merged with any value already present.
    fn overlay_l0_batch(
        &self,
        vids: &[Vid],
        l0: &L0Buffer,
        properties: &[&str],
        result: &mut HashMap<Vid, Properties>,
    ) {
        let schema = self.schema_manager.schema();
        for &vid in vids {
            if l0.vertex_tombstones.contains(&vid) {
                result.remove(&vid);
                continue;
            }
            if let Some(l0_props) = l0.vertex_properties.get(&vid) {
                // Skip entries written after the pinned read version, if any.
                let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if self
                    .storage
                    .version_high_water_mark()
                    .is_some_and(|hwm| entry_version > hwm)
                {
                    continue;
                }

                let entry = result.entry(vid).or_default();
                let labels = l0.get_vertex_labels(vid);

                // NOTE(review): unlike overlay_l0_edge_batch, this does not
                // honor the "_all_props" sentinel — confirm vertex callers
                // never pass it.
                for (k, v) in l0_props {
                    if properties.contains(&k.as_str()) {
                        // A property counts as CRDT if any of the vertex's
                        // labels declares it with a CRDT type.
                        let is_crdt = labels
                            .and_then(|label_list| {
                                label_list.iter().find_map(|ln| {
                                    schema
                                        .properties
                                        .get(ln)
                                        .and_then(|lp| lp.get(k))
                                        .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                                })
                            })
                            .is_some();

                        // CRDT values merge into the existing entry (falling
                        // back to the L0 value on merge failure); plain
                        // values overwrite.
                        if is_crdt {
                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
                        } else {
                            entry.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }
    }
535
    /// Batch-loads the requested `properties` for `eids`: one delta-dataset
    /// scan per candidate edge type, then L0 overlays.
    ///
    /// Note: the result map is keyed by `Vid::from(eid.as_u64())` — the eid
    /// bits reinterpreted as a Vid — mirroring the vertex batch API shape.
    pub async fn get_batch_edge_props(
        &self,
        eids: &[uni_common::core::id::Eid],
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let schema = self.schema_manager.schema();
        let mut result = HashMap::new();
        if eids.is_empty() {
            return Ok(result);
        }

        // Narrow the scan to the edge types the active L0 buffer can resolve
        // for every eid; otherwise fall back to scanning all edge types.
        let types_to_scan: Vec<String> = {
            if let Some(ctx) = ctx {
                let mut needed: std::collections::HashSet<String> =
                    std::collections::HashSet::new();
                let mut all_resolved = true;
                for &eid in eids {
                    if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
                        needed.insert(etype.to_string());
                    } else {
                        all_resolved = false;
                        break;
                    }
                }
                if all_resolved {
                    needed.into_iter().collect()
                } else {
                    schema.edge_types.keys().cloned().collect() }
            } else {
                schema.edge_types.keys().cloned().collect() }
        };

        for type_name in &types_to_scan {
            // Only request columns that exist in this type's schema.
            let type_props = schema.properties.get(type_name);
            let valid_props: Vec<&str> = properties
                .iter()
                .cloned()
                .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
                .collect();
            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
                Ok(ds) => ds,
                Err(_) => continue,
            };
            let lancedb_store = self.storage.lancedb_store();
            let table = match delta_ds.open_lancedb(lancedb_store).await {
                Ok(t) => t,
                Err(_) => continue,
            };

            let eid_list = eids
                .iter()
                .map(|e| e.as_u64().to_string())
                .collect::<Vec<_>>()
                .join(",");
            let base_filter = format!("eid IN ({})", eid_list);

            let final_filter = self.storage.apply_version_filter(base_filter);

            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
            columns.push("eid".to_string());
            columns.push("_version".to_string());
            columns.push("op".to_string());
            columns.extend(valid_props.iter().map(|s| s.to_string()));
            columns.push("overflow_json".to_string());

            let query = table
                .query()
                .only_if(final_filter)
                .select(Select::Columns(columns));

            let stream = match query.execute().await {
                Ok(s) => s,
                Err(_) => continue,
            };

            let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
            for batch in batches {
                let eid_col = match batch.column_by_name("eid") {
                    Some(col) => col.as_any().downcast_ref::<UInt64Array>().unwrap(),
                    None => continue,
                };
                let op_col = match batch.column_by_name("op") {
                    Some(col) => col
                        .as_any()
                        .downcast_ref::<arrow_array::UInt8Array>()
                        .unwrap(),
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let eid = uni_common::core::id::Eid::from(eid_col.value(row));

                    // op == 1 is a delete marker: drop anything collected so
                    // far for this edge.
                    if op_col.value(row) == 1 {
                        result.remove(&Vid::from(eid.as_u64()));
                        continue;
                    }

                    let mut props =
                        Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
                    result.insert(Vid::from(eid.as_u64()), props);
                }
            }
        }

        // L0 overlays, oldest to newest: pending flush buffers, the active
        // buffer, then — only when not reading at a pinned version — the
        // transaction buffer.
        if let Some(ctx) = ctx {
            for pending_l0_arc in &ctx.pending_flush_l0s {
                let pending_l0 = pending_l0_arc.read();
                self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
            }

            let l0 = ctx.l0.read();
            self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);

            if self.storage.version_high_water_mark().is_none()
                && let Some(tx_l0_arc) = &ctx.transaction_l0
            {
                let tx_l0 = tx_l0_arc.read();
                self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
            }
        }

        Ok(result)
    }
682
    /// Edge counterpart of `overlay_l0_batch`: applies one L0 buffer's edge
    /// tombstones and property writes to `result` (keyed by the eid bits
    /// reinterpreted as a Vid). The "_all_props" sentinel in `properties`
    /// selects every property; CRDT properties merge with existing values.
    fn overlay_l0_edge_batch(
        &self,
        eids: &[uni_common::core::id::Eid],
        l0: &L0Buffer,
        properties: &[&str],
        result: &mut HashMap<Vid, Properties>,
    ) {
        let schema = self.schema_manager.schema();
        for &eid in eids {
            let vid_key = Vid::from(eid.as_u64());
            if l0.tombstones.contains_key(&eid) {
                result.remove(&vid_key);
                continue;
            }
            if let Some(l0_props) = l0.edge_properties.get(&eid) {
                // Skip entries written after the pinned read version, if any.
                let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
                if self
                    .storage
                    .version_high_water_mark()
                    .is_some_and(|hwm| entry_version > hwm)
                {
                    continue;
                }

                let entry = result.entry(vid_key).or_default();
                let type_name = l0.get_edge_type(eid);

                let include_all = properties.contains(&"_all_props");
                for (k, v) in l0_props {
                    if include_all || properties.contains(&k.as_str()) {
                        // CRDT-ness comes from the edge type's schema entry.
                        let is_crdt = type_name
                            .and_then(|tn| schema.properties.get(tn))
                            .and_then(|tp| tp.get(k))
                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                            .unwrap_or(false);

                        // CRDT values merge (falling back to the L0 value on
                        // merge failure); plain values overwrite.
                        if is_crdt {
                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
                        } else {
                            entry.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }
    }
733
    /// Placeholder for a columnar (RecordBatch) property load.
    ///
    /// Currently materializes the row-oriented batch result and then always
    /// returns an error; callers should use `get_batch_vertex_props` until
    /// the columnar path is implemented.
    pub async fn load_properties_columnar(
        &self,
        vids: &UInt64Array,
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<RecordBatch> {
        let mut vid_vec = Vec::with_capacity(vids.len());
        for i in 0..vids.len() {
            vid_vec.push(Vid::from(vids.value(i)));
        }

        // Fetched but discarded: only exercises the row path for now.
        let _props_map = self
            .get_batch_vertex_props(&vid_vec, properties, ctx)
            .await?;

        Err(anyhow!(
            "Columnar property load not fully implemented yet - use batch load"
        ))
    }
803
    /// Resolves the label set of each vid by merging, in order: L0 buffers,
    /// the in-memory label index, and (for still-unresolved vids) the main
    /// vertex dataset. Labels are sorted and deduplicated before returning.
    pub async fn get_batch_labels(
        &self,
        vids: &[Vid],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Vec<String>>> {
        let mut result = HashMap::new();
        if vids.is_empty() {
            return Ok(result);
        }

        // L0 first: pending flush buffers, active buffer, transaction buffer.
        if let Some(ctx) = ctx {
            let mut collect_labels = |l0: &L0Buffer| {
                for &vid in vids {
                    if let Some(labels) = l0.get_vertex_labels(vid) {
                        result
                            .entry(vid)
                            .or_default()
                            .extend(labels.iter().cloned());
                    }
                }
            };

            for l0_arc in &ctx.pending_flush_l0s {
                collect_labels(&l0_arc.read());
            }
            collect_labels(&ctx.l0.read());
            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
                collect_labels(&tx_l0_arc.read());
            }
        }

        let mut vids_needing_lancedb = Vec::new();

        // Order-preserving union used by the index/storage merge below.
        fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
            for l in new_labels {
                if !existing.contains(&l) {
                    existing.push(l);
                }
            }
        }

        for &vid in vids {
            // Vids already answered by L0 skip the index and storage paths.
            if result.contains_key(&vid) {
                continue; }

            if let Some(labels) = self.storage.get_labels_from_index(vid) {
                merge_labels(result.entry(vid).or_default(), labels);
            } else {
                vids_needing_lancedb.push(vid);
            }
        }

        // Last resort: batch-read labels from the main vertex dataset at the
        // current read version.
        if !vids_needing_lancedb.is_empty() {
            let lancedb_store = self.storage.lancedb_store();
            let version = self.storage.version_high_water_mark();
            let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
                lancedb_store,
                &vids_needing_lancedb,
                version,
            )
            .await?;

            for (vid, labels) in storage_labels {
                merge_labels(result.entry(vid).or_default(), labels);
            }
        }

        for labels in result.values_mut() {
            labels.sort();
            labels.dedup();
        }

        Ok(result)
    }
885
886 pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
887 Ok(self
888 .get_all_vertex_props_with_ctx(vid, None)
889 .await?
890 .unwrap_or_default())
891 }
892
    /// Returns every visible property of `vid` (L0 writes layered over
    /// storage), or `None` when the vertex is deleted or unknown.
    pub async fn get_all_vertex_props_with_ctx(
        &self,
        vid: Vid,
        ctx: Option<&QueryContext>,
    ) -> Result<Option<Properties>> {
        if l0_visibility::is_vertex_deleted(vid, ctx) {
            return Ok(None);
        }

        let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);

        let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;

        if l0_props.is_none() && storage_props_opt.is_none() {
            return Ok(None);
        }

        let mut final_props = l0_props.unwrap_or_default();

        // L0 values win; storage only fills keys L0 did not write.
        if let Some(storage_props) = storage_props_opt {
            for (k, v) in storage_props {
                final_props.entry(k).or_insert(v);
            }
        }

        // With a context the vertex's labels are known, so raw CRDT payloads
        // can be normalized into user-facing values.
        if let Some(ctx) = ctx {
            let labels = l0_visibility::get_vertex_labels(vid, ctx);
            for label in &labels {
                self.normalize_crdt_properties(&mut final_props, label)?;
            }
        }

        Ok(Some(final_props))
    }
935
    /// Batch read of all properties for `vids`, restricted to a single
    /// label's table. Vids fully answered by L0 skip storage; the rest are
    /// read from the label table with per-vid version resolution, then
    /// storage values fill in only the keys L0 did not already provide.
    pub async fn get_batch_vertex_props_for_label(
        &self,
        vids: &[Vid],
        label: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let mut result: HashMap<Vid, Properties> = HashMap::new();
        let mut need_storage: Vec<Vid> = Vec::new();

        for &vid in vids {
            if l0_visibility::is_vertex_deleted(vid, ctx) {
                continue;
            }
            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
            if let Some(props) = l0_props {
                result.insert(vid, props);
            } else {
                need_storage.push(vid);
            }
        }

        // Everything resolved from L0: normalize CRDTs and return early.
        if need_storage.is_empty() {
            if ctx.is_some() {
                for props in result.values_mut() {
                    self.normalize_crdt_properties(props, label)?;
                }
            }
            return Ok(result);
        }

        let schema = self.schema_manager.schema();
        let label_props = schema.properties.get(label);

        // No table for this label: return whatever L0 produced.
        let table = match self.storage.get_cached_table(label).await {
            Ok(t) => t,
            Err(_) => {
                return Ok(result);
            }
        };

        let mut prop_names: Vec<String> = Vec::new();
        if let Some(props) = label_props {
            prop_names = props.keys().cloned().collect();
        }

        let mut columns: Vec<String> = vec![
            "_vid".to_string(),
            "_deleted".to_string(),
            "_version".to_string(),
        ];
        columns.extend(prop_names.iter().cloned());
        columns.push("overflow_json".to_string());

        let vid_list: String = need_storage
            .iter()
            .map(|v| v.as_u64().to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let base_filter = format!("_vid IN ({})", vid_list);

        let filter_expr = self.storage.apply_version_filter(base_filter);

        let batches: Vec<RecordBatch> = match table
            .query()
            .only_if(&filter_expr)
            .select(Select::Columns(columns.clone()))
            .execute()
            .await
        {
            Ok(stream) => stream.try_collect().await.unwrap_or_default(),
            Err(_) => Vec::new(),
        };

        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

        // Per-vid newest version seen, and the properties merged at it.
        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();

        for batch in batches {
            let vid_col = match batch
                .column_by_name("_vid")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };
            let deleted_col = match batch
                .column_by_name("_deleted")
                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
            {
                Some(c) => c,
                None => continue,
            };
            let version_col = match batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };

            for row in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row));
                let version = version_col.value(row);

                // A delete at the newest version (ties included) discards
                // previously merged properties for this vid.
                if deleted_col.value(row) {
                    if per_vid_best_version
                        .get(&vid)
                        .is_none_or(|&best| version >= best)
                    {
                        per_vid_best_version.insert(vid, version);
                        per_vid_props.remove(&vid);
                    }
                    continue;
                }

                let mut current_props =
                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                // Overflow (spill) properties never override typed columns.
                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                    for (k, v) in overflow_props {
                        current_props.entry(k).or_insert(v);
                    }
                }

                // Version-aware merge: CRDT props combine across versions,
                // plain props come from the newest row.
                let best = per_vid_best_version.get(&vid).copied();
                let mut best_opt = best;
                let mut merged = per_vid_props.remove(&vid);
                self.merge_versioned_props(
                    current_props,
                    version,
                    &mut best_opt,
                    &mut merged,
                    label_props,
                )?;
                if let Some(v) = best_opt {
                    per_vid_best_version.insert(vid, v);
                }
                if let Some(p) = merged {
                    per_vid_props.insert(vid, p);
                }
            }
        }

        // Storage fills only keys L0 did not already provide.
        for (vid, storage_props) in per_vid_props {
            let entry = result.entry(vid).or_default();
            for (k, v) in storage_props {
                entry.entry(k).or_insert(v);
            }
        }

        if ctx.is_some() {
            for props in result.values_mut() {
                self.normalize_crdt_properties(props, label)?;
            }
        }

        Ok(result)
    }
1112
1113 fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1117 let schema = self.schema_manager.schema();
1118 let label_props = match schema.properties.get(label) {
1119 Some(p) => p,
1120 None => return Ok(()),
1121 };
1122
1123 for (prop_name, prop_meta) in label_props {
1124 if let DataType::Crdt(_) = prop_meta.r#type
1125 && let Some(val) = props.get_mut(prop_name)
1126 {
1127 *val = Value::from(Self::parse_crdt_value(val)?);
1128 }
1129 }
1130
1131 Ok(())
1132 }
1133
1134 fn extract_row_properties(
1136 batch: &RecordBatch,
1137 row: usize,
1138 prop_names: &[&str],
1139 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1140 ) -> Result<Properties> {
1141 let mut props = Properties::new();
1142 for name in prop_names {
1143 let col = match batch.column_by_name(name) {
1144 Some(col) => col,
1145 None => continue,
1146 };
1147 if col.is_null(row) {
1148 continue;
1149 }
1150 if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1151 let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1152 props.insert((*name).to_string(), val);
1153 }
1154 }
1155 Ok(props)
1156 }
1157
1158 fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
1163 use arrow_array::LargeBinaryArray;
1164
1165 let overflow_col = match batch.column_by_name("overflow_json") {
1166 Some(col) => col,
1167 None => return Ok(None), };
1169
1170 if overflow_col.is_null(row) {
1171 return Ok(None);
1172 }
1173
1174 let binary_array = overflow_col
1175 .as_any()
1176 .downcast_ref::<LargeBinaryArray>()
1177 .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;
1178
1179 let jsonb_bytes = binary_array.value(row);
1180
1181 let uni_val = uni_common::cypher_value_codec::decode(jsonb_bytes)
1183 .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
1184 let json_val: serde_json::Value = uni_val.into();
1185
1186 let overflow_props: Properties = serde_json::from_value(json_val)
1188 .map_err(|e| anyhow!("Failed to parse overflow properties: {}", e))?;
1189
1190 Ok(Some(overflow_props))
1191 }
1192
1193 fn merge_overflow_into_props(
1200 batch: &RecordBatch,
1201 row: usize,
1202 properties: &[&str],
1203 props: &mut Properties,
1204 ) -> Result<()> {
1205 use arrow_array::LargeBinaryArray;
1206
1207 let overflow_col = match batch.column_by_name("overflow_json") {
1208 Some(col) if !col.is_null(row) => col,
1209 _ => return Ok(()),
1210 };
1211
1212 if properties.contains(&"overflow_json")
1214 && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
1215 {
1216 let jsonb_bytes = binary_array.value(row);
1217 let bytes_list: Vec<Value> =
1218 jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
1219 props.insert("overflow_json".to_string(), Value::List(bytes_list));
1220 }
1221
1222 if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
1224 for (k, v) in overflow_props {
1225 if properties.contains(&k.as_str()) {
1226 props.entry(k).or_insert(v);
1227 }
1228 }
1229 }
1230
1231 Ok(())
1232 }
1233
1234 fn merge_crdt_into(
1236 &self,
1237 target: &mut Properties,
1238 source: Properties,
1239 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1240 crdt_only: bool,
1241 ) -> Result<()> {
1242 for (k, v) in source {
1243 if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1244 if let DataType::Crdt(_) = prop_meta.r#type {
1245 let existing_v = target.entry(k).or_insert(Value::Null);
1246 *existing_v = self.merge_crdt_values(existing_v, &v)?;
1247 } else if !crdt_only {
1248 target.insert(k, v);
1249 }
1250 }
1251 }
1252 Ok(())
1253 }
1254
    /// Folds one versioned property row into the running
    /// (`best_version`, `best_props`) accumulator.
    ///
    /// - Newer version: the row becomes the base; CRDT-typed keys absorb the
    ///   previously accumulated values.
    /// - Same version: full merge — CRDT keys combine, plain keys are taken
    ///   from the incoming row.
    /// - Older version: only CRDT keys are merged in; plain keys are ignored.
    fn merge_versioned_props(
        &self,
        current_props: Properties,
        version: u64,
        best_version: &mut Option<u64>,
        best_props: &mut Option<Properties>,
        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
    ) -> Result<()> {
        if best_version.is_none_or(|best| version > best) {
            if let Some(mut existing_props) = best_props.take() {
                let mut merged = current_props;
                // Newer row wins per key, but CRDT keys still merge in the
                // value accumulated from older rows.
                for (k, v) in merged.iter_mut() {
                    if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
                        && let DataType::Crdt(_) = prop_meta.r#type
                        && let Some(existing_val) = existing_props.remove(k)
                    {
                        *v = self.merge_crdt_values(v, &existing_val)?;
                    }
                }
                *best_props = Some(merged);
            } else {
                *best_props = Some(current_props);
            }
            *best_version = Some(version);
        } else if Some(version) == *best_version {
            if let Some(existing_props) = best_props.as_mut() {
                // crdt_only = false: plain keys from the tied row overwrite.
                self.merge_crdt_into(existing_props, current_props, label_props, false)?;
            } else {
                *best_props = Some(current_props);
            }
        } else {
            if let Some(existing_props) = best_props.as_mut() {
                // crdt_only = true: an older row contributes CRDT state only.
                self.merge_crdt_into(existing_props, current_props, label_props, true)?;
            }
        }
        Ok(())
    }
1297
    /// Reads a vertex's properties from every candidate label table, merging
    /// rows by version across tables, then falls back to the compacted main
    /// vertex dataset when the label tables produced nothing.
    async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
        let schema = self.schema_manager.schema();
        let mut merged_props: Option<Properties> = None;
        let mut global_best_version: Option<u64> = None;

        // Prefer the label index; otherwise scan every label table.
        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect() };

        for label_name in &label_names {
            let label_props = schema.properties.get(label_name);

            // A label without a table is simply skipped.
            let table = match self.storage.get_cached_table(label_name).await {
                Ok(t) => t,
                Err(_) => continue,
            };

            let mut prop_names: Vec<String> = Vec::new();
            if let Some(props) = label_props {
                prop_names = props.keys().cloned().collect();
            }

            let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
            columns.extend(prop_names.iter().cloned());
            columns.push("overflow_json".to_string());

            let base_filter = format!("_vid = {}", vid.as_u64());

            // Respect the storage-level version high-water mark, if set.
            let filter_expr = self.storage.apply_version_filter(base_filter);

            let batches: Vec<RecordBatch> = match table
                .query()
                .only_if(&filter_expr)
                .select(Select::Columns(columns.clone()))
                .execute()
                .await
            {
                Ok(stream) => match stream.try_collect().await {
                    Ok(b) => b,
                    Err(_) => continue,
                },
                Err(_) => continue,
            };

            let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

            for batch in batches {
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    // A delete at the newest version (ties included) wipes
                    // everything merged so far, across all label tables.
                    if deleted_col.value(row) {
                        if global_best_version.is_none_or(|best| version >= best) {
                            global_best_version = Some(version);
                            merged_props = None;
                        }
                        continue;
                    }

                    let mut current_props =
                        Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                    // Overflow (spill) values never override typed columns.
                    if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                        for (k, v) in overflow_props {
                            current_props.entry(k).or_insert(v);
                        }
                    }

                    self.merge_versioned_props(
                        current_props,
                        version,
                        &mut global_best_version,
                        &mut merged_props,
                        label_props,
                    )?;
                }
            }
        }

        // Fallback: compacted main dataset at the pinned read version.
        if merged_props.is_none()
            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
                self.storage.lancedb_store(),
                vid,
                self.storage.version_high_water_mark(),
            )
            .await?
        {
            return Ok(Some(main_props));
        }

        Ok(merged_props)
    }
1418
    /// Convenience wrapper around [`Self::get_vertex_prop_with_ctx`] with no
    /// query context.
    pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
        self.get_vertex_prop_with_ctx(vid, prop, None).await
    }
1422
    /// Resolve a single vertex property, layering the L0 (in-flight) state,
    /// the LRU cache, and durable storage.
    ///
    /// Returns `Value::Null` when the vertex is deleted in the visible L0
    /// state, and whatever storage resolves to (possibly `Null`) otherwise.
    #[instrument(skip(self, ctx), level = "trace")]
    pub async fn get_vertex_prop_with_ctx(
        &self,
        vid: Vid,
        prop: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<Value> {
        // A vertex deleted in the visible L0 state has no properties.
        if l0_visibility::is_vertex_deleted(vid, ctx) {
            return Ok(Value::Null);
        }

        let schema = self.schema_manager.schema();
        // Labels known from the context narrow the CRDT-type check below.
        let labels = ctx
            .map(|c| l0_visibility::get_vertex_labels(vid, c))
            .unwrap_or_default();

        // CRDT-typed props must be accumulated across L0 and storage instead
        // of short-circuiting on the first hit. Without label information,
        // treat the prop as CRDT if ANY label declares it so.
        let is_crdt = if !labels.is_empty() {
            labels.iter().any(|ln| {
                schema
                    .properties
                    .get(ln)
                    .and_then(|lp| lp.get(prop))
                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                    .unwrap_or(false)
            })
        } else {
            schema.properties.values().any(|label_props| {
                label_props
                    .get(prop)
                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                    .unwrap_or(false)
            })
        };

        if is_crdt {
            // Merge every visible L0 contribution, then combine with the
            // cached/stored base value.
            let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
            return self.finalize_crdt_lookup(vid, prop, l0_val).await;
        }

        // Non-CRDT: an L0 value, if present, wins outright.
        if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
            return Ok(val);
        }

        // LRU cache consultation (only when caching is enabled).
        if let Some(ref cache) = self.vertex_cache {
            let mut cache = cache.lock().await;
            if let Some(val) = cache.get(&(vid, prop.to_string())) {
                debug!(vid = ?vid, prop, "Cache HIT");
                metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
                return Ok(val.clone());
            } else {
                debug!(vid = ?vid, prop, "Cache MISS");
                metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
                    .increment(1);
            }
        }

        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;

        // Populate the cache; Null results are cached too, which avoids
        // repeated storage scans for absent properties.
        if let Some(ref cache) = self.vertex_cache {
            let mut cache = cache.lock().await;
            cache.put((vid, prop.to_string()), storage_val.clone());
        }

        Ok(storage_val)
    }
1499
1500 fn accumulate_crdt_from_l0(
1502 &self,
1503 vid: Vid,
1504 prop: &str,
1505 ctx: Option<&QueryContext>,
1506 ) -> Result<Value> {
1507 let mut merged = Value::Null;
1508 l0_visibility::visit_l0_buffers(ctx, |l0| {
1509 if let Some(props) = l0.vertex_properties.get(&vid)
1510 && let Some(val) = props.get(prop)
1511 {
1512 if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1514 merged = new_merged;
1515 }
1516 }
1517 false });
1519 Ok(merged)
1520 }
1521
1522 async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1524 let cached_val = if let Some(ref cache) = self.vertex_cache {
1526 let mut cache = cache.lock().await;
1527 cache.get(&(vid, prop.to_string())).cloned()
1528 } else {
1529 None
1530 };
1531
1532 if let Some(val) = cached_val {
1533 let merged = self.merge_crdt_values(&val, &l0_val)?;
1534 return Ok(merged);
1535 }
1536
1537 let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1539
1540 if let Some(ref cache) = self.vertex_cache {
1542 let mut cache = cache.lock().await;
1543 cache.put((vid, prop.to_string()), storage_val.clone());
1544 }
1545
1546 self.merge_crdt_values(&storage_val, &l0_val)
1548 }
1549
    /// Fetch a single property for `vid` from durable storage, scanning every
    /// candidate label table and keeping the highest-versioned live value.
    ///
    /// Returns `Value::Null` when no live value is found (including when the
    /// newest visible row is a tombstone).
    async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
        let schema = self.schema_manager.schema();
        // Highest version seen so far, and the value adopted at that version.
        let mut best_version: Option<u64> = None;
        let mut best_value: Option<Value> = None;

        // Prefer the label index; without it, scan every label table.
        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect()
        };

        for label_name in &label_names {
            // Schema metadata for this prop under this label. None means the
            // value can only live in the overflow JSON column.
            let prop_meta = schema
                .properties
                .get(label_name)
                .and_then(|props| props.get(prop));

            // A missing/unreadable table for one label is not fatal — skip it.
            let table = match self.storage.get_cached_table(label_name).await {
                Ok(t) => t,
                Err(_) => continue,
            };

            let base_filter = format!("_vid = {}", vid.as_u64());

            // Restrict to rows visible at the current version watermark.
            let filter_expr = self.storage.apply_version_filter(base_filter);

            let mut columns = vec![
                "_deleted".to_string(),
                "_version".to_string(),
                "overflow_json".to_string(),
            ];

            // Only select the typed column when the schema declares it.
            if prop_meta.is_some() {
                columns.push(prop.to_string());
            }

            // Per-label query failures are swallowed; other labels may still
            // provide the value.
            let batches: Vec<RecordBatch> = match table
                .query()
                .only_if(&filter_expr)
                .select(Select::Columns(columns))
                .execute()
                .await
            {
                Ok(stream) => match stream.try_collect().await {
                    Ok(b) => b,
                    Err(_) => continue,
                },
                Err(_) => continue,
            };

            for batch in batches {
                // Batches missing the bookkeeping columns are skipped.
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    // A tombstone at or above the best version (ties included)
                    // clears the current value.
                    if deleted_col.value(row) {
                        if best_version.is_none_or(|best| version >= best) {
                            best_version = Some(version);
                            best_value = None;
                        }
                        continue;
                    }

                    // Typed column first…
                    let mut val = None;
                    if let Some(meta) = prop_meta
                        && let Some(col) = batch.column_by_name(prop)
                    {
                        val = Some(if col.is_null(row) {
                            Value::Null
                        } else {
                            Self::value_from_column(col, &meta.r#type, row)?
                        });
                    }

                    // …then fall back to the overflow JSON blob.
                    if val.is_none()
                        && let Some(overflow_props) =
                            Self::extract_overflow_properties(&batch, row)?
                        && let Some(overflow_val) = overflow_props.get(prop)
                    {
                        val = Some(overflow_val.clone());
                    }

                    if let Some(v) = val {
                        if let Some(meta) = prop_meta {
                            // Typed props go through the (CRDT-aware) merge.
                            self.merge_prop_value(
                                v,
                                version,
                                &meta.r#type,
                                &mut best_version,
                                &mut best_value,
                            )?;
                        } else {
                            // Untyped (overflow-only) props: last writer wins;
                            // >= lets a later-scanned equal-version row win.
                            if best_version.is_none_or(|best| version >= best) {
                                best_version = Some(version);
                                best_value = Some(v);
                            }
                        }
                    }
                }
            }
        }
        Ok(best_value.unwrap_or(Value::Null))
    }
1680
1681 pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1683 match data_type {
1688 DataType::DateTime | DataType::Timestamp | DataType::Date | DataType::Time => Ok(
1689 crate::storage::arrow_convert::arrow_to_value(col, row, Some(data_type)),
1690 ),
1691 _ => value_codec::value_from_column(col, data_type, row, CrdtDecodeMode::Strict)
1692 .map(Value::from),
1693 }
1694 }
1695
1696 pub(crate) fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1697 if a.is_null() {
1701 return Self::parse_crdt_value(b).map(Value::from);
1702 }
1703 if b.is_null() {
1704 return Self::parse_crdt_value(a).map(Value::from);
1705 }
1706
1707 let a_parsed = Self::parse_crdt_value(a)?;
1708 let b_parsed = Self::parse_crdt_value(b)?;
1709
1710 let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1711 let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1712 crdt_a
1713 .try_merge(&crdt_b)
1714 .map_err(|e| anyhow::anyhow!("{e}"))?;
1715 Ok(Value::from(serde_json::to_value(crdt_a)?))
1716 }
1717
1718 fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1721 if let Value::String(s) = val {
1722 serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1724 } else {
1725 Ok(serde_json::Value::from(val.clone()))
1727 }
1728 }
1729
1730 fn merge_prop_value(
1732 &self,
1733 val: Value,
1734 version: u64,
1735 data_type: &DataType,
1736 best_version: &mut Option<u64>,
1737 best_value: &mut Option<Value>,
1738 ) -> Result<()> {
1739 if let DataType::Crdt(_) = data_type {
1740 self.merge_crdt_prop_value(val, version, best_version, best_value)
1741 } else {
1742 if best_version.is_none_or(|best| version >= best) {
1744 *best_version = Some(version);
1745 *best_value = Some(val);
1746 }
1747 Ok(())
1748 }
1749 }
1750
1751 fn merge_crdt_prop_value(
1753 &self,
1754 val: Value,
1755 version: u64,
1756 best_version: &mut Option<u64>,
1757 best_value: &mut Option<Value>,
1758 ) -> Result<()> {
1759 if best_version.is_none_or(|best| version > best) {
1760 if let Some(existing) = best_value.take() {
1762 *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1763 } else {
1764 *best_value = Some(val);
1765 }
1766 *best_version = Some(version);
1767 } else if Some(version) == *best_version {
1768 let existing = best_value.get_or_insert(Value::Null);
1770 *existing = self.merge_crdt_values(existing, &val)?;
1771 } else {
1772 if let Some(existing) = best_value.as_mut() {
1774 *existing = self.merge_crdt_values(existing, &val)?;
1775 }
1776 }
1777 Ok(())
1778 }
1779}