1use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use lru::LruCache;
13use metrics;
14use std::collections::HashMap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tracing::{debug, instrument, warn};
19use uni_common::Properties;
20use uni_common::Value;
21use uni_common::core::id::{Eid, Vid};
22use uni_common::core::schema::{DataType, SchemaManager};
23use uni_crdt::Crdt;
24
25pub struct PropertyManager {
26 storage: Arc<StorageManager>,
27 schema_manager: Arc<SchemaManager>,
28 plugin_registry: Arc<uni_plugin::PluginRegistry>,
36 vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
38 edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
39 cache_capacity: usize,
40}
41
42impl PropertyManager {
43 pub fn new(
49 storage: Arc<StorageManager>,
50 schema_manager: Arc<SchemaManager>,
51 capacity: usize,
52 ) -> Self {
53 Self::with_plugin_registry(
54 storage,
55 schema_manager,
56 capacity,
57 Arc::new(uni_plugin::PluginRegistry::new()),
58 )
59 }
60
61 pub fn with_plugin_registry(
69 storage: Arc<StorageManager>,
70 schema_manager: Arc<SchemaManager>,
71 capacity: usize,
72 plugin_registry: Arc<uni_plugin::PluginRegistry>,
73 ) -> Self {
74 let (vertex_cache, edge_cache) = if capacity == 0 {
76 (None, None)
77 } else {
78 let cap = NonZeroUsize::new(capacity).unwrap();
79 (
80 Some(Mutex::new(LruCache::new(cap))),
81 Some(Mutex::new(LruCache::new(cap))),
82 )
83 };
84
85 Self {
86 storage,
87 schema_manager,
88 plugin_registry,
89 vertex_cache,
90 edge_cache,
91 cache_capacity: capacity,
92 }
93 }
94
95 pub fn cache_size(&self) -> usize {
96 self.cache_capacity
97 }
98
99 pub fn caching_enabled(&self) -> bool {
101 self.cache_capacity > 0
102 }
103
104 pub async fn clear_cache(&self) {
107 if let Some(ref cache) = self.vertex_cache {
108 cache.lock().await.clear();
109 }
110 if let Some(ref cache) = self.edge_cache {
111 cache.lock().await.clear();
112 }
113 }
114
115 pub async fn invalidate_vertex(&self, _vid: Vid) {
117 if let Some(ref cache) = self.vertex_cache {
118 let mut cache = cache.lock().await;
119 cache.clear();
123 }
124 }
125
126 pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
128 if let Some(ref cache) = self.edge_cache {
129 let mut cache = cache.lock().await;
130 cache.clear();
132 }
133 }
134
135 #[instrument(skip(self, ctx), level = "trace")]
136 pub async fn get_edge_prop(
137 &self,
138 eid: uni_common::core::id::Eid,
139 prop: &str,
140 ctx: Option<&QueryContext>,
141 ) -> Result<Value> {
142 if l0_visibility::is_edge_deleted(eid, ctx) {
144 return Ok(Value::Null);
145 }
146
147 if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
149 return Ok(val);
150 }
151
152 if let Some(ref cache) = self.edge_cache {
154 let mut cache = cache.lock().await;
155 if let Some(val) = cache.get(&(eid, prop.to_string())) {
156 debug!(eid = ?eid, prop, "Cache HIT");
157 metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
158 return Ok(val.clone());
159 } else {
160 debug!(eid = ?eid, prop, "Cache MISS");
161 metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
162 }
163 }
164
165 let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
167 let val = all
168 .as_ref()
169 .and_then(|props| props.get(prop).cloned())
170 .unwrap_or(Value::Null);
171
172 if let Some(ref cache) = self.edge_cache {
174 let mut cache = cache.lock().await;
175 if let Some(ref props) = all {
176 for (prop_name, prop_val) in props {
177 cache.put((eid, prop_name.clone()), prop_val.clone());
178 }
179 } else {
180 cache.put((eid, prop.to_string()), Value::Null);
182 }
183 }
184
185 Ok(val)
186 }
187
188 pub async fn get_all_edge_props_with_ctx(
189 &self,
190 eid: uni_common::core::id::Eid,
191 ctx: Option<&QueryContext>,
192 ) -> Result<Option<Properties>> {
193 if l0_visibility::is_edge_deleted(eid, ctx) {
195 return Ok(None);
196 }
197
198 let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();
200
201 let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;
203
204 if final_props.is_empty() && storage_props.is_none() {
206 if l0_visibility::edge_exists_in_l0(eid, ctx) {
207 return Ok(Some(Properties::new()));
208 }
209 return Ok(None);
210 }
211
212 if let Some(sp) = storage_props {
214 for (k, v) in sp {
215 final_props.entry(k).or_insert(v);
216 }
217 }
218
219 Ok(Some(final_props))
220 }
221
222 async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
223 self.fetch_all_edge_props_from_storage_with_hint(eid, None)
225 .await
226 }
227
228 async fn fetch_all_edge_props_from_storage_with_hint(
229 &self,
230 eid: Eid,
231 type_name_hint: Option<&str>,
232 ) -> Result<Option<Properties>> {
233 let schema = self.schema_manager.schema();
234 let backend = self.storage.backend();
235
236 let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
238 vec![hint]
239 } else {
240 schema.edge_types.keys().map(|s| s.as_str()).collect()
242 };
243
244 for type_name in type_names {
245 let type_props = schema.properties.get(type_name);
246
247 if self.storage.delta_dataset(type_name, "fwd").is_err() {
250 continue; }
252
253 use crate::backend::table_names;
255 use crate::backend::types::ScanRequest;
256
257 let table_name = table_names::delta_table_name(type_name, "fwd");
258 if !backend.table_exists(&table_name).await.unwrap_or(false) {
259 continue; }
261
262 let base_filter = format!("eid = {}", eid.as_u64());
263 let filter_expr = self.storage.apply_version_filter(base_filter);
264
265 let batches = match backend
266 .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
267 .await
268 {
269 Ok(b) => b,
270 Err(_) => continue,
271 };
272
273 let mut rows: Vec<(u64, u8, Properties)> = Vec::new();
275
276 for batch in batches {
277 let op_col = match batch.column_by_name("op") {
278 Some(c) => c
279 .as_any()
280 .downcast_ref::<arrow_array::UInt8Array>()
281 .unwrap(),
282 None => continue,
283 };
284 let ver_col = match batch.column_by_name("_version") {
285 Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
286 None => continue,
287 };
288
289 for row in 0..batch.num_rows() {
290 let ver = ver_col.value(row);
291 let op = op_col.value(row);
292 let mut props = Properties::new();
293
294 if op != 1 {
295 if let Some(tp) = type_props {
297 for (p_name, p_meta) in tp {
298 if let Some(col) = batch.column_by_name(p_name)
299 && !col.is_null(row)
300 {
301 let val =
302 Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
303 props.insert(p_name.clone(), val);
304 }
305 }
306 }
307 }
308 rows.push((ver, op, props));
309 }
310 }
311
312 if rows.is_empty() {
313 continue;
314 }
315
316 rows.sort_by_key(|(ver, _, _)| *ver);
318
319 let mut merged_props: Properties = Properties::new();
323 let mut is_deleted = false;
324
325 for (_, op, props) in rows {
326 if op == 1 {
327 is_deleted = true;
329 merged_props.clear();
330 } else {
331 is_deleted = false;
332 for (p_name, p_val) in props {
333 let is_crdt = type_props
335 .and_then(|tp| tp.get(&p_name))
336 .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
337 .unwrap_or(false);
338
339 if is_crdt {
340 if let Some(existing) = merged_props.get(&p_name) {
342 if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
343 merged_props.insert(p_name, merged);
344 }
345 } else {
346 merged_props.insert(p_name, p_val);
347 }
348 } else {
349 merged_props.insert(p_name, p_val);
351 }
352 }
353 }
354 }
355
356 if is_deleted {
357 return Ok(None);
358 }
359
360 if !merged_props.is_empty() {
361 return Ok(Some(merged_props));
362 }
363 }
364
365 use crate::storage::main_edge::MainEdgeDataset;
367 if let Some(props) = MainEdgeDataset::find_props_by_eid(self.storage.backend(), eid).await?
368 {
369 return Ok(Some(props));
370 }
371
372 Ok(None)
373 }
374
375 pub async fn get_batch_vertex_props(
377 &self,
378 vids: &[Vid],
379 properties: &[&str],
380 ctx: Option<&QueryContext>,
381 ) -> Result<HashMap<Vid, Properties>> {
382 let schema = self.schema_manager.schema();
383 let mut result = HashMap::new();
384 if vids.is_empty() {
385 return Ok(result);
386 }
387
388 let labels_to_scan: Vec<String> = {
393 let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
394 let mut all_resolved = true;
395 for &vid in vids {
396 if let Some(labels) = self.storage.get_labels_from_index(vid) {
397 needed.extend(labels);
398 } else {
399 all_resolved = false;
400 break;
401 }
402 }
403 if all_resolved {
404 needed.into_iter().collect()
405 } else {
406 schema.labels.keys().cloned().collect() }
408 };
409
410 for label_name in &labels_to_scan {
412 let label_schema_props = schema.properties.get(label_name);
414 let valid_props: Vec<&str> = properties
415 .iter()
416 .cloned()
417 .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
418 .collect();
419 let ds = self.storage.vertex_dataset(label_name)?;
422 let backend = self.storage.backend();
423 let vtable_name = ds.table_name();
424
425 if !backend.table_exists(&vtable_name).await.unwrap_or(false) {
426 continue; }
428
429 let vid_list = vids
431 .iter()
432 .map(|v| v.as_u64().to_string())
433 .collect::<Vec<_>>()
434 .join(",");
435 let base_filter = format!("_vid IN ({})", vid_list);
436
437 let final_filter = self.storage.apply_version_filter(base_filter);
438
439 let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
441 columns.push("_vid".to_string());
442 columns.push("_version".to_string());
443 columns.push("_deleted".to_string());
444 columns.extend(valid_props.iter().map(|s| s.to_string()));
445 columns.push("overflow_json".to_string());
447
448 use crate::backend::types::ScanRequest;
449 let request = ScanRequest::all(&vtable_name)
450 .with_filter(final_filter)
451 .with_columns(columns);
452
453 let batches: Vec<RecordBatch> = match backend.scan(request).await {
454 Ok(b) => b,
455 Err(e) => {
456 warn!(
457 label = %label_name,
458 error = %e,
459 "failed to scan label table, skipping"
460 );
461 continue;
462 }
463 };
464 for batch in batches {
465 let vid_col = match batch
466 .column_by_name("_vid")
467 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
468 {
469 Some(c) => c,
470 None => continue,
471 };
472 let del_col = match batch
473 .column_by_name("_deleted")
474 .and_then(|col| col.as_any().downcast_ref::<BooleanArray>())
475 {
476 Some(c) => c,
477 None => continue,
478 };
479
480 for row in 0..batch.num_rows() {
481 let vid = Vid::from(vid_col.value(row));
482
483 if del_col.value(row) {
484 result.remove(&vid);
485 continue;
486 }
487
488 let label_props = schema.properties.get(label_name);
489 let mut props =
490 Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
491 Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
492 result.insert(vid, props);
493 }
494 }
495 }
496
497 if let Some(ctx) = ctx {
499 for pending_l0_arc in &ctx.pending_flush_l0s {
501 let pending_l0 = pending_l0_arc.read();
502 self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
503 }
504
505 let l0 = ctx.l0.read();
507 self.overlay_l0_batch(vids, &l0, properties, &mut result);
508
509 if self.storage.version_high_water_mark().is_none()
513 && let Some(tx_l0_arc) = &ctx.transaction_l0
514 {
515 let tx_l0 = tx_l0_arc.read();
516 self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
517 }
518 }
519
520 Ok(result)
521 }
522
523 fn overlay_l0_batch(
524 &self,
525 vids: &[Vid],
526 l0: &L0Buffer,
527 properties: &[&str],
528 result: &mut HashMap<Vid, Properties>,
529 ) {
530 let schema = self.schema_manager.schema();
531 for &vid in vids {
532 if l0.vertex_tombstones.contains(&vid) {
534 result.remove(&vid);
535 continue;
536 }
537 if let Some(l0_props) = l0.vertex_properties.get(&vid) {
539 let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
541 if self
542 .storage
543 .version_high_water_mark()
544 .is_some_and(|hwm| entry_version > hwm)
545 {
546 continue;
547 }
548
549 let entry = result.entry(vid).or_default();
550 let labels = l0.get_vertex_labels(vid);
552
553 for (k, v) in l0_props {
554 if properties.contains(&k.as_str()) {
555 let is_crdt = labels
557 .and_then(|label_list| {
558 label_list.iter().find_map(|ln| {
559 schema
560 .properties
561 .get(ln)
562 .and_then(|lp| lp.get(k))
563 .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
564 })
565 })
566 .is_some();
567
568 if is_crdt {
569 let existing = entry.entry(k.clone()).or_insert(Value::Null);
570 *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
571 } else {
572 entry.insert(k.clone(), v.clone());
573 }
574 }
575 }
576 }
577 }
578 }
579
580 pub async fn get_batch_edge_props(
583 &self,
584 eids: &[uni_common::core::id::Eid],
585 properties: &[&str],
586 ctx: Option<&QueryContext>,
587 ) -> Result<HashMap<Vid, Properties>> {
588 let schema = self.schema_manager.schema();
589 let mut result = HashMap::new();
590 if eids.is_empty() {
591 return Ok(result);
592 }
593
594 let types_to_scan: Vec<String> = {
599 if let Some(ctx) = ctx {
600 let mut needed: std::collections::HashSet<String> =
601 std::collections::HashSet::new();
602 let mut all_resolved = true;
603 for &eid in eids {
604 if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
605 needed.insert(etype.to_string());
606 } else {
607 all_resolved = false;
608 break;
609 }
610 }
611 if all_resolved {
612 needed.into_iter().collect()
613 } else {
614 schema.edge_types.keys().cloned().collect() }
616 } else {
617 schema.edge_types.keys().cloned().collect() }
619 };
620
621 for type_name in &types_to_scan {
623 let type_props = schema.properties.get(type_name);
624 let valid_props: Vec<&str> = properties
625 .iter()
626 .cloned()
627 .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
628 .collect();
629 let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
632 Ok(ds) => ds,
633 Err(_) => continue,
634 };
635 let backend = self.storage.backend();
636 let dtable_name = delta_ds.table_name();
637
638 if !backend.table_exists(&dtable_name).await.unwrap_or(false) {
639 continue; }
641
642 let eid_list = eids
643 .iter()
644 .map(|e| e.as_u64().to_string())
645 .collect::<Vec<_>>()
646 .join(",");
647 let base_filter = format!("eid IN ({})", eid_list);
648
649 let final_filter = self.storage.apply_version_filter(base_filter);
650
651 let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
653 columns.push("eid".to_string());
654 columns.push("_version".to_string());
655 columns.push("op".to_string());
656 columns.extend(valid_props.iter().map(|s| s.to_string()));
657 columns.push("overflow_json".to_string());
659
660 use crate::backend::types::ScanRequest;
661 let request = ScanRequest::all(&dtable_name)
662 .with_filter(final_filter)
663 .with_columns(columns);
664
665 let batches: Vec<RecordBatch> = match backend.scan(request).await {
666 Ok(b) => b,
667 Err(e) => {
668 warn!(
669 edge_type = %type_name,
670 error = %e,
671 "failed to scan edge delta table, skipping"
672 );
673 continue;
674 }
675 };
676 for batch in batches {
677 let eid_col = match batch
678 .column_by_name("eid")
679 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
680 {
681 Some(c) => c,
682 None => continue,
683 };
684 let op_col = match batch
685 .column_by_name("op")
686 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
687 {
688 Some(c) => c,
689 None => continue,
690 };
691
692 for row in 0..batch.num_rows() {
693 let eid = uni_common::core::id::Eid::from(eid_col.value(row));
694
695 if op_col.value(row) == 1 {
697 result.remove(&Vid::from(eid.as_u64()));
698 continue;
699 }
700
701 let mut props =
702 Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
703 Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
704 result.insert(Vid::from(eid.as_u64()), props);
706 }
707 }
708 }
709
710 if let Some(ctx) = ctx {
712 for pending_l0_arc in &ctx.pending_flush_l0s {
714 let pending_l0 = pending_l0_arc.read();
715 self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
716 }
717
718 let l0 = ctx.l0.read();
720 self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);
721
722 if self.storage.version_high_water_mark().is_none()
726 && let Some(tx_l0_arc) = &ctx.transaction_l0
727 {
728 let tx_l0 = tx_l0_arc.read();
729 self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
730 }
731 }
732
733 Ok(result)
734 }
735
736 fn overlay_l0_edge_batch(
737 &self,
738 eids: &[uni_common::core::id::Eid],
739 l0: &L0Buffer,
740 properties: &[&str],
741 result: &mut HashMap<Vid, Properties>,
742 ) {
743 let schema = self.schema_manager.schema();
744 for &eid in eids {
745 let vid_key = Vid::from(eid.as_u64());
746 if l0.tombstones.contains_key(&eid) {
747 result.remove(&vid_key);
748 continue;
749 }
750 if let Some(l0_props) = l0.edge_properties.get(&eid) {
751 let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
753 if self
754 .storage
755 .version_high_water_mark()
756 .is_some_and(|hwm| entry_version > hwm)
757 {
758 continue;
759 }
760
761 let entry = result.entry(vid_key).or_default();
762 let type_name = l0.get_edge_type(eid);
764
765 let include_all = properties.contains(&"_all_props");
766 for (k, v) in l0_props {
767 if include_all || properties.contains(&k.as_str()) {
768 let is_crdt = type_name
770 .and_then(|tn| schema.properties.get(tn))
771 .and_then(|tp| tp.get(k))
772 .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
773 .unwrap_or(false);
774
775 if is_crdt {
776 let existing = entry.entry(k.clone()).or_insert(Value::Null);
777 *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
778 } else {
779 entry.insert(k.clone(), v.clone());
780 }
781 }
782 }
783 }
784 }
785 }
786
787 pub async fn get_batch_labels(
789 &self,
790 vids: &[Vid],
791 ctx: Option<&QueryContext>,
792 ) -> Result<HashMap<Vid, Vec<String>>> {
793 let mut result = HashMap::new();
794 if vids.is_empty() {
795 return Ok(result);
796 }
797
798 if let Some(ctx) = ctx {
800 let mut collect_labels = |l0: &L0Buffer| {
801 for &vid in vids {
802 if let Some(labels) = l0.get_vertex_labels(vid) {
803 result
804 .entry(vid)
805 .or_default()
806 .extend(labels.iter().cloned());
807 }
808 }
809 };
810
811 for l0_arc in &ctx.pending_flush_l0s {
812 collect_labels(&l0_arc.read());
813 }
814 collect_labels(&ctx.l0.read());
815 if let Some(tx_l0_arc) = &ctx.transaction_l0 {
816 collect_labels(&tx_l0_arc.read());
817 }
818 }
819
820 let mut vids_needing_lancedb = Vec::new();
822
823 fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
825 for l in new_labels {
826 if !existing.contains(&l) {
827 existing.push(l);
828 }
829 }
830 }
831
832 for &vid in vids {
833 if result.contains_key(&vid) {
834 continue; }
836
837 if let Some(labels) = self.storage.get_labels_from_index(vid) {
838 merge_labels(result.entry(vid).or_default(), labels);
839 } else {
840 vids_needing_lancedb.push(vid);
841 }
842 }
843
844 if !vids_needing_lancedb.is_empty() {
846 let backend = self.storage.backend();
847 let version = self.storage.version_high_water_mark();
848 let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
849 backend,
850 &vids_needing_lancedb,
851 version,
852 )
853 .await?;
854
855 for (vid, labels) in storage_labels {
856 merge_labels(result.entry(vid).or_default(), labels);
857 }
858 }
859
860 for labels in result.values_mut() {
862 labels.sort();
863 labels.dedup();
864 }
865
866 Ok(result)
867 }
868
869 pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
870 Ok(self
871 .get_all_vertex_props_with_ctx(vid, None)
872 .await?
873 .unwrap_or_default())
874 }
875
876 pub async fn get_all_vertex_props_with_ctx(
877 &self,
878 vid: Vid,
879 ctx: Option<&QueryContext>,
880 ) -> Result<Option<Properties>> {
881 if l0_visibility::is_vertex_deleted(vid, ctx) {
883 return Ok(None);
884 }
885
886 let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
888
889 let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;
891
892 if l0_props.is_none() && storage_props_opt.is_none() {
894 return Ok(None);
895 }
896
897 let mut final_props = l0_props.unwrap_or_default();
898
899 if let Some(storage_props) = storage_props_opt {
901 for (k, v) in storage_props {
902 final_props.entry(k).or_insert(v);
903 }
904 }
905
906 if let Some(ctx) = ctx {
909 let labels = l0_visibility::get_vertex_labels(vid, ctx);
911 for label in &labels {
912 self.normalize_crdt_properties(&mut final_props, label)?;
913 }
914 }
915
916 Ok(Some(final_props))
917 }
918
    /// Loads all properties for a batch of vertices that share one `label`.
    ///
    /// Vertices reported deleted by the L0 visibility layer are omitted.
    /// Vertices with properties in L0 are answered from L0 alone; only the
    /// remaining vids are looked up in the label's storage table, where rows
    /// are combined per vid through `merge_versioned_props` and overflow
    /// properties fill in keys the typed columns did not provide.
    ///
    /// When `ctx` is `Some`, CRDT-typed properties of `label` are normalized in
    /// every returned map.
    ///
    /// NOTE(review): a vid with *any* L0 properties never reaches the storage
    /// scan, so properties that exist only in storage are not returned for it.
    /// `get_all_vertex_props_with_ctx` and `get_batch_edge_props_for_type` both
    /// merge L0 over storage instead - confirm this difference is intended.
    ///
    /// # Errors
    /// Propagates scan errors (there is no table-existence guard before the
    /// scan), value/overflow decoding errors, and CRDT merge/normalization
    /// errors.
    pub async fn get_batch_vertex_props_for_label(
        &self,
        vids: &[Vid],
        label: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let mut result: HashMap<Vid, Properties> = HashMap::new();
        // Vids that L0 could not answer and must be read from storage.
        let mut need_storage: Vec<Vid> = Vec::new();

        for &vid in vids {
            if l0_visibility::is_vertex_deleted(vid, ctx) {
                continue;
            }
            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
            if let Some(props) = l0_props {
                result.insert(vid, props);
            } else {
                need_storage.push(vid);
            }
        }

        // Fast path: everything was answered from L0 (or `vids` was empty).
        if need_storage.is_empty() {
            if ctx.is_some() {
                for props in result.values_mut() {
                    self.normalize_crdt_properties(props, label)?;
                }
            }
            return Ok(result);
        }

        let schema = self.schema_manager.schema();
        let label_props = schema.properties.get(label);

        // Typed property columns declared for this label (empty if the label
        // has no schema entry).
        let mut prop_names: Vec<String> = Vec::new();
        if let Some(props) = label_props {
            prop_names = props.keys().cloned().collect();
        }

        let mut columns: Vec<String> = vec![
            "_vid".to_string(),
            "_deleted".to_string(),
            "_version".to_string(),
        ];
        columns.extend(prop_names.iter().cloned());
        columns.push("overflow_json".to_string());

        let vid_list: String = need_storage
            .iter()
            .map(|v| v.as_u64().to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let base_filter = format!("_vid IN ({})", vid_list);

        let filter_expr = self.storage.apply_version_filter(base_filter);

        let table_name = crate::backend::table_names::vertex_table_name(label);
        let batches: Vec<RecordBatch> = self
            .storage
            .backend()
            .scan(
                crate::backend::types::ScanRequest::all(&table_name)
                    .with_filter(&filter_expr)
                    .with_columns(columns.clone()),
            )
            .await?;

        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

        // Highest version seen so far per vid, and the properties merged for it.
        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();

        for batch in batches {
            // Batches lacking any bookkeeping column (or with an unexpected
            // Arrow type for it) are skipped.
            let vid_col = match batch
                .column_by_name("_vid")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };
            let deleted_col = match batch
                .column_by_name("_deleted")
                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
            {
                Some(c) => c,
                None => continue,
            };
            let version_col = match batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };

            for row in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row));
                let version = version_col.value(row);

                if deleted_col.value(row) {
                    // A tombstone at or above the best known version discards
                    // whatever was merged so far; older tombstones are ignored.
                    if per_vid_best_version
                        .get(&vid)
                        .is_none_or(|&best| version >= best)
                    {
                        per_vid_best_version.insert(vid, version);
                        per_vid_props.remove(&vid);
                    }
                    continue;
                }

                let mut current_props =
                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                // Typed columns take precedence over overflow values.
                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                    for (k, v) in overflow_props {
                        current_props.entry(k).or_insert(v);
                    }
                }

                // Move this vid's state out of the maps, let
                // `merge_versioned_props` update it, then store it back.
                let best = per_vid_best_version.get(&vid).copied();
                let mut best_opt = best;
                let mut merged = per_vid_props.remove(&vid);
                self.merge_versioned_props(
                    current_props,
                    version,
                    &mut best_opt,
                    &mut merged,
                    label_props,
                )?;
                if let Some(v) = best_opt {
                    per_vid_best_version.insert(vid, v);
                }
                if let Some(p) = merged {
                    per_vid_props.insert(vid, p);
                }
            }
        }

        // Fold storage results into the output without overwriting keys that
        // are already present for a vid.
        for (vid, storage_props) in per_vid_props {
            let entry = result.entry(vid).or_default();
            for (k, v) in storage_props {
                entry.entry(k).or_insert(v);
            }
        }

        if ctx.is_some() {
            for props in result.values_mut() {
                self.normalize_crdt_properties(props, label)?;
            }
        }

        Ok(result)
    }
1087
    /// Loads all schema-declared properties for a batch of edges of one
    /// `type_name`, merging L0 buffers over the type's forward (`"fwd"`) delta
    /// table.
    ///
    /// Edges reported deleted by the L0 visibility layer are omitted. Every
    /// other edge is looked up in storage even when L0 already has properties
    /// for it; storage then only contributes keys that L0 did not define.
    ///
    /// Delta rows are replayed per edge in ascending `_version` order. A row
    /// with `op == 1` is a delete and clears the accumulated properties; a
    /// later non-delete row revives the edge. If the replay ends in a delete,
    /// the edge is removed from the result, including any L0 properties
    /// collected for it.
    ///
    /// # Returns
    /// A map from edge id to its merged properties. The L0-only result is
    /// returned as-is when the delta dataset cannot be opened, the delta table
    /// does not exist, or the scan fails - these conditions are not reported
    /// as errors.
    ///
    /// # Errors
    /// Propagates value-decoding errors from `value_from_column`.
    pub async fn get_batch_edge_props_for_type(
        &self,
        eids: &[Eid],
        type_name: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Eid, Properties>> {
        use crate::backend::table_names;
        use crate::backend::types::ScanRequest;

        let mut result: HashMap<Eid, Properties> = HashMap::new();
        if eids.is_empty() {
            return Ok(result);
        }

        // Every edge not deleted in L0 is scanned in storage, whether or not
        // L0 already supplied properties for it.
        let mut need_storage: Vec<Eid> = Vec::new();
        for &eid in eids {
            if l0_visibility::is_edge_deleted(eid, ctx) {
                continue;
            }
            let l0_props = l0_visibility::accumulate_edge_props(eid, ctx);
            if let Some(props) = l0_props {
                result.insert(eid, props);
            }
            need_storage.push(eid);
        }

        if need_storage.is_empty() {
            return Ok(result);
        }

        let schema = self.schema_manager.schema();
        let type_props = schema.properties.get(type_name);

        // No delta dataset for this type: nothing persisted to merge.
        if self.storage.delta_dataset(type_name, "fwd").is_err() {
            return Ok(result);
        }

        let table_name = table_names::delta_table_name(type_name, "fwd");
        let backend = self.storage.backend();
        if !backend.table_exists(&table_name).await.unwrap_or(false) {
            return Ok(result);
        }

        let eid_list: String = need_storage
            .iter()
            .map(|e| e.as_u64().to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let base_filter = format!("eid IN ({})", eid_list);
        let filter_expr = self.storage.apply_version_filter(base_filter);

        let batches = match backend
            .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
            .await
        {
            Ok(b) => b,
            // Scan failures are swallowed; callers get the L0-only view.
            Err(_) => return Ok(result),
        };

        // (version, op, properties) for every delta row, grouped by edge.
        let mut per_eid_rows: HashMap<Eid, Vec<(u64, u8, Properties)>> = HashMap::new();
        for batch in batches {
            // Batches lacking any bookkeeping column (or with an unexpected
            // Arrow type for it) are skipped.
            let eid_col = match batch
                .column_by_name("eid")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };
            let op_col = match batch
                .column_by_name("op")
                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt8Array>())
            {
                Some(c) => c,
                None => continue,
            };
            let ver_col = match batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };

            for row in 0..batch.num_rows() {
                let eid = Eid::from(eid_col.value(row));
                let ver = ver_col.value(row);
                let op = op_col.value(row);
                let mut props = Properties::new();

                // Delete rows carry no properties; only schema-declared
                // property columns are read for the others.
                if op != 1
                    && let Some(tp) = type_props
                {
                    for (p_name, p_meta) in tp {
                        if let Some(col) = batch.column_by_name(p_name)
                            && !col.is_null(row)
                        {
                            let val = Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
                            props.insert(p_name.clone(), val);
                        }
                    }
                }
                per_eid_rows.entry(eid).or_default().push((ver, op, props));
            }
        }

        for (eid, mut rows) in per_eid_rows {
            // Replay oldest-first so later versions overwrite earlier ones.
            rows.sort_by_key(|(ver, _, _)| *ver);

            let mut merged_props: Properties = Properties::new();
            let mut is_deleted = false;

            for (_, op, props) in rows {
                if op == 1 {
                    is_deleted = true;
                    merged_props.clear();
                } else {
                    is_deleted = false;
                    for (p_name, p_val) in props {
                        let is_crdt = type_props
                            .and_then(|tp| tp.get(&p_name))
                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                            .unwrap_or(false);
                        if is_crdt {
                            if let Some(existing) = merged_props.get(&p_name) {
                                // On merge failure the previously merged value
                                // is kept unchanged.
                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
                                    merged_props.insert(p_name, merged);
                                }
                            } else {
                                merged_props.insert(p_name, p_val);
                            }
                        } else {
                            merged_props.insert(p_name, p_val);
                        }
                    }
                }
            }

            if is_deleted {
                // Storage says the edge is gone: drop it, L0 properties included.
                result.remove(&eid);
                continue;
            }

            // L0 values already in `result` win; storage fills the gaps.
            let entry = result.entry(eid).or_default();
            for (k, v) in merged_props {
                entry.entry(k).or_insert(v);
            }
        }

        Ok(result)
    }
1263
1264 fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1268 let schema = self.schema_manager.schema();
1269 let label_props = match schema.properties.get(label) {
1270 Some(p) => p,
1271 None => return Ok(()),
1272 };
1273
1274 for (prop_name, prop_meta) in label_props {
1275 if let DataType::Crdt(_) = prop_meta.r#type
1276 && let Some(val) = props.get_mut(prop_name)
1277 {
1278 *val = Value::from(Self::parse_crdt_value(val)?);
1279 }
1280 }
1281
1282 Ok(())
1283 }
1284
1285 fn extract_row_properties(
1287 batch: &RecordBatch,
1288 row: usize,
1289 prop_names: &[&str],
1290 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1291 ) -> Result<Properties> {
1292 let mut props = Properties::new();
1293 for name in prop_names {
1294 let col = match batch.column_by_name(name) {
1295 Some(col) => col,
1296 None => continue,
1297 };
1298 if col.is_null(row) {
1299 continue;
1300 }
1301 if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1302 let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1303 props.insert((*name).to_string(), val);
1304 }
1305 }
1306 Ok(props)
1307 }
1308
1309 fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
1314 use arrow_array::LargeBinaryArray;
1315
1316 let overflow_col = match batch.column_by_name("overflow_json") {
1317 Some(col) => col,
1318 None => return Ok(None), };
1320
1321 if overflow_col.is_null(row) {
1322 return Ok(None);
1323 }
1324
1325 let binary_array = overflow_col
1326 .as_any()
1327 .downcast_ref::<LargeBinaryArray>()
1328 .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;
1329
1330 let jsonb_bytes = binary_array.value(row);
1331
1332 match uni_common::cypher_value_codec::decode(jsonb_bytes)
1336 .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?
1337 {
1338 Value::Map(map) => Ok(Some(map)),
1339 Value::Null => Ok(None),
1340 other => Err(anyhow!(
1341 "overflow_json decoded to a non-map value: {other:?}"
1342 )),
1343 }
1344 }
1345
1346 fn merge_overflow_into_props(
1353 batch: &RecordBatch,
1354 row: usize,
1355 properties: &[&str],
1356 props: &mut Properties,
1357 ) -> Result<()> {
1358 use arrow_array::LargeBinaryArray;
1359
1360 let overflow_col = match batch.column_by_name("overflow_json") {
1361 Some(col) if !col.is_null(row) => col,
1362 _ => return Ok(()),
1363 };
1364
1365 if properties.contains(&"overflow_json")
1367 && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
1368 {
1369 let jsonb_bytes = binary_array.value(row);
1370 let bytes_list: Vec<Value> =
1371 jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
1372 props.insert("overflow_json".to_string(), Value::List(bytes_list));
1373 }
1374
1375 if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
1377 for (k, v) in overflow_props {
1378 if properties.contains(&k.as_str()) {
1379 props.entry(k).or_insert(v);
1380 }
1381 }
1382 }
1383
1384 Ok(())
1385 }
1386
1387 fn merge_crdt_into(
1389 &self,
1390 target: &mut Properties,
1391 source: Properties,
1392 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1393 crdt_only: bool,
1394 ) -> Result<()> {
1395 for (k, v) in source {
1396 if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1397 if let DataType::Crdt(_) = prop_meta.r#type {
1398 let existing_v = target.entry(k).or_insert(Value::Null);
1399 *existing_v = self.merge_crdt_values(existing_v, &v)?;
1400 } else if !crdt_only {
1401 target.insert(k, v);
1402 }
1403 }
1404 }
1405 Ok(())
1406 }
1407
1408 fn merge_versioned_props(
1410 &self,
1411 current_props: Properties,
1412 version: u64,
1413 best_version: &mut Option<u64>,
1414 best_props: &mut Option<Properties>,
1415 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1416 ) -> Result<()> {
1417 if best_version.is_none_or(|best| version > best) {
1418 if let Some(mut existing_props) = best_props.take() {
1420 let mut merged = current_props;
1422 for (k, v) in merged.iter_mut() {
1423 if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
1424 && let DataType::Crdt(_) = prop_meta.r#type
1425 && let Some(existing_val) = existing_props.remove(k)
1426 {
1427 *v = self.merge_crdt_values(v, &existing_val)?;
1428 }
1429 }
1430 *best_props = Some(merged);
1431 } else {
1432 *best_props = Some(current_props);
1433 }
1434 *best_version = Some(version);
1435 } else if Some(version) == *best_version {
1436 if let Some(existing_props) = best_props.as_mut() {
1438 self.merge_crdt_into(existing_props, current_props, label_props, false)?;
1439 } else {
1440 *best_props = Some(current_props);
1441 }
1442 } else {
1443 if let Some(existing_props) = best_props.as_mut() {
1445 self.merge_crdt_into(existing_props, current_props, label_props, true)?;
1446 }
1447 }
1448 Ok(())
1449 }
1450
1451 async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
1452 let schema = self.schema_manager.schema();
1455 let mut merged_props: Option<Properties> = None;
1456 let mut global_best_version: Option<u64> = None;
1457
1458 let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1460 {
1461 labels
1462 } else {
1463 schema.labels.keys().cloned().collect() };
1465
1466 for label_name in &label_names {
1467 let label_props = schema.properties.get(label_name);
1468
1469 let mut prop_names: Vec<String> = Vec::new();
1471 if let Some(props) = label_props {
1472 prop_names = props.keys().cloned().collect();
1473 }
1474
1475 let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
1477 columns.extend(prop_names.iter().cloned());
1478 columns.push("overflow_json".to_string());
1480
1481 let base_filter = format!("_vid = {}", vid.as_u64());
1483
1484 let filter_expr = self.storage.apply_version_filter(base_filter);
1485
1486 let table_name = crate::backend::table_names::vertex_table_name(label_name);
1487 let batches: Vec<RecordBatch> = match self
1488 .storage
1489 .backend()
1490 .scan(
1491 crate::backend::types::ScanRequest::all(&table_name)
1492 .with_filter(&filter_expr)
1493 .with_columns(columns.clone()),
1494 )
1495 .await
1496 {
1497 Ok(b) => b,
1498 Err(_) => continue,
1499 };
1500
1501 let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
1503
1504 for batch in batches {
1505 let deleted_col = match batch
1506 .column_by_name("_deleted")
1507 .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1508 {
1509 Some(c) => c,
1510 None => continue,
1511 };
1512 let version_col = match batch
1513 .column_by_name("_version")
1514 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1515 {
1516 Some(c) => c,
1517 None => continue,
1518 };
1519
1520 for row in 0..batch.num_rows() {
1521 let version = version_col.value(row);
1522
1523 if deleted_col.value(row) {
1524 if global_best_version.is_none_or(|best| version >= best) {
1525 global_best_version = Some(version);
1526 merged_props = None;
1527 }
1528 continue;
1529 }
1530
1531 let mut current_props =
1532 Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1533
1534 if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1536 for (k, v) in overflow_props {
1538 current_props.entry(k).or_insert(v);
1539 }
1540 }
1541
1542 self.merge_versioned_props(
1543 current_props,
1544 version,
1545 &mut global_best_version,
1546 &mut merged_props,
1547 label_props,
1548 )?;
1549 }
1550 }
1551 }
1552
1553 if merged_props.is_none()
1558 && global_best_version.is_none()
1559 && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1560 self.storage.backend(),
1561 vid,
1562 self.storage.version_high_water_mark(),
1563 )
1564 .await?
1565 {
1566 return Ok(Some(main_props));
1567 }
1568
1569 Ok(merged_props)
1570 }
1571
1572 pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
1573 self.get_vertex_prop_with_ctx(vid, prop, None).await
1574 }
1575
1576 #[instrument(skip(self, ctx), level = "trace")]
1577 pub async fn get_vertex_prop_with_ctx(
1578 &self,
1579 vid: Vid,
1580 prop: &str,
1581 ctx: Option<&QueryContext>,
1582 ) -> Result<Value> {
1583 if l0_visibility::is_vertex_deleted(vid, ctx) {
1585 return Ok(Value::Null);
1586 }
1587
1588 let schema = self.schema_manager.schema();
1591 let labels = ctx
1592 .map(|c| l0_visibility::get_vertex_labels(vid, c))
1593 .unwrap_or_default();
1594
1595 let is_crdt = if !labels.is_empty() {
1596 labels.iter().any(|ln| {
1598 schema
1599 .properties
1600 .get(ln)
1601 .and_then(|lp| lp.get(prop))
1602 .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1603 .unwrap_or(false)
1604 })
1605 } else {
1606 schema.properties.values().any(|label_props| {
1608 label_props
1609 .get(prop)
1610 .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1611 .unwrap_or(false)
1612 })
1613 };
1614
1615 if is_crdt {
1617 let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
1619 return self.finalize_crdt_lookup(vid, prop, l0_val).await;
1620 }
1621
1622 if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
1624 return Ok(val);
1625 }
1626
1627 if let Some(ref cache) = self.vertex_cache {
1629 let mut cache = cache.lock().await;
1630 if let Some(val) = cache.get(&(vid, prop.to_string())) {
1631 debug!(vid = ?vid, prop, "Cache HIT");
1632 metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
1633 return Ok(val.clone());
1634 } else {
1635 debug!(vid = ?vid, prop, "Cache MISS");
1636 metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
1637 .increment(1);
1638 }
1639 }
1640
1641 let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1643
1644 if let Some(ref cache) = self.vertex_cache {
1646 let mut cache = cache.lock().await;
1647 cache.put((vid, prop.to_string()), storage_val.clone());
1648 }
1649
1650 Ok(storage_val)
1651 }
1652
1653 fn accumulate_crdt_from_l0(
1655 &self,
1656 vid: Vid,
1657 prop: &str,
1658 ctx: Option<&QueryContext>,
1659 ) -> Result<Value> {
1660 let mut merged = Value::Null;
1661 l0_visibility::visit_l0_buffers(ctx, |l0| {
1662 if let Some(props) = l0.vertex_properties.get(&vid)
1663 && let Some(val) = props.get(prop)
1664 {
1665 if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1667 merged = new_merged;
1668 }
1669 }
1670 false });
1672 Ok(merged)
1673 }
1674
1675 async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1677 let cached_val = if let Some(ref cache) = self.vertex_cache {
1679 let mut cache = cache.lock().await;
1680 cache.get(&(vid, prop.to_string())).cloned()
1681 } else {
1682 None
1683 };
1684
1685 if let Some(val) = cached_val {
1686 let merged = self.merge_crdt_values(&val, &l0_val)?;
1687 return Ok(merged);
1688 }
1689
1690 let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1692
1693 if let Some(ref cache) = self.vertex_cache {
1695 let mut cache = cache.lock().await;
1696 cache.put((vid, prop.to_string()), storage_val.clone());
1697 }
1698
1699 self.merge_crdt_values(&storage_val, &l0_val)
1701 }
1702
1703 async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
1704 let schema = self.schema_manager.schema();
1707 let mut best_version: Option<u64> = None;
1708 let mut best_value: Option<Value> = None;
1709
1710 let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1712 {
1713 labels
1714 } else {
1715 schema.labels.keys().cloned().collect() };
1717
1718 for label_name in &label_names {
1719 let prop_meta = schema
1721 .properties
1722 .get(label_name)
1723 .and_then(|props| props.get(prop));
1724
1725 let base_filter = format!("_vid = {}", vid.as_u64());
1729
1730 let filter_expr = self.storage.apply_version_filter(base_filter);
1731
1732 let mut columns = vec![
1734 "_deleted".to_string(),
1735 "_version".to_string(),
1736 "overflow_json".to_string(),
1737 ];
1738
1739 if prop_meta.is_some() {
1741 columns.push(prop.to_string());
1742 }
1743
1744 let table_name = crate::backend::table_names::vertex_table_name(label_name);
1745 let batches: Vec<RecordBatch> = match self
1746 .storage
1747 .backend()
1748 .scan(
1749 crate::backend::types::ScanRequest::all(&table_name)
1750 .with_filter(&filter_expr)
1751 .with_columns(columns),
1752 )
1753 .await
1754 {
1755 Ok(b) => b,
1756 Err(_) => continue,
1757 };
1758
1759 for batch in batches {
1760 let deleted_col = match batch
1761 .column_by_name("_deleted")
1762 .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1763 {
1764 Some(c) => c,
1765 None => continue,
1766 };
1767 let version_col = match batch
1768 .column_by_name("_version")
1769 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1770 {
1771 Some(c) => c,
1772 None => continue,
1773 };
1774 for row in 0..batch.num_rows() {
1775 let version = version_col.value(row);
1776
1777 if deleted_col.value(row) {
1778 if best_version.is_none_or(|best| version >= best) {
1779 best_version = Some(version);
1780 best_value = None;
1781 }
1782 continue;
1783 }
1784
1785 let mut val = None;
1787 if let Some(meta) = prop_meta
1788 && let Some(col) = batch.column_by_name(prop)
1789 {
1790 val = Some(if col.is_null(row) {
1791 Value::Null
1792 } else {
1793 Self::value_from_column(col, &meta.r#type, row)?
1794 });
1795 }
1796
1797 if val.is_none()
1799 && let Some(overflow_props) =
1800 Self::extract_overflow_properties(&batch, row)?
1801 && let Some(overflow_val) = overflow_props.get(prop)
1802 {
1803 val = Some(overflow_val.clone());
1804 }
1805
1806 if let Some(v) = val {
1808 if let Some(meta) = prop_meta {
1809 self.merge_prop_value(
1811 v,
1812 version,
1813 &meta.r#type,
1814 &mut best_version,
1815 &mut best_value,
1816 )?;
1817 } else {
1818 if best_version.is_none_or(|best| version >= best) {
1820 best_version = Some(version);
1821 best_value = Some(v);
1822 }
1823 }
1824 }
1825 }
1826 }
1827 }
1828
1829 if best_value.is_none()
1835 && best_version.is_none()
1836 && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1837 self.storage.backend(),
1838 vid,
1839 self.storage.version_high_water_mark(),
1840 )
1841 .await?
1842 {
1843 return Ok(main_props.get(prop).cloned().unwrap_or(Value::Null));
1844 }
1845
1846 Ok(best_value.unwrap_or(Value::Null))
1847 }
1848
1849 pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1851 crate::storage::value_codec::decode_column_value(
1852 col,
1853 data_type,
1854 row,
1855 CrdtDecodeMode::Strict,
1856 )
1857 }
1858
1859 pub fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1872 if a.is_null() {
1876 return Self::parse_crdt_value(b).map(Value::from);
1877 }
1878 if b.is_null() {
1879 return Self::parse_crdt_value(a).map(Value::from);
1880 }
1881
1882 let a_parsed = Self::parse_crdt_value(a)?;
1883 let b_parsed = Self::parse_crdt_value(b)?;
1884
1885 let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1886 let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1887 crdt_a
1893 .merge_via_registry(&crdt_b, &self.plugin_registry)
1894 .map_err(|e| anyhow::anyhow!("{e}"))?;
1895 Ok(Value::from(serde_json::to_value(crdt_a)?))
1896 }
1897
1898 fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1901 if let Value::String(s) = val {
1902 serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1904 } else {
1905 Ok(serde_json::Value::from(val.clone()))
1907 }
1908 }
1909
1910 fn merge_prop_value(
1912 &self,
1913 val: Value,
1914 version: u64,
1915 data_type: &DataType,
1916 best_version: &mut Option<u64>,
1917 best_value: &mut Option<Value>,
1918 ) -> Result<()> {
1919 if let DataType::Crdt(_) = data_type {
1920 self.merge_crdt_prop_value(val, version, best_version, best_value)
1921 } else {
1922 if best_version.is_none_or(|best| version >= best) {
1924 *best_version = Some(version);
1925 *best_value = Some(val);
1926 }
1927 Ok(())
1928 }
1929 }
1930
1931 fn merge_crdt_prop_value(
1933 &self,
1934 val: Value,
1935 version: u64,
1936 best_version: &mut Option<u64>,
1937 best_value: &mut Option<Value>,
1938 ) -> Result<()> {
1939 if best_version.is_none_or(|best| version > best) {
1940 if let Some(existing) = best_value.take() {
1942 *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1943 } else {
1944 *best_value = Some(val);
1945 }
1946 *best_version = Some(version);
1947 } else if Some(version) == *best_version {
1948 let existing = best_value.get_or_insert(Value::Null);
1950 *existing = self.merge_crdt_values(existing, &val)?;
1951 } else {
1952 if let Some(existing) = best_value.as_mut() {
1954 *existing = self.merge_crdt_values(existing, &val)?;
1955 }
1956 }
1957 Ok(())
1958 }
1959}