1use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use lru::LruCache;
13use metrics;
14use std::collections::HashMap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tracing::{debug, instrument, warn};
19use uni_common::Properties;
20use uni_common::Value;
21use uni_common::core::id::{Eid, Vid};
22use uni_common::core::schema::{DataType, SchemaManager};
23use uni_crdt::Crdt;
24
/// Serves vertex and edge property reads, merging durable columnar storage
/// with in-memory L0 overlays and CRDT-typed values, with optional LRU caching.
pub struct PropertyManager {
    storage: Arc<StorageManager>,
    schema_manager: Arc<SchemaManager>,
    /// Per-(vertex, property-name) value cache; `None` when caching is disabled.
    vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
    /// Per-(edge, property-name) value cache; `None` when caching is disabled.
    edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
    /// Configured LRU capacity; 0 means both caches are disabled.
    cache_capacity: usize,
}
33
34impl PropertyManager {
35 pub fn new(
36 storage: Arc<StorageManager>,
37 schema_manager: Arc<SchemaManager>,
38 capacity: usize,
39 ) -> Self {
40 let (vertex_cache, edge_cache) = if capacity == 0 {
42 (None, None)
43 } else {
44 let cap = NonZeroUsize::new(capacity).unwrap();
45 (
46 Some(Mutex::new(LruCache::new(cap))),
47 Some(Mutex::new(LruCache::new(cap))),
48 )
49 };
50
51 Self {
52 storage,
53 schema_manager,
54 vertex_cache,
55 edge_cache,
56 cache_capacity: capacity,
57 }
58 }
59
    /// Returns the configured cache capacity (0 means caching is disabled).
    pub fn cache_size(&self) -> usize {
        self.cache_capacity
    }
63
    /// True when the property caches were created with a non-zero capacity.
    pub fn caching_enabled(&self) -> bool {
        self.cache_capacity > 0
    }
68
    /// Drops every cached vertex and edge property value.
    pub async fn clear_cache(&self) {
        if let Some(ref cache) = self.vertex_cache {
            cache.lock().await.clear();
        }
        if let Some(ref cache) = self.edge_cache {
            cache.lock().await.clear();
        }
    }
79
80 pub async fn invalidate_vertex(&self, _vid: Vid) {
82 if let Some(ref cache) = self.vertex_cache {
83 let mut cache = cache.lock().await;
84 cache.clear();
88 }
89 }
90
91 pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
93 if let Some(ref cache) = self.edge_cache {
94 let mut cache = cache.lock().await;
95 cache.clear();
97 }
98 }
99
    /// Reads a single edge property, consulting (in order): L0 tombstones,
    /// L0 property overlays, the LRU cache, and finally durable storage.
    ///
    /// Returns `Value::Null` when the edge is deleted or the property is absent.
    #[instrument(skip(self, ctx), level = "trace")]
    pub async fn get_edge_prop(
        &self,
        eid: uni_common::core::id::Eid,
        prop: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<Value> {
        // Deleted in an L0 overlay: report Null without touching cache/storage.
        if l0_visibility::is_edge_deleted(eid, ctx) {
            return Ok(Value::Null);
        }

        // L0 overlay value wins over both cache and storage.
        if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
            return Ok(val);
        }

        if let Some(ref cache) = self.edge_cache {
            let mut cache = cache.lock().await;
            if let Some(val) = cache.get(&(eid, prop.to_string())) {
                debug!(eid = ?eid, prop, "Cache HIT");
                metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
                return Ok(val.clone());
            } else {
                debug!(eid = ?eid, prop, "Cache MISS");
                metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
            }
        }

        // Cache miss: fetch the full property map once and extract the one value.
        let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
        let val = all
            .as_ref()
            .and_then(|props| props.get(prop).cloned())
            .unwrap_or(Value::Null);

        if let Some(ref cache) = self.edge_cache {
            let mut cache = cache.lock().await;
            if let Some(ref props) = all {
                // Warm the cache with every property we just fetched.
                for (prop_name, prop_val) in props {
                    cache.put((eid, prop_name.clone()), prop_val.clone());
                }
            } else {
                // Negative-cache the miss so repeated lookups stay cheap.
                cache.put((eid, prop.to_string()), Value::Null);
            }
        }

        Ok(val)
    }
152
    /// Returns the full property map of an edge, or `None` when the edge does
    /// not exist (or is deleted in L0).
    ///
    /// L0 overlay values take precedence over storage values (storage entries
    /// are only inserted for keys L0 did not provide).
    pub async fn get_all_edge_props_with_ctx(
        &self,
        eid: uni_common::core::id::Eid,
        ctx: Option<&QueryContext>,
    ) -> Result<Option<Properties>> {
        if l0_visibility::is_edge_deleted(eid, ctx) {
            return Ok(None);
        }

        let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();

        let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;

        if final_props.is_empty() && storage_props.is_none() {
            // No properties anywhere — distinguish "edge exists but is bare"
            // (known to L0) from "edge unknown".
            if l0_visibility::edge_exists_in_l0(eid, ctx) {
                return Ok(Some(Properties::new()));
            }
            return Ok(None);
        }

        if let Some(sp) = storage_props {
            // or_insert: L0 values win over storage values for the same key.
            for (k, v) in sp {
                final_props.entry(k).or_insert(v);
            }
        }

        Ok(Some(final_props))
    }
186
    /// Storage-only edge property fetch with no edge-type hint (scans all
    /// edge types declared in the schema).
    async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
        self.fetch_all_edge_props_from_storage_with_hint(eid, None)
            .await
    }
192
    /// Fetches edge properties from durable storage by replaying delta rows.
    ///
    /// For each candidate edge type (just `type_name_hint` when given, else every
    /// type in the schema) the forward delta table is scanned for rows matching
    /// `eid`, rows are replayed in `_version` order, and `op == 1` (delete)
    /// resets the accumulated state. CRDT-typed properties are merged across
    /// versions; plain properties use last-writer-wins. Falls back to the main
    /// edge dataset when no delta rows yield properties.
    async fn fetch_all_edge_props_from_storage_with_hint(
        &self,
        eid: Eid,
        type_name_hint: Option<&str>,
    ) -> Result<Option<Properties>> {
        let schema = self.schema_manager.schema();
        let backend = self.storage.backend();

        let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
            vec![hint]
        } else {
            schema.edge_types.keys().map(|s| s.as_str()).collect()
        };

        for type_name in type_names {
            let type_props = schema.properties.get(type_name);

            // Skip types whose delta dataset was never materialized.
            if self.storage.delta_dataset(type_name, "fwd").is_err() {
                continue;
            }

            use crate::backend::table_names;
            use crate::backend::types::ScanRequest;

            let table_name = table_names::delta_table_name(type_name, "fwd");
            if !backend.table_exists(&table_name).await.unwrap_or(false) {
                continue;
            }

            let base_filter = format!("eid = {}", eid.as_u64());
            let filter_expr = self.storage.apply_version_filter(base_filter);

            // Best-effort: a failing scan on one type must not abort the others.
            let batches = match backend
                .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
                .await
            {
                Ok(b) => b,
                Err(_) => continue,
            };

            // (version, op, decoded row properties) — collected so we can sort
            // by version before replaying.
            let mut rows: Vec<(u64, u8, Properties)> = Vec::new();

            for batch in batches {
                let op_col = match batch.column_by_name("op") {
                    Some(c) => c
                        .as_any()
                        .downcast_ref::<arrow_array::UInt8Array>()
                        .unwrap(),
                    None => continue,
                };
                let ver_col = match batch.column_by_name("_version") {
                    Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let ver = ver_col.value(row);
                    let op = op_col.value(row);
                    let mut props = Properties::new();

                    // op == 1 is a delete marker: no properties to decode.
                    if op != 1 {
                        if let Some(tp) = type_props {
                            for (p_name, p_meta) in tp {
                                if let Some(col) = batch.column_by_name(p_name)
                                    && !col.is_null(row)
                                {
                                    let val =
                                        Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
                                    props.insert(p_name.clone(), val);
                                }
                            }
                        }
                    }
                    rows.push((ver, op, props));
                }
            }

            if rows.is_empty() {
                continue;
            }

            // Replay strictly in version order.
            rows.sort_by_key(|(ver, _, _)| *ver);

            let mut merged_props: Properties = Properties::new();
            let mut is_deleted = false;

            for (_, op, props) in rows {
                if op == 1 {
                    // Delete wipes all previously accumulated state; a later
                    // re-insert clears the deleted flag again.
                    is_deleted = true;
                    merged_props.clear();
                } else {
                    is_deleted = false;
                    for (p_name, p_val) in props {
                        let is_crdt = type_props
                            .and_then(|tp| tp.get(&p_name))
                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                            .unwrap_or(false);

                        if is_crdt {
                            if let Some(existing) = merged_props.get(&p_name) {
                                // NOTE(review): a failed CRDT merge silently keeps
                                // the existing value — presumably intentional
                                // best-effort behavior; confirm.
                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
                                    merged_props.insert(p_name, merged);
                                }
                            } else {
                                merged_props.insert(p_name, p_val);
                            }
                        } else {
                            merged_props.insert(p_name, p_val);
                        }
                    }
                }
            }

            if is_deleted {
                return Ok(None);
            }

            if !merged_props.is_empty() {
                return Ok(Some(merged_props));
            }
        }

        // Fall back to the compacted main edge dataset.
        use crate::storage::main_edge::MainEdgeDataset;
        if let Some(props) = MainEdgeDataset::find_props_by_eid(self.storage.backend(), eid).await?
        {
            return Ok(Some(props));
        }

        Ok(None)
    }
339
    /// Batch-loads the requested `properties` for many vertices at once.
    ///
    /// Uses the label index to limit which label tables are scanned (falling
    /// back to all labels when some vid is not in the index), then overlays
    /// pending-flush L0 buffers, the active L0, and — when reading at HEAD —
    /// the transaction-local L0.
    pub async fn get_batch_vertex_props(
        &self,
        vids: &[Vid],
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let schema = self.schema_manager.schema();
        let mut result = HashMap::new();
        if vids.is_empty() {
            return Ok(result);
        }

        // Resolve the set of label tables to scan: only the labels of the
        // requested vids when the index knows them all, otherwise every label.
        let labels_to_scan: Vec<String> = {
            let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
            let mut all_resolved = true;
            for &vid in vids {
                if let Some(labels) = self.storage.get_labels_from_index(vid) {
                    needed.extend(labels);
                } else {
                    all_resolved = false;
                    break;
                }
            }
            if all_resolved {
                needed.into_iter().collect()
            } else {
                schema.labels.keys().cloned().collect()
            }
        };

        for label_name in &labels_to_scan {
            let label_schema_props = schema.properties.get(label_name);
            // Only request columns this label actually declares.
            let valid_props: Vec<&str> = properties
                .iter()
                .cloned()
                .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
                .collect();
            let ds = self.storage.vertex_dataset(label_name)?;
            let backend = self.storage.backend();
            let vtable_name = ds.table_name();

            if !backend.table_exists(&vtable_name).await.unwrap_or(false) {
                continue;
            }

            let vid_list = vids
                .iter()
                .map(|v| v.as_u64().to_string())
                .collect::<Vec<_>>()
                .join(",");
            let base_filter = format!("_vid IN ({})", vid_list);

            let final_filter = self.storage.apply_version_filter(base_filter);

            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
            columns.push("_vid".to_string());
            columns.push("_version".to_string());
            columns.push("_deleted".to_string());
            columns.extend(valid_props.iter().map(|s| s.to_string()));
            columns.push("overflow_json".to_string());

            use crate::backend::types::ScanRequest;
            let request = ScanRequest::all(&vtable_name)
                .with_filter(final_filter)
                .with_columns(columns);

            // A failing scan of one label table degrades gracefully.
            let batches: Vec<RecordBatch> = match backend.scan(request).await {
                Ok(b) => b,
                Err(e) => {
                    warn!(
                        label = %label_name,
                        error = %e,
                        "failed to scan label table, skipping"
                    );
                    continue;
                }
            };
            for batch in batches {
                let vid_col = match batch
                    .column_by_name("_vid")
                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let del_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|col| col.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let vid = Vid::from(vid_col.value(row));

                    // Deleted row removes anything accumulated for this vid.
                    if del_col.value(row) {
                        result.remove(&vid);
                        continue;
                    }

                    let label_props = schema.properties.get(label_name);
                    let mut props =
                        Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
                    result.insert(vid, props);
                }
            }
        }

        // Overlay in-memory state, oldest to newest, so newer layers win.
        if let Some(ctx) = ctx {
            for pending_l0_arc in &ctx.pending_flush_l0s {
                let pending_l0 = pending_l0_arc.read();
                self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
            }

            let l0 = ctx.l0.read();
            self.overlay_l0_batch(vids, &l0, properties, &mut result);

            // Transaction-local writes are only visible at HEAD (no version pin).
            if self.storage.version_high_water_mark().is_none()
                && let Some(tx_l0_arc) = &ctx.transaction_l0
            {
                let tx_l0 = tx_l0_arc.read();
                self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
            }
        }

        Ok(result)
    }
487
    /// Overlays one L0 buffer's vertex state onto `result`.
    ///
    /// Tombstoned vertices are removed; property writes newer than the version
    /// high-water mark (when one is set) are skipped; CRDT-typed properties
    /// are merged into the existing value, plain ones overwrite it.
    ///
    /// NOTE(review): unlike `overlay_l0_edge_batch`, this does not honor an
    /// `"_all_props"` sentinel in `properties` — confirm whether that is
    /// intentional.
    fn overlay_l0_batch(
        &self,
        vids: &[Vid],
        l0: &L0Buffer,
        properties: &[&str],
        result: &mut HashMap<Vid, Properties>,
    ) {
        let schema = self.schema_manager.schema();
        for &vid in vids {
            if l0.vertex_tombstones.contains(&vid) {
                result.remove(&vid);
                continue;
            }
            if let Some(l0_props) = l0.vertex_properties.get(&vid) {
                // Respect a pinned read version: skip entries written after it.
                let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if self
                    .storage
                    .version_high_water_mark()
                    .is_some_and(|hwm| entry_version > hwm)
                {
                    continue;
                }

                let entry = result.entry(vid).or_default();
                let labels = l0.get_vertex_labels(vid);

                for (k, v) in l0_props {
                    if properties.contains(&k.as_str()) {
                        // A property is CRDT-typed if ANY of the vertex's labels
                        // declares it as such.
                        let is_crdt = labels
                            .and_then(|label_list| {
                                label_list.iter().find_map(|ln| {
                                    schema
                                        .properties
                                        .get(ln)
                                        .and_then(|lp| lp.get(k))
                                        .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                                })
                            })
                            .is_some();

                        if is_crdt {
                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
                            // Best-effort merge: on failure the L0 value wins outright.
                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
                        } else {
                            entry.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }
    }
544
    /// Batch-loads the requested `properties` for many edges at once.
    ///
    /// Mirrors `get_batch_vertex_props` for edges: scans forward delta tables
    /// per edge type, then overlays L0 buffers.
    ///
    /// NOTE(review): the result map is keyed by `Vid::from(eid.as_u64())` —
    /// the edge id reinterpreted as a `Vid` — apparently to share downstream
    /// plumbing with the vertex path; confirm callers rely on this.
    pub async fn get_batch_edge_props(
        &self,
        eids: &[uni_common::core::id::Eid],
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let schema = self.schema_manager.schema();
        let mut result = HashMap::new();
        if eids.is_empty() {
            return Ok(result);
        }

        // Restrict the scan to edge types L0 knows about, when it knows them all.
        let types_to_scan: Vec<String> = {
            if let Some(ctx) = ctx {
                let mut needed: std::collections::HashSet<String> =
                    std::collections::HashSet::new();
                let mut all_resolved = true;
                for &eid in eids {
                    if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
                        needed.insert(etype.to_string());
                    } else {
                        all_resolved = false;
                        break;
                    }
                }
                if all_resolved {
                    needed.into_iter().collect()
                } else {
                    schema.edge_types.keys().cloned().collect()
                }
            } else {
                schema.edge_types.keys().cloned().collect()
            }
        };

        for type_name in &types_to_scan {
            let type_props = schema.properties.get(type_name);
            // Only request columns this edge type actually declares.
            let valid_props: Vec<&str> = properties
                .iter()
                .cloned()
                .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
                .collect();
            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
                Ok(ds) => ds,
                Err(_) => continue,
            };
            let backend = self.storage.backend();
            let dtable_name = delta_ds.table_name();

            if !backend.table_exists(&dtable_name).await.unwrap_or(false) {
                continue;
            }

            let eid_list = eids
                .iter()
                .map(|e| e.as_u64().to_string())
                .collect::<Vec<_>>()
                .join(",");
            let base_filter = format!("eid IN ({})", eid_list);

            let final_filter = self.storage.apply_version_filter(base_filter);

            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
            columns.push("eid".to_string());
            columns.push("_version".to_string());
            columns.push("op".to_string());
            columns.extend(valid_props.iter().map(|s| s.to_string()));
            columns.push("overflow_json".to_string());

            use crate::backend::types::ScanRequest;
            let request = ScanRequest::all(&dtable_name)
                .with_filter(final_filter)
                .with_columns(columns);

            // Best-effort: a failing scan of one type degrades gracefully.
            let batches: Vec<RecordBatch> = match backend.scan(request).await {
                Ok(b) => b,
                Err(e) => {
                    warn!(
                        edge_type = %type_name,
                        error = %e,
                        "failed to scan edge delta table, skipping"
                    );
                    continue;
                }
            };
            for batch in batches {
                let eid_col = match batch
                    .column_by_name("eid")
                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let op_col = match batch
                    .column_by_name("op")
                    .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let eid = uni_common::core::id::Eid::from(eid_col.value(row));

                    // op == 1 marks a delete; drop anything accumulated so far.
                    if op_col.value(row) == 1 {
                        result.remove(&Vid::from(eid.as_u64()));
                        continue;
                    }

                    let mut props =
                        Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
                    result.insert(Vid::from(eid.as_u64()), props);
                }
            }
        }

        // Overlay in-memory state, oldest to newest, so newer layers win.
        if let Some(ctx) = ctx {
            for pending_l0_arc in &ctx.pending_flush_l0s {
                let pending_l0 = pending_l0_arc.read();
                self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
            }

            let l0 = ctx.l0.read();
            self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);

            // Transaction-local writes are only visible at HEAD (no version pin).
            if self.storage.version_high_water_mark().is_none()
                && let Some(tx_l0_arc) = &ctx.transaction_l0
            {
                let tx_l0 = tx_l0_arc.read();
                self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
            }
        }

        Ok(result)
    }
700
    /// Overlays one L0 buffer's edge state onto `result` (keyed by
    /// `Vid::from(eid.as_u64())`, matching `get_batch_edge_props`).
    ///
    /// Tombstoned edges are removed; writes newer than the version high-water
    /// mark (when one is set) are skipped. The `"_all_props"` sentinel in
    /// `properties` selects every L0 property. CRDT-typed properties merge
    /// into the existing value; plain ones overwrite it.
    fn overlay_l0_edge_batch(
        &self,
        eids: &[uni_common::core::id::Eid],
        l0: &L0Buffer,
        properties: &[&str],
        result: &mut HashMap<Vid, Properties>,
    ) {
        let schema = self.schema_manager.schema();
        for &eid in eids {
            let vid_key = Vid::from(eid.as_u64());
            if l0.tombstones.contains_key(&eid) {
                result.remove(&vid_key);
                continue;
            }
            if let Some(l0_props) = l0.edge_properties.get(&eid) {
                // Respect a pinned read version: skip entries written after it.
                let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
                if self
                    .storage
                    .version_high_water_mark()
                    .is_some_and(|hwm| entry_version > hwm)
                {
                    continue;
                }

                let entry = result.entry(vid_key).or_default();
                let type_name = l0.get_edge_type(eid);

                let include_all = properties.contains(&"_all_props");
                for (k, v) in l0_props {
                    if include_all || properties.contains(&k.as_str()) {
                        let is_crdt = type_name
                            .and_then(|tn| schema.properties.get(tn))
                            .and_then(|tp| tp.get(k))
                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                            .unwrap_or(false);

                        if is_crdt {
                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
                            // Best-effort merge: on failure the L0 value wins outright.
                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
                        } else {
                            entry.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }
    }
751
    /// Columnar (RecordBatch) property loading — not yet implemented.
    ///
    /// Currently performs the row-oriented batch fetch (result discarded) and
    /// always returns an error directing callers to the batch API. The fetch
    /// is kept so the call path is exercised end-to-end.
    pub async fn load_properties_columnar(
        &self,
        vids: &UInt64Array,
        properties: &[&str],
        ctx: Option<&QueryContext>,
    ) -> Result<RecordBatch> {
        let mut vid_vec = Vec::with_capacity(vids.len());
        for i in 0..vids.len() {
            vid_vec.push(Vid::from(vids.value(i)));
        }

        let _props_map = self
            .get_batch_vertex_props(&vid_vec, properties, ctx)
            .await?;

        Err(anyhow!(
            "Columnar property load not fully implemented yet - use batch load"
        ))
    }
821
    /// Resolves the label set of each requested vertex.
    ///
    /// Lookup order: L0 buffers (pending flushes, active, transaction-local),
    /// then the in-memory label index, then the main vertex dataset for any
    /// vids still unresolved. The final lists are sorted and de-duplicated.
    pub async fn get_batch_labels(
        &self,
        vids: &[Vid],
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Vec<String>>> {
        let mut result = HashMap::new();
        if vids.is_empty() {
            return Ok(result);
        }

        if let Some(ctx) = ctx {
            let mut collect_labels = |l0: &L0Buffer| {
                for &vid in vids {
                    if let Some(labels) = l0.get_vertex_labels(vid) {
                        result
                            .entry(vid)
                            .or_default()
                            .extend(labels.iter().cloned());
                    }
                }
            };

            for l0_arc in &ctx.pending_flush_l0s {
                collect_labels(&l0_arc.read());
            }
            collect_labels(&ctx.l0.read());
            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
                collect_labels(&tx_l0_arc.read());
            }
        }

        let mut vids_needing_lancedb = Vec::new();

        // Append labels while preserving order and avoiding duplicates.
        fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
            for l in new_labels {
                if !existing.contains(&l) {
                    existing.push(l);
                }
            }
        }

        for &vid in vids {
            // Vids already resolved from L0 skip the index/storage path entirely.
            if result.contains_key(&vid) {
                continue;
            }

            if let Some(labels) = self.storage.get_labels_from_index(vid) {
                merge_labels(result.entry(vid).or_default(), labels);
            } else {
                vids_needing_lancedb.push(vid);
            }
        }

        if !vids_needing_lancedb.is_empty() {
            let backend = self.storage.backend();
            let version = self.storage.version_high_water_mark();
            let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
                backend,
                &vids_needing_lancedb,
                version,
            )
            .await?;

            for (vid, labels) in storage_labels {
                merge_labels(result.entry(vid).or_default(), labels);
            }
        }

        // Canonical form: sorted, unique label lists.
        for labels in result.values_mut() {
            labels.sort();
            labels.dedup();
        }

        Ok(result)
    }
903
904 pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
905 Ok(self
906 .get_all_vertex_props_with_ctx(vid, None)
907 .await?
908 .unwrap_or_default())
909 }
910
    /// Returns the full property map of a vertex, or `None` when the vertex
    /// does not exist (or is deleted in L0).
    ///
    /// L0 overlay values win over storage values; when a context is present,
    /// CRDT-typed properties are normalized per label before returning.
    pub async fn get_all_vertex_props_with_ctx(
        &self,
        vid: Vid,
        ctx: Option<&QueryContext>,
    ) -> Result<Option<Properties>> {
        if l0_visibility::is_vertex_deleted(vid, ctx) {
            return Ok(None);
        }

        let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);

        let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;

        // Neither layer knows this vertex.
        if l0_props.is_none() && storage_props_opt.is_none() {
            return Ok(None);
        }

        let mut final_props = l0_props.unwrap_or_default();

        if let Some(storage_props) = storage_props_opt {
            // or_insert: L0 values win over storage values for the same key.
            for (k, v) in storage_props {
                final_props.entry(k).or_insert(v);
            }
        }

        if let Some(ctx) = ctx {
            // Materialize CRDT values into their user-facing form, per label.
            let labels = l0_visibility::get_vertex_labels(vid, ctx);
            for label in &labels {
                self.normalize_crdt_properties(&mut final_props, label)?;
            }
        }

        Ok(Some(final_props))
    }
953
    /// Batch-loads ALL schema properties of `label` for the given vertices.
    ///
    /// Vertices fully answerable from L0 skip storage entirely; the rest are
    /// fetched from the label's vertex table with per-vid version resolution
    /// (highest `_version` wins, equal versions merge, CRDTs merge across
    /// versions). Storage values never override L0 values for the same key.
    pub async fn get_batch_vertex_props_for_label(
        &self,
        vids: &[Vid],
        label: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<HashMap<Vid, Properties>> {
        let mut result: HashMap<Vid, Properties> = HashMap::new();
        let mut need_storage: Vec<Vid> = Vec::new();

        for &vid in vids {
            if l0_visibility::is_vertex_deleted(vid, ctx) {
                continue;
            }
            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
            if let Some(props) = l0_props {
                result.insert(vid, props);
            } else {
                need_storage.push(vid);
            }
        }

        // Everything resolved from L0: normalize CRDTs and return early.
        if need_storage.is_empty() {
            if ctx.is_some() {
                for props in result.values_mut() {
                    self.normalize_crdt_properties(props, label)?;
                }
            }
            return Ok(result);
        }

        let schema = self.schema_manager.schema();
        let label_props = schema.properties.get(label);

        let mut prop_names: Vec<String> = Vec::new();
        if let Some(props) = label_props {
            prop_names = props.keys().cloned().collect();
        }

        let mut columns: Vec<String> = vec![
            "_vid".to_string(),
            "_deleted".to_string(),
            "_version".to_string(),
        ];
        columns.extend(prop_names.iter().cloned());
        columns.push("overflow_json".to_string());

        let vid_list: String = need_storage
            .iter()
            .map(|v| v.as_u64().to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let base_filter = format!("_vid IN ({})", vid_list);

        let filter_expr = self.storage.apply_version_filter(base_filter);

        let table_name = crate::backend::table_names::vertex_table_name(label);
        let batches: Vec<RecordBatch> = self
            .storage
            .backend()
            .scan(
                crate::backend::types::ScanRequest::all(&table_name)
                    .with_filter(&filter_expr)
                    .with_columns(columns.clone()),
            )
            .await?;

        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

        // Per-vid version resolution state: the best version seen so far and
        // the properties accumulated at/for that version.
        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();

        for batch in batches {
            let vid_col = match batch
                .column_by_name("_vid")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };
            let deleted_col = match batch
                .column_by_name("_deleted")
                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
            {
                Some(c) => c,
                None => continue,
            };
            let version_col = match batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            {
                Some(c) => c,
                None => continue,
            };

            for row in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row));
                let version = version_col.value(row);

                if deleted_col.value(row) {
                    // A delete at the highest (or tied) version wipes the vid's state.
                    if per_vid_best_version
                        .get(&vid)
                        .is_none_or(|&best| version >= best)
                    {
                        per_vid_best_version.insert(vid, version);
                        per_vid_props.remove(&vid);
                    }
                    continue;
                }

                let mut current_props =
                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                // Overflow (non-columnar) properties fill gaps only.
                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                    for (k, v) in overflow_props {
                        current_props.entry(k).or_insert(v);
                    }
                }

                // Fold this row into the per-vid versioned merge state.
                let best = per_vid_best_version.get(&vid).copied();
                let mut best_opt = best;
                let mut merged = per_vid_props.remove(&vid);
                self.merge_versioned_props(
                    current_props,
                    version,
                    &mut best_opt,
                    &mut merged,
                    label_props,
                )?;
                if let Some(v) = best_opt {
                    per_vid_best_version.insert(vid, v);
                }
                if let Some(p) = merged {
                    per_vid_props.insert(vid, p);
                }
            }
        }

        // Storage results fill gaps only; L0 values already in `result` win.
        for (vid, storage_props) in per_vid_props {
            let entry = result.entry(vid).or_default();
            for (k, v) in storage_props {
                entry.entry(k).or_insert(v);
            }
        }

        if ctx.is_some() {
            for props in result.values_mut() {
                self.normalize_crdt_properties(props, label)?;
            }
        }

        Ok(result)
    }
1122
1123 fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1127 let schema = self.schema_manager.schema();
1128 let label_props = match schema.properties.get(label) {
1129 Some(p) => p,
1130 None => return Ok(()),
1131 };
1132
1133 for (prop_name, prop_meta) in label_props {
1134 if let DataType::Crdt(_) = prop_meta.r#type
1135 && let Some(val) = props.get_mut(prop_name)
1136 {
1137 *val = Value::from(Self::parse_crdt_value(val)?);
1138 }
1139 }
1140
1141 Ok(())
1142 }
1143
1144 fn extract_row_properties(
1146 batch: &RecordBatch,
1147 row: usize,
1148 prop_names: &[&str],
1149 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1150 ) -> Result<Properties> {
1151 let mut props = Properties::new();
1152 for name in prop_names {
1153 let col = match batch.column_by_name(name) {
1154 Some(col) => col,
1155 None => continue,
1156 };
1157 if col.is_null(row) {
1158 continue;
1159 }
1160 if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1161 let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1162 props.insert((*name).to_string(), val);
1163 }
1164 }
1165 Ok(props)
1166 }
1167
    /// Decodes the row's `overflow_json` column (JSONB-encoded CypherValue)
    /// into a `Properties` map; `Ok(None)` when the column is absent or NULL.
    fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
        use arrow_array::LargeBinaryArray;

        let overflow_col = match batch.column_by_name("overflow_json") {
            Some(col) => col,
            None => return Ok(None),
        };

        if overflow_col.is_null(row) {
            return Ok(None);
        }

        let binary_array = overflow_col
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;

        let jsonb_bytes = binary_array.value(row);

        // Bytes -> CypherValue -> serde_json::Value -> Properties.
        let uni_val = uni_common::cypher_value_codec::decode(jsonb_bytes)
            .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
        let json_val: serde_json::Value = uni_val.into();

        let overflow_props: Properties = serde_json::from_value(json_val)
            .map_err(|e| anyhow!("Failed to parse overflow properties: {}", e))?;

        Ok(Some(overflow_props))
    }
1202
    /// Merges a row's overflow properties into `props`, restricted to the
    /// requested `properties`; existing entries are never overwritten.
    ///
    /// When `"overflow_json"` itself is requested, the raw JSONB bytes are
    /// exposed as a `Value::List` of byte-valued `Value::Int`s — presumably so
    /// callers can round-trip the encoded blob; confirm against callers.
    fn merge_overflow_into_props(
        batch: &RecordBatch,
        row: usize,
        properties: &[&str],
        props: &mut Properties,
    ) -> Result<()> {
        use arrow_array::LargeBinaryArray;

        let overflow_col = match batch.column_by_name("overflow_json") {
            Some(col) if !col.is_null(row) => col,
            _ => return Ok(()),
        };

        if properties.contains(&"overflow_json")
            && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
        {
            let jsonb_bytes = binary_array.value(row);
            let bytes_list: Vec<Value> =
                jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
            props.insert("overflow_json".to_string(), Value::List(bytes_list));
        }

        if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
            for (k, v) in overflow_props {
                // Only requested keys, and only as gap-fillers (or_insert).
                if properties.contains(&k.as_str()) {
                    props.entry(k).or_insert(v);
                }
            }
        }

        Ok(())
    }
1243
1244 fn merge_crdt_into(
1246 &self,
1247 target: &mut Properties,
1248 source: Properties,
1249 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1250 crdt_only: bool,
1251 ) -> Result<()> {
1252 for (k, v) in source {
1253 if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1254 if let DataType::Crdt(_) = prop_meta.r#type {
1255 let existing_v = target.entry(k).or_insert(Value::Null);
1256 *existing_v = self.merge_crdt_values(existing_v, &v)?;
1257 } else if !crdt_only {
1258 target.insert(k, v);
1259 }
1260 }
1261 }
1262 Ok(())
1263 }
1264
    /// Folds one versioned row into the running (best_version, best_props) state.
    ///
    /// Three cases:
    /// - `version` strictly newer: the new row wins, but CRDT-typed values are
    ///   merged with the previous best rather than replaced.
    /// - `version` equal to the best: full merge (CRDTs value-merged, plain
    ///   props overwritten).
    /// - `version` older: only CRDT-typed values are folded in; plain props
    ///   from the stale row are ignored.
    fn merge_versioned_props(
        &self,
        current_props: Properties,
        version: u64,
        best_version: &mut Option<u64>,
        best_props: &mut Option<Properties>,
        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
    ) -> Result<()> {
        if best_version.is_none_or(|best| version > best) {
            if let Some(mut existing_props) = best_props.take() {
                let mut merged = current_props;
                for (k, v) in merged.iter_mut() {
                    // Only CRDT values carry history across versions.
                    if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
                        && let DataType::Crdt(_) = prop_meta.r#type
                        && let Some(existing_val) = existing_props.remove(k)
                    {
                        *v = self.merge_crdt_values(v, &existing_val)?;
                    }
                }
                *best_props = Some(merged);
            } else {
                *best_props = Some(current_props);
            }
            *best_version = Some(version);
        } else if Some(version) == *best_version {
            if let Some(existing_props) = best_props.as_mut() {
                // Same version: merge fully (crdt_only = false).
                self.merge_crdt_into(existing_props, current_props, label_props, false)?;
            } else {
                *best_props = Some(current_props);
            }
        } else {
            // Older version: fold only CRDT values (crdt_only = true);
            // with no accumulated state there is nothing to merge into.
            if let Some(existing_props) = best_props.as_mut() {
                self.merge_crdt_into(existing_props, current_props, label_props, true)?;
            }
        }
        Ok(())
    }
1307
    /// Fetches a vertex's properties from durable storage.
    ///
    /// Scans the vertex table of each candidate label (from the label index
    /// when available, otherwise all labels), resolving versions across ALL
    /// labels with a single global best-version: newest row wins, deletes at
    /// the newest version erase state, CRDTs merge across versions. Falls back
    /// to the compacted main vertex dataset when no label-table rows matched.
    async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
        let schema = self.schema_manager.schema();
        let mut merged_props: Option<Properties> = None;
        let mut global_best_version: Option<u64> = None;

        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect()
        };

        for label_name in &label_names {
            let label_props = schema.properties.get(label_name);

            let mut prop_names: Vec<String> = Vec::new();
            if let Some(props) = label_props {
                prop_names = props.keys().cloned().collect();
            }

            let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
            columns.extend(prop_names.iter().cloned());
            columns.push("overflow_json".to_string());

            let base_filter = format!("_vid = {}", vid.as_u64());

            let filter_expr = self.storage.apply_version_filter(base_filter);

            let table_name = crate::backend::table_names::vertex_table_name(label_name);
            // Best-effort: a missing/failing label table must not abort the rest.
            let batches: Vec<RecordBatch> = match self
                .storage
                .backend()
                .scan(
                    crate::backend::types::ScanRequest::all(&table_name)
                        .with_filter(&filter_expr)
                        .with_columns(columns.clone()),
                )
                .await
            {
                Ok(b) => b,
                Err(_) => continue,
            };

            let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();

            for batch in batches {
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };

                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    if deleted_col.value(row) {
                        // A delete at the newest (or tied) version erases state.
                        if global_best_version.is_none_or(|best| version >= best) {
                            global_best_version = Some(version);
                            merged_props = None;
                        }
                        continue;
                    }

                    let mut current_props =
                        Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;

                    // Overflow (non-columnar) properties fill gaps only.
                    if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
                        for (k, v) in overflow_props {
                            current_props.entry(k).or_insert(v);
                        }
                    }

                    self.merge_versioned_props(
                        current_props,
                        version,
                        &mut global_best_version,
                        &mut merged_props,
                        label_props,
                    )?;
                }
            }
        }

        // Nothing found in label tables: consult the compacted main dataset.
        if merged_props.is_none()
            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
                self.storage.backend(),
                vid,
                self.storage.version_high_water_mark(),
            )
            .await?
        {
            return Ok(Some(main_props));
        }

        Ok(merged_props)
    }
1424
1425 pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
1426 self.get_vertex_prop_with_ctx(vid, prop, None).await
1427 }
1428
    /// Fetches a single vertex property, honoring L0-buffer visibility.
    ///
    /// Resolution order:
    /// 1. If the vertex is deleted in a visible L0 buffer, return `Null`.
    /// 2. CRDT-typed properties merge pending L0 writes with the persisted
    ///    (cache/storage) value instead of short-circuiting.
    /// 3. Non-CRDT: a value found in an L0 buffer wins outright.
    /// 4. Otherwise the LRU cache, then storage (result is then cached).
    #[instrument(skip(self, ctx), level = "trace")]
    pub async fn get_vertex_prop_with_ctx(
        &self,
        vid: Vid,
        prop: &str,
        ctx: Option<&QueryContext>,
    ) -> Result<Value> {
        if l0_visibility::is_vertex_deleted(vid, ctx) {
            return Ok(Value::Null);
        }

        let schema = self.schema_manager.schema();
        // Labels known via the query context; empty when no ctx is given or
        // the context does not track this vertex's labels.
        let labels = ctx
            .map(|c| l0_visibility::get_vertex_labels(vid, c))
            .unwrap_or_default();

        // Decide whether `prop` is CRDT-typed: check only the vertex's own
        // labels when known, otherwise conservatively check every label.
        let is_crdt = if !labels.is_empty() {
            labels.iter().any(|ln| {
                schema
                    .properties
                    .get(ln)
                    .and_then(|lp| lp.get(prop))
                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                    .unwrap_or(false)
            })
        } else {
            schema.properties.values().any(|label_props| {
                label_props
                    .get(prop)
                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
                    .unwrap_or(false)
            })
        };

        if is_crdt {
            // CRDTs must combine L0 + persisted state rather than pick one.
            let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
            return self.finalize_crdt_lookup(vid, prop, l0_val).await;
        }

        // Non-CRDT: an uncommitted L0 write shadows anything persisted.
        if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
            return Ok(val);
        }

        if let Some(ref cache) = self.vertex_cache {
            let mut cache = cache.lock().await;
            if let Some(val) = cache.get(&(vid, prop.to_string())) {
                debug!(vid = ?vid, prop, "Cache HIT");
                metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
                return Ok(val.clone());
            } else {
                debug!(vid = ?vid, prop, "Cache MISS");
                metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
                    .increment(1);
            }
        }

        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;

        // Populate the cache so subsequent reads skip storage.
        if let Some(ref cache) = self.vertex_cache {
            let mut cache = cache.lock().await;
            cache.put((vid, prop.to_string()), storage_val.clone());
        }

        Ok(storage_val)
    }
1505
1506 fn accumulate_crdt_from_l0(
1508 &self,
1509 vid: Vid,
1510 prop: &str,
1511 ctx: Option<&QueryContext>,
1512 ) -> Result<Value> {
1513 let mut merged = Value::Null;
1514 l0_visibility::visit_l0_buffers(ctx, |l0| {
1515 if let Some(props) = l0.vertex_properties.get(&vid)
1516 && let Some(val) = props.get(prop)
1517 {
1518 if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1520 merged = new_merged;
1521 }
1522 }
1523 false });
1525 Ok(merged)
1526 }
1527
1528 async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1530 let cached_val = if let Some(ref cache) = self.vertex_cache {
1532 let mut cache = cache.lock().await;
1533 cache.get(&(vid, prop.to_string())).cloned()
1534 } else {
1535 None
1536 };
1537
1538 if let Some(val) = cached_val {
1539 let merged = self.merge_crdt_values(&val, &l0_val)?;
1540 return Ok(merged);
1541 }
1542
1543 let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1545
1546 if let Some(ref cache) = self.vertex_cache {
1548 let mut cache = cache.lock().await;
1549 cache.put((vid, prop.to_string()), storage_val.clone());
1550 }
1551
1552 self.merge_crdt_values(&storage_val, &l0_val)
1554 }
1555
    /// Resolves a single property `prop` for `vid` by scanning every label
    /// table the vertex may live in and keeping the best-versioned value.
    ///
    /// Per-row rules:
    /// - A `_deleted` row at or above the highest version seen clears the value.
    /// - The typed column is preferred; `overflow_json` fills in properties
    ///   not declared in the schema for that label.
    /// - Schema-typed values go through `merge_prop_value` (last-writer-wins
    ///   or CRDT merge); untyped overflow values use LWW directly.
    ///
    /// Returns `Value::Null` when no live value exists.
    async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
        let schema = self.schema_manager.schema();
        let mut best_version: Option<u64> = None;
        let mut best_value: Option<Value> = None;

        // Prefer the label index; otherwise scan every schema-known label.
        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect() };

        for label_name in &label_names {
            // Schema metadata for `prop` under this label, if declared.
            let prop_meta = schema
                .properties
                .get(label_name)
                .and_then(|props| props.get(prop));

            let base_filter = format!("_vid = {}", vid.as_u64());

            let filter_expr = self.storage.apply_version_filter(base_filter);

            let mut columns = vec![
                "_deleted".to_string(),
                "_version".to_string(),
                "overflow_json".to_string(),
            ];

            // Only request the typed column when the schema declares it;
            // otherwise the value can only come from overflow_json.
            if prop_meta.is_some() {
                columns.push(prop.to_string());
            }

            let table_name = crate::backend::table_names::vertex_table_name(label_name);
            let batches: Vec<RecordBatch> = match self
                .storage
                .backend()
                .scan(
                    crate::backend::types::ScanRequest::all(&table_name)
                        .with_filter(&filter_expr)
                        .with_columns(columns),
                )
                .await
            {
                Ok(b) => b,
                // Missing/unscannable label table: skip rather than fail.
                Err(_) => continue,
            };

            for batch in batches {
                // Batches lacking the bookkeeping columns cannot be ordered
                // by version, so they are skipped.
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    if deleted_col.value(row) {
                        // Tombstone at or above the best version clears the value.
                        if best_version.is_none_or(|best| version >= best) {
                            best_version = Some(version);
                            best_value = None;
                        }
                        continue;
                    }

                    let mut val = None;
                    if let Some(meta) = prop_meta
                        && let Some(col) = batch.column_by_name(prop)
                    {
                        val = Some(if col.is_null(row) {
                            Value::Null
                        } else {
                            Self::value_from_column(col, &meta.r#type, row)?
                        });
                    }

                    // Fall back to the JSON overflow blob for undeclared props.
                    if val.is_none()
                        && let Some(overflow_props) =
                            Self::extract_overflow_properties(&batch, row)?
                        && let Some(overflow_val) = overflow_props.get(prop)
                    {
                        val = Some(overflow_val.clone());
                    }

                    if let Some(v) = val {
                        if let Some(meta) = prop_meta {
                            // Typed property: LWW or CRDT merge by data type.
                            self.merge_prop_value(
                                v,
                                version,
                                &meta.r#type,
                                &mut best_version,
                                &mut best_value,
                            )?;
                        } else {
                            // Untyped overflow value: plain last-writer-wins,
                            // ties going to the incoming value.
                            if best_version.is_none_or(|best| version >= best) {
                                best_version = Some(version);
                                best_value = Some(v);
                            }
                        }
                    }
                }
            }
        }
        Ok(best_value.unwrap_or(Value::Null))
    }
1683
1684 pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1686 crate::storage::value_codec::decode_column_value(
1687 col,
1688 data_type,
1689 row,
1690 CrdtDecodeMode::Strict,
1691 )
1692 }
1693
1694 pub(crate) fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1695 if a.is_null() {
1699 return Self::parse_crdt_value(b).map(Value::from);
1700 }
1701 if b.is_null() {
1702 return Self::parse_crdt_value(a).map(Value::from);
1703 }
1704
1705 let a_parsed = Self::parse_crdt_value(a)?;
1706 let b_parsed = Self::parse_crdt_value(b)?;
1707
1708 let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1709 let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1710 crdt_a
1711 .try_merge(&crdt_b)
1712 .map_err(|e| anyhow::anyhow!("{e}"))?;
1713 Ok(Value::from(serde_json::to_value(crdt_a)?))
1714 }
1715
1716 fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1719 if let Value::String(s) = val {
1720 serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1722 } else {
1723 Ok(serde_json::Value::from(val.clone()))
1725 }
1726 }
1727
1728 fn merge_prop_value(
1730 &self,
1731 val: Value,
1732 version: u64,
1733 data_type: &DataType,
1734 best_version: &mut Option<u64>,
1735 best_value: &mut Option<Value>,
1736 ) -> Result<()> {
1737 if let DataType::Crdt(_) = data_type {
1738 self.merge_crdt_prop_value(val, version, best_version, best_value)
1739 } else {
1740 if best_version.is_none_or(|best| version >= best) {
1742 *best_version = Some(version);
1743 *best_value = Some(val);
1744 }
1745 Ok(())
1746 }
1747 }
1748
1749 fn merge_crdt_prop_value(
1751 &self,
1752 val: Value,
1753 version: u64,
1754 best_version: &mut Option<u64>,
1755 best_value: &mut Option<Value>,
1756 ) -> Result<()> {
1757 if best_version.is_none_or(|best| version > best) {
1758 if let Some(existing) = best_value.take() {
1760 *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1761 } else {
1762 *best_value = Some(val);
1763 }
1764 *best_version = Some(version);
1765 } else if Some(version) == *best_version {
1766 let existing = best_value.get_or_insert(Value::Null);
1768 *existing = self.merge_crdt_values(existing, &val)?;
1769 } else {
1770 if let Some(existing) = best_value.as_mut() {
1772 *existing = self.merge_crdt_values(existing, &val)?;
1773 }
1774 }
1775 Ok(())
1776 }
1777}