Skip to main content

uni_store/runtime/
property_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use lru::LruCache;
13use metrics;
14use std::collections::HashMap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tracing::{debug, instrument, warn};
19use uni_common::Properties;
20use uni_common::Value;
21use uni_common::core::id::{Eid, Vid};
22use uni_common::core::schema::{DataType, SchemaManager};
23use uni_crdt::Crdt;
24
25pub struct PropertyManager {
26    storage: Arc<StorageManager>,
27    schema_manager: Arc<SchemaManager>,
28    /// Plugin registry consulted by CRDT merges via
29    /// [`uni_crdt::Crdt::merge_via_registry`]. The legacy 3-arg
30    /// [`Self::new`] passes an empty registry so callers that don't
31    /// wire plugin dispatch keep getting bit-identical native
32    /// behavior (empty registry → `merge_via_registry`'s native
33    /// fallback). Production paths in `UniInner` use
34    /// [`Self::with_plugin_registry`] to share the host's registry.
35    plugin_registry: Arc<uni_plugin::PluginRegistry>,
36    /// Cache is None when capacity=0 (caching disabled)
37    vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
38    edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
39    cache_capacity: usize,
40}
41
42impl PropertyManager {
43    /// Construct a `PropertyManager` with an empty plugin registry.
44    ///
45    /// Back-compat shim for the ~17 algorithm and test call sites that
46    /// don't need registry-dispatched CRDT merges. Equivalent to
47    /// [`Self::with_plugin_registry`] with `Arc::new(PluginRegistry::new())`.
48    pub fn new(
49        storage: Arc<StorageManager>,
50        schema_manager: Arc<SchemaManager>,
51        capacity: usize,
52    ) -> Self {
53        Self::with_plugin_registry(
54            storage,
55            schema_manager,
56            capacity,
57            Arc::new(uni_plugin::PluginRegistry::new()),
58        )
59    }
60
61    /// Construct a `PropertyManager` wired to a shared `PluginRegistry`.
62    ///
63    /// CRDT merges in this `PropertyManager` consult `plugin_registry`
64    /// for `CrdtKindProvider`s matching each `Crdt::kind()`; matched
65    /// kinds dispatch through the provider (so hot-reloaded plugins
66    /// take effect immediately), unmatched kinds fall back to the
67    /// native `Crdt::try_merge`.
68    pub fn with_plugin_registry(
69        storage: Arc<StorageManager>,
70        schema_manager: Arc<SchemaManager>,
71        capacity: usize,
72        plugin_registry: Arc<uni_plugin::PluginRegistry>,
73    ) -> Self {
74        // Capacity of 0 disables caching
75        let (vertex_cache, edge_cache) = if capacity == 0 {
76            (None, None)
77        } else {
78            let cap = NonZeroUsize::new(capacity).unwrap();
79            (
80                Some(Mutex::new(LruCache::new(cap))),
81                Some(Mutex::new(LruCache::new(cap))),
82            )
83        };
84
85        Self {
86            storage,
87            schema_manager,
88            plugin_registry,
89            vertex_cache,
90            edge_cache,
91            cache_capacity: capacity,
92        }
93    }
94
95    pub fn cache_size(&self) -> usize {
96        self.cache_capacity
97    }
98
99    /// Check if caching is enabled
100    pub fn caching_enabled(&self) -> bool {
101        self.cache_capacity > 0
102    }
103
104    /// Clear all caches.
105    /// Call this when L0 is rotated, flushed, or compaction occurs to prevent stale reads.
106    pub async fn clear_cache(&self) {
107        if let Some(ref cache) = self.vertex_cache {
108            cache.lock().await.clear();
109        }
110        if let Some(ref cache) = self.edge_cache {
111            cache.lock().await.clear();
112        }
113    }
114
115    /// Invalidate a specific vertex's cached properties.
116    pub async fn invalidate_vertex(&self, _vid: Vid) {
117        if let Some(ref cache) = self.vertex_cache {
118            let mut cache = cache.lock().await;
119            // LruCache doesn't have a way to iterate and remove, so we pop entries
120            // that match the vid. This is O(n) but necessary for targeted invalidation.
121            // For simplicity, clear the entire cache - LRU will repopulate as needed.
122            cache.clear();
123        }
124    }
125
126    /// Invalidate a specific edge's cached properties.
127    pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
128        if let Some(ref cache) = self.edge_cache {
129            let mut cache = cache.lock().await;
130            // Same approach as invalidate_vertex
131            cache.clear();
132        }
133    }
134
135    #[instrument(skip(self, ctx), level = "trace")]
136    pub async fn get_edge_prop(
137        &self,
138        eid: uni_common::core::id::Eid,
139        prop: &str,
140        ctx: Option<&QueryContext>,
141    ) -> Result<Value> {
142        // 1. Check if deleted in any L0 layer
143        if l0_visibility::is_edge_deleted(eid, ctx) {
144            return Ok(Value::Null);
145        }
146
147        // 2. Check L0 chain for property (transaction -> main -> pending)
148        if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
149            return Ok(val);
150        }
151
152        // 3. Check Cache (if enabled)
153        if let Some(ref cache) = self.edge_cache {
154            let mut cache = cache.lock().await;
155            if let Some(val) = cache.get(&(eid, prop.to_string())) {
156                debug!(eid = ?eid, prop, "Cache HIT");
157                metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
158                return Ok(val.clone());
159            } else {
160                debug!(eid = ?eid, prop, "Cache MISS");
161                metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
162            }
163        }
164
165        // 4. Fetch from Storage
166        let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
167        let val = all
168            .as_ref()
169            .and_then(|props| props.get(prop).cloned())
170            .unwrap_or(Value::Null);
171
172        // 5. Update Cache (if enabled) - Cache ALL fetched properties, not just requested one
173        if let Some(ref cache) = self.edge_cache {
174            let mut cache = cache.lock().await;
175            if let Some(ref props) = all {
176                for (prop_name, prop_val) in props {
177                    cache.put((eid, prop_name.clone()), prop_val.clone());
178                }
179            } else {
180                // No properties found, cache the null result for this property
181                cache.put((eid, prop.to_string()), Value::Null);
182            }
183        }
184
185        Ok(val)
186    }
187
188    pub async fn get_all_edge_props_with_ctx(
189        &self,
190        eid: uni_common::core::id::Eid,
191        ctx: Option<&QueryContext>,
192    ) -> Result<Option<Properties>> {
193        // 1. Check if deleted in any L0 layer
194        if l0_visibility::is_edge_deleted(eid, ctx) {
195            return Ok(None);
196        }
197
198        // 2. Accumulate properties from L0 layers (oldest to newest)
199        let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();
200
201        // 3. Fetch from storage runs
202        let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;
203
204        // 4. Handle case where edge exists but has no properties
205        if final_props.is_empty() && storage_props.is_none() {
206            if l0_visibility::edge_exists_in_l0(eid, ctx) {
207                return Ok(Some(Properties::new()));
208            }
209            return Ok(None);
210        }
211
212        // 5. Merge storage properties (L0 takes precedence)
213        if let Some(sp) = storage_props {
214            for (k, v) in sp {
215                final_props.entry(k).or_insert(v);
216            }
217        }
218
219        Ok(Some(final_props))
220    }
221
222    async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
223        // In the new design, we scan all edge types since EID doesn't embed type info
224        self.fetch_all_edge_props_from_storage_with_hint(eid, None)
225            .await
226    }
227
228    async fn fetch_all_edge_props_from_storage_with_hint(
229        &self,
230        eid: Eid,
231        type_name_hint: Option<&str>,
232    ) -> Result<Option<Properties>> {
233        let schema = self.schema_manager.schema();
234        let backend = self.storage.backend();
235
236        // If hint provided, use it directly
237        let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
238            vec![hint]
239        } else {
240            // Scan all edge types
241            schema.edge_types.keys().map(|s| s.as_str()).collect()
242        };
243
244        for type_name in type_names {
245            let type_props = schema.properties.get(type_name);
246
247            // For now, edges are primarily in Delta runs before compaction to L2 CSR.
248            // We check FWD delta runs.
249            if self.storage.delta_dataset(type_name, "fwd").is_err() {
250                continue; // Edge type doesn't exist, try next
251            }
252
253            // Use backend for edge property lookup
254            use crate::backend::table_names;
255            use crate::backend::types::ScanRequest;
256
257            let table_name = table_names::delta_table_name(type_name, "fwd");
258            if !backend.table_exists(&table_name).await.unwrap_or(false) {
259                continue; // No data for this type, try next
260            }
261
262            let base_filter = format!("eid = {}", eid.as_u64());
263            let filter_expr = self.storage.apply_version_filter(base_filter);
264
265            let batches = match backend
266                .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
267                .await
268            {
269                Ok(b) => b,
270                Err(_) => continue,
271            };
272
273            // Collect all rows for this edge, sorted by version
274            let mut rows: Vec<(u64, u8, Properties)> = Vec::new();
275
276            for batch in batches {
277                let op_col = match batch.column_by_name("op") {
278                    Some(c) => c
279                        .as_any()
280                        .downcast_ref::<arrow_array::UInt8Array>()
281                        .unwrap(),
282                    None => continue,
283                };
284                let ver_col = match batch.column_by_name("_version") {
285                    Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
286                    None => continue,
287                };
288
289                for row in 0..batch.num_rows() {
290                    let ver = ver_col.value(row);
291                    let op = op_col.value(row);
292                    let mut props = Properties::new();
293
294                    if op != 1 {
295                        // Not a delete - extract properties
296                        if let Some(tp) = type_props {
297                            for (p_name, p_meta) in tp {
298                                if let Some(col) = batch.column_by_name(p_name)
299                                    && !col.is_null(row)
300                                {
301                                    let val =
302                                        Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
303                                    props.insert(p_name.clone(), val);
304                                }
305                            }
306                        }
307                    }
308                    rows.push((ver, op, props));
309                }
310            }
311
312            if rows.is_empty() {
313                continue;
314            }
315
316            // Sort by version (ascending) so we merge in order
317            rows.sort_by_key(|(ver, _, _)| *ver);
318
319            // Merge properties across all versions
320            // For CRDT properties: merge values
321            // For non-CRDT properties: later versions overwrite earlier ones
322            let mut merged_props: Properties = Properties::new();
323            let mut is_deleted = false;
324
325            for (_, op, props) in rows {
326                if op == 1 {
327                    // Delete operation - mark as deleted
328                    is_deleted = true;
329                    merged_props.clear();
330                } else {
331                    is_deleted = false;
332                    for (p_name, p_val) in props {
333                        // Check if this is a CRDT property
334                        let is_crdt = type_props
335                            .and_then(|tp| tp.get(&p_name))
336                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
337                            .unwrap_or(false);
338
339                        if is_crdt {
340                            // Merge CRDT values
341                            if let Some(existing) = merged_props.get(&p_name) {
342                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
343                                    merged_props.insert(p_name, merged);
344                                }
345                            } else {
346                                merged_props.insert(p_name, p_val);
347                            }
348                        } else {
349                            // Non-CRDT: later version overwrites
350                            merged_props.insert(p_name, p_val);
351                        }
352                    }
353                }
354            }
355
356            if is_deleted {
357                return Ok(None);
358            }
359
360            if !merged_props.is_empty() {
361                return Ok(Some(merged_props));
362            }
363        }
364
365        // Fallback to main edges table props_json for unknown/schemaless types
366        use crate::storage::main_edge::MainEdgeDataset;
367        if let Some(props) = MainEdgeDataset::find_props_by_eid(self.storage.backend(), eid).await?
368        {
369            return Ok(Some(props));
370        }
371
372        Ok(None)
373    }
374
375    /// Batch load properties for multiple vertices
376    pub async fn get_batch_vertex_props(
377        &self,
378        vids: &[Vid],
379        properties: &[&str],
380        ctx: Option<&QueryContext>,
381    ) -> Result<HashMap<Vid, Properties>> {
382        let schema = self.schema_manager.schema();
383        let mut result = HashMap::new();
384        if vids.is_empty() {
385            return Ok(result);
386        }
387
388        // In the new storage model, VIDs are pure auto-increment and don't embed label info.
389        // We need to scan all label datasets to find the vertices.
390
391        // Try VidLabelsIndex for O(1) label resolution
392        let labels_to_scan: Vec<String> = {
393            let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
394            let mut all_resolved = true;
395            for &vid in vids {
396                if let Some(labels) = self.storage.get_labels_from_index(vid) {
397                    needed.extend(labels);
398                } else {
399                    all_resolved = false;
400                    break;
401                }
402            }
403            if all_resolved {
404                needed.into_iter().collect()
405            } else {
406                schema.labels.keys().cloned().collect() // Fallback to full scan
407            }
408        };
409
410        // 2. Fetch from storage - scan relevant label datasets
411        for label_name in &labels_to_scan {
412            // Filter to properties that exist in this label's schema
413            let label_schema_props = schema.properties.get(label_name);
414            let valid_props: Vec<&str> = properties
415                .iter()
416                .cloned()
417                .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
418                .collect();
419            // Note: don't skip when valid_props is empty; overflow_json may have the properties
420
421            let ds = self.storage.vertex_dataset(label_name)?;
422            let backend = self.storage.backend();
423            let vtable_name = ds.table_name();
424
425            if !backend.table_exists(&vtable_name).await.unwrap_or(false) {
426                continue; // Table doesn't exist yet — skip this label
427            }
428
429            // Construct filter: _vid IN (...)
430            let vid_list = vids
431                .iter()
432                .map(|v| v.as_u64().to_string())
433                .collect::<Vec<_>>()
434                .join(",");
435            let base_filter = format!("_vid IN ({})", vid_list);
436
437            let final_filter = self.storage.apply_version_filter(base_filter);
438
439            // Build column list for projection
440            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
441            columns.push("_vid".to_string());
442            columns.push("_version".to_string());
443            columns.push("_deleted".to_string());
444            columns.extend(valid_props.iter().map(|s| s.to_string()));
445            // Add overflow_json to fetch non-schema properties
446            columns.push("overflow_json".to_string());
447
448            use crate::backend::types::ScanRequest;
449            let request = ScanRequest::all(&vtable_name)
450                .with_filter(final_filter)
451                .with_columns(columns);
452
453            let batches: Vec<RecordBatch> = match backend.scan(request).await {
454                Ok(b) => b,
455                Err(e) => {
456                    warn!(
457                        label = %label_name,
458                        error = %e,
459                        "failed to scan label table, skipping"
460                    );
461                    continue;
462                }
463            };
464            for batch in batches {
465                let vid_col = match batch
466                    .column_by_name("_vid")
467                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
468                {
469                    Some(c) => c,
470                    None => continue,
471                };
472                let del_col = match batch
473                    .column_by_name("_deleted")
474                    .and_then(|col| col.as_any().downcast_ref::<BooleanArray>())
475                {
476                    Some(c) => c,
477                    None => continue,
478                };
479
480                for row in 0..batch.num_rows() {
481                    let vid = Vid::from(vid_col.value(row));
482
483                    if del_col.value(row) {
484                        result.remove(&vid);
485                        continue;
486                    }
487
488                    let label_props = schema.properties.get(label_name);
489                    let mut props =
490                        Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
491                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
492                    result.insert(vid, props);
493                }
494            }
495        }
496
497        // 3. Overlay L0 buffers in age order: pending (oldest to newest) -> current -> transaction
498        if let Some(ctx) = ctx {
499            // First, overlay pending flush L0s in order (oldest first, so iterate forward)
500            for pending_l0_arc in &ctx.pending_flush_l0s {
501                let pending_l0 = pending_l0_arc.read();
502                self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
503            }
504
505            // Then overlay current L0 (newer than pending)
506            let l0 = ctx.l0.read();
507            self.overlay_l0_batch(vids, &l0, properties, &mut result);
508
509            // Finally overlay transaction L0 (newest)
510            // Skip transaction L0 if querying a snapshot
511            // (Transaction changes are at current version, not in snapshot)
512            if self.storage.version_high_water_mark().is_none()
513                && let Some(tx_l0_arc) = &ctx.transaction_l0
514            {
515                let tx_l0 = tx_l0_arc.read();
516                self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
517            }
518        }
519
520        Ok(result)
521    }
522
523    fn overlay_l0_batch(
524        &self,
525        vids: &[Vid],
526        l0: &L0Buffer,
527        properties: &[&str],
528        result: &mut HashMap<Vid, Properties>,
529    ) {
530        let schema = self.schema_manager.schema();
531        for &vid in vids {
532            // If deleted in L0, remove from result
533            if l0.vertex_tombstones.contains(&vid) {
534                result.remove(&vid);
535                continue;
536            }
537            // If in L0, check version before merging
538            if let Some(l0_props) = l0.vertex_properties.get(&vid) {
539                // Skip entries beyond snapshot boundary
540                let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
541                if self
542                    .storage
543                    .version_high_water_mark()
544                    .is_some_and(|hwm| entry_version > hwm)
545                {
546                    continue;
547                }
548
549                let entry = result.entry(vid).or_default();
550                // In new storage model, get labels from L0Buffer
551                let labels = l0.get_vertex_labels(vid);
552
553                for (k, v) in l0_props {
554                    if properties.contains(&k.as_str()) {
555                        // Check if property is CRDT by looking up in any of the vertex's labels
556                        let is_crdt = labels
557                            .and_then(|label_list| {
558                                label_list.iter().find_map(|ln| {
559                                    schema
560                                        .properties
561                                        .get(ln)
562                                        .and_then(|lp| lp.get(k))
563                                        .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
564                                })
565                            })
566                            .is_some();
567
568                        if is_crdt {
569                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
570                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
571                        } else {
572                            entry.insert(k.clone(), v.clone());
573                        }
574                    }
575                }
576            }
577        }
578    }
579
580    /// Load properties as Arrow columns for vectorized processing
581    /// Batch load properties for multiple edges
582    pub async fn get_batch_edge_props(
583        &self,
584        eids: &[uni_common::core::id::Eid],
585        properties: &[&str],
586        ctx: Option<&QueryContext>,
587    ) -> Result<HashMap<Vid, Properties>> {
588        let schema = self.schema_manager.schema();
589        let mut result = HashMap::new();
590        if eids.is_empty() {
591            return Ok(result);
592        }
593
594        // In the new storage model, EIDs are pure auto-increment and don't embed type info.
595        // We need to scan all edge type datasets to find the edges.
596
597        // Try to resolve edge types from L0 context for O(1) lookup
598        let types_to_scan: Vec<String> = {
599            if let Some(ctx) = ctx {
600                let mut needed: std::collections::HashSet<String> =
601                    std::collections::HashSet::new();
602                let mut all_resolved = true;
603                for &eid in eids {
604                    if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
605                        needed.insert(etype.to_string());
606                    } else {
607                        all_resolved = false;
608                        break;
609                    }
610                }
611                if all_resolved {
612                    needed.into_iter().collect()
613                } else {
614                    schema.edge_types.keys().cloned().collect() // Fallback to full scan
615                }
616            } else {
617                schema.edge_types.keys().cloned().collect() // No context, full scan
618            }
619        };
620
621        // 2. Fetch from storage (Delta runs) - scan relevant edge types
622        for type_name in &types_to_scan {
623            let type_props = schema.properties.get(type_name);
624            let valid_props: Vec<&str> = properties
625                .iter()
626                .cloned()
627                .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
628                .collect();
629            // Note: don't skip when valid_props is empty; overflow_json may have the properties
630
631            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
632                Ok(ds) => ds,
633                Err(_) => continue,
634            };
635            let backend = self.storage.backend();
636            let dtable_name = delta_ds.table_name();
637
638            if !backend.table_exists(&dtable_name).await.unwrap_or(false) {
639                continue; // Table doesn't exist yet — skip this edge type
640            }
641
642            let eid_list = eids
643                .iter()
644                .map(|e| e.as_u64().to_string())
645                .collect::<Vec<_>>()
646                .join(",");
647            let base_filter = format!("eid IN ({})", eid_list);
648
649            let final_filter = self.storage.apply_version_filter(base_filter);
650
651            // Build column list for projection
652            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
653            columns.push("eid".to_string());
654            columns.push("_version".to_string());
655            columns.push("op".to_string());
656            columns.extend(valid_props.iter().map(|s| s.to_string()));
657            // Add overflow_json to fetch non-schema properties
658            columns.push("overflow_json".to_string());
659
660            use crate::backend::types::ScanRequest;
661            let request = ScanRequest::all(&dtable_name)
662                .with_filter(final_filter)
663                .with_columns(columns);
664
665            let batches: Vec<RecordBatch> = match backend.scan(request).await {
666                Ok(b) => b,
667                Err(e) => {
668                    warn!(
669                        edge_type = %type_name,
670                        error = %e,
671                        "failed to scan edge delta table, skipping"
672                    );
673                    continue;
674                }
675            };
676            for batch in batches {
677                let eid_col = match batch
678                    .column_by_name("eid")
679                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
680                {
681                    Some(c) => c,
682                    None => continue,
683                };
684                let op_col = match batch
685                    .column_by_name("op")
686                    .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
687                {
688                    Some(c) => c,
689                    None => continue,
690                };
691
692                for row in 0..batch.num_rows() {
693                    let eid = uni_common::core::id::Eid::from(eid_col.value(row));
694
695                    // op=1 is Delete
696                    if op_col.value(row) == 1 {
697                        result.remove(&Vid::from(eid.as_u64()));
698                        continue;
699                    }
700
701                    let mut props =
702                        Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
703                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
704                    // Reuse Vid as key for compatibility with materialized_property
705                    result.insert(Vid::from(eid.as_u64()), props);
706                }
707            }
708        }
709
710        // 3. Overlay L0 buffers in age order: pending (oldest to newest) -> current -> transaction
711        if let Some(ctx) = ctx {
712            // First, overlay pending flush L0s in order (oldest first, so iterate forward)
713            for pending_l0_arc in &ctx.pending_flush_l0s {
714                let pending_l0 = pending_l0_arc.read();
715                self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
716            }
717
718            // Then overlay current L0 (newer than pending)
719            let l0 = ctx.l0.read();
720            self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);
721
722            // Finally overlay transaction L0 (newest)
723            // Skip transaction L0 if querying a snapshot
724            // (Transaction changes are at current version, not in snapshot)
725            if self.storage.version_high_water_mark().is_none()
726                && let Some(tx_l0_arc) = &ctx.transaction_l0
727            {
728                let tx_l0 = tx_l0_arc.read();
729                self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
730            }
731        }
732
733        Ok(result)
734    }
735
736    fn overlay_l0_edge_batch(
737        &self,
738        eids: &[uni_common::core::id::Eid],
739        l0: &L0Buffer,
740        properties: &[&str],
741        result: &mut HashMap<Vid, Properties>,
742    ) {
743        let schema = self.schema_manager.schema();
744        for &eid in eids {
745            let vid_key = Vid::from(eid.as_u64());
746            if l0.tombstones.contains_key(&eid) {
747                result.remove(&vid_key);
748                continue;
749            }
750            if let Some(l0_props) = l0.edge_properties.get(&eid) {
751                // Skip entries beyond snapshot boundary
752                let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
753                if self
754                    .storage
755                    .version_high_water_mark()
756                    .is_some_and(|hwm| entry_version > hwm)
757                {
758                    continue;
759                }
760
761                let entry = result.entry(vid_key).or_default();
762                // In new storage model, get edge type from L0Buffer
763                let type_name = l0.get_edge_type(eid);
764
765                let include_all = properties.contains(&"_all_props");
766                for (k, v) in l0_props {
767                    if include_all || properties.contains(&k.as_str()) {
768                        // Check if property is CRDT
769                        let is_crdt = type_name
770                            .and_then(|tn| schema.properties.get(tn))
771                            .and_then(|tp| tp.get(k))
772                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
773                            .unwrap_or(false);
774
775                        if is_crdt {
776                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
777                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
778                        } else {
779                            entry.insert(k.clone(), v.clone());
780                        }
781                    }
782                }
783            }
784        }
785    }
786
787    /// Batch load labels for multiple vertices.
788    pub async fn get_batch_labels(
789        &self,
790        vids: &[Vid],
791        ctx: Option<&QueryContext>,
792    ) -> Result<HashMap<Vid, Vec<String>>> {
793        let mut result = HashMap::new();
794        if vids.is_empty() {
795            return Ok(result);
796        }
797
798        // Phase 1: Get from L0 layers (oldest to newest)
799        if let Some(ctx) = ctx {
800            let mut collect_labels = |l0: &L0Buffer| {
801                for &vid in vids {
802                    if let Some(labels) = l0.get_vertex_labels(vid) {
803                        result
804                            .entry(vid)
805                            .or_default()
806                            .extend(labels.iter().cloned());
807                    }
808                }
809            };
810
811            for l0_arc in &ctx.pending_flush_l0s {
812                collect_labels(&l0_arc.read());
813            }
814            collect_labels(&ctx.l0.read());
815            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
816                collect_labels(&tx_l0_arc.read());
817            }
818        }
819
820        // Phase 2: Get from storage (try VidLabelsIndex first, then LanceDB fallback)
821        let mut vids_needing_lancedb = Vec::new();
822
823        /// Merge new labels into an existing label list, skipping duplicates.
824        fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
825            for l in new_labels {
826                if !existing.contains(&l) {
827                    existing.push(l);
828                }
829            }
830        }
831
832        for &vid in vids {
833            if result.contains_key(&vid) {
834                continue; // Already have labels from L0
835            }
836
837            if let Some(labels) = self.storage.get_labels_from_index(vid) {
838                merge_labels(result.entry(vid).or_default(), labels);
839            } else {
840                vids_needing_lancedb.push(vid);
841            }
842        }
843
844        // Fallback to storage backend for VIDs not in the index
845        if !vids_needing_lancedb.is_empty() {
846            let backend = self.storage.backend();
847            let version = self.storage.version_high_water_mark();
848            let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
849                backend,
850                &vids_needing_lancedb,
851                version,
852            )
853            .await?;
854
855            for (vid, labels) in storage_labels {
856                merge_labels(result.entry(vid).or_default(), labels);
857            }
858        }
859
860        // Deduplicate and sort labels
861        for labels in result.values_mut() {
862            labels.sort();
863            labels.dedup();
864        }
865
866        Ok(result)
867    }
868
869    pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
870        Ok(self
871            .get_all_vertex_props_with_ctx(vid, None)
872            .await?
873            .unwrap_or_default())
874    }
875
876    pub async fn get_all_vertex_props_with_ctx(
877        &self,
878        vid: Vid,
879        ctx: Option<&QueryContext>,
880    ) -> Result<Option<Properties>> {
881        // 1. Check if deleted in any L0 layer
882        if l0_visibility::is_vertex_deleted(vid, ctx) {
883            return Ok(None);
884        }
885
886        // 2. Accumulate properties from L0 layers (oldest to newest)
887        let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
888
889        // 3. Fetch from storage
890        let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;
891
892        // 4. Handle case where vertex doesn't exist in either layer
893        if l0_props.is_none() && storage_props_opt.is_none() {
894            return Ok(None);
895        }
896
897        let mut final_props = l0_props.unwrap_or_default();
898
899        // 5. Merge storage properties (L0 takes precedence)
900        if let Some(storage_props) = storage_props_opt {
901            for (k, v) in storage_props {
902                final_props.entry(k).or_insert(v);
903            }
904        }
905
906        // 6. Normalize CRDT properties - convert JSON strings to JSON objects
907        // In the new storage model, we need to get labels from context/L0
908        if let Some(ctx) = ctx {
909            // Try to get labels from L0 layers
910            let labels = l0_visibility::get_vertex_labels(vid, ctx);
911            for label in &labels {
912                self.normalize_crdt_properties(&mut final_props, label)?;
913            }
914        }
915
916        Ok(Some(final_props))
917    }
918
919    /// Batch-fetch properties for multiple vertices of a known label.
920    ///
921    /// Queries L0 layers in-memory, then fetches remaining VIDs from LanceDB in
922    /// a single `_vid IN (...)` query on the label table. Much faster than
923    /// per-vertex `get_all_vertex_props_with_ctx` when many vertices need loading.
924    pub async fn get_batch_vertex_props_for_label(
925        &self,
926        vids: &[Vid],
927        label: &str,
928        ctx: Option<&QueryContext>,
929    ) -> Result<HashMap<Vid, Properties>> {
930        let mut result: HashMap<Vid, Properties> = HashMap::new();
931        let mut need_storage: Vec<Vid> = Vec::new();
932
933        // Phase 1: Check L0 layers for each VID (fast, in-memory).
934        for &vid in vids {
935            if l0_visibility::is_vertex_deleted(vid, ctx) {
936                continue;
937            }
938            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
939            if let Some(props) = l0_props {
940                result.insert(vid, props);
941            } else {
942                need_storage.push(vid);
943            }
944        }
945
946        // If everything was resolved from L0, skip storage entirely.
947        if need_storage.is_empty() {
948            // Normalize CRDT properties for L0-resolved vertices.
949            if ctx.is_some() {
950                for props in result.values_mut() {
951                    self.normalize_crdt_properties(props, label)?;
952                }
953            }
954            return Ok(result);
955        }
956
957        // Phase 2: Batch-fetch from LanceDB for remaining VIDs.
958        let schema = self.schema_manager.schema();
959        let label_props = schema.properties.get(label);
960
961        let mut prop_names: Vec<String> = Vec::new();
962        if let Some(props) = label_props {
963            prop_names = props.keys().cloned().collect();
964        }
965
966        let mut columns: Vec<String> = vec![
967            "_vid".to_string(),
968            "_deleted".to_string(),
969            "_version".to_string(),
970        ];
971        columns.extend(prop_names.iter().cloned());
972        columns.push("overflow_json".to_string());
973
974        // Build IN filter for all VIDs at once.
975        let vid_list: String = need_storage
976            .iter()
977            .map(|v| v.as_u64().to_string())
978            .collect::<Vec<_>>()
979            .join(", ");
980        let base_filter = format!("_vid IN ({})", vid_list);
981
982        let filter_expr = self.storage.apply_version_filter(base_filter);
983
984        let table_name = crate::backend::table_names::vertex_table_name(label);
985        let batches: Vec<RecordBatch> = self
986            .storage
987            .backend()
988            .scan(
989                crate::backend::types::ScanRequest::all(&table_name)
990                    .with_filter(&filter_expr)
991                    .with_columns(columns.clone()),
992            )
993            .await?;
994
995        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
996
997        // Track best version per VID for proper version-based merging.
998        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
999        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();
1000
1001        for batch in batches {
1002            let vid_col = match batch
1003                .column_by_name("_vid")
1004                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1005            {
1006                Some(c) => c,
1007                None => continue,
1008            };
1009            let deleted_col = match batch
1010                .column_by_name("_deleted")
1011                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1012            {
1013                Some(c) => c,
1014                None => continue,
1015            };
1016            let version_col = match batch
1017                .column_by_name("_version")
1018                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1019            {
1020                Some(c) => c,
1021                None => continue,
1022            };
1023
1024            for row in 0..batch.num_rows() {
1025                let vid = Vid::from(vid_col.value(row));
1026                let version = version_col.value(row);
1027
1028                if deleted_col.value(row) {
1029                    if per_vid_best_version
1030                        .get(&vid)
1031                        .is_none_or(|&best| version >= best)
1032                    {
1033                        per_vid_best_version.insert(vid, version);
1034                        per_vid_props.remove(&vid);
1035                    }
1036                    continue;
1037                }
1038
1039                let mut current_props =
1040                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1041
1042                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1043                    for (k, v) in overflow_props {
1044                        current_props.entry(k).or_insert(v);
1045                    }
1046                }
1047
1048                let best = per_vid_best_version.get(&vid).copied();
1049                let mut best_opt = best;
1050                let mut merged = per_vid_props.remove(&vid);
1051                self.merge_versioned_props(
1052                    current_props,
1053                    version,
1054                    &mut best_opt,
1055                    &mut merged,
1056                    label_props,
1057                )?;
1058                if let Some(v) = best_opt {
1059                    per_vid_best_version.insert(vid, v);
1060                }
1061                if let Some(p) = merged {
1062                    per_vid_props.insert(vid, p);
1063                }
1064            }
1065        }
1066
1067        // Merge storage results with any L0 partial props already in result.
1068        for (vid, storage_props) in per_vid_props {
1069            let entry = result.entry(vid).or_default();
1070            for (k, v) in storage_props {
1071                entry.entry(k).or_insert(v);
1072            }
1073        }
1074
1075        // Mark VIDs that had no data anywhere as absent (don't insert them).
1076        // VIDs not in `result` simply won't appear in the output.
1077
1078        // Phase 3: Normalize CRDT properties.
1079        if ctx.is_some() {
1080            for props in result.values_mut() {
1081                self.normalize_crdt_properties(props, label)?;
1082            }
1083        }
1084
1085        Ok(result)
1086    }
1087
1088    /// Batch-fetch properties for multiple edges of a known type.
1089    ///
1090    /// Mirrors `get_batch_vertex_props_for_label` (above) for the edge path.
1091    /// Issues one `eid IN (...)` scan against the delta table for the edge
1092    /// type, replaying per-EID version history (op-replay + CRDT merge) the
1093    /// same way `fetch_all_edge_props_from_storage_with_hint` does. Far
1094    /// faster than per-edge `get_all_edge_props_with_ctx` when many edges
1095    /// of the same type need loading (e.g., batched SET/REMOVE on edges
1096    /// matched by a MATCH).
1097    ///
1098    /// EIDs of deleted edges or those with no rows in delta storage are
1099    /// omitted from the returned map; callers can fall back to the per-EID
1100    /// path for misses.
1101    pub async fn get_batch_edge_props_for_type(
1102        &self,
1103        eids: &[Eid],
1104        type_name: &str,
1105        ctx: Option<&QueryContext>,
1106    ) -> Result<HashMap<Eid, Properties>> {
1107        use crate::backend::table_names;
1108        use crate::backend::types::ScanRequest;
1109
1110        let mut result: HashMap<Eid, Properties> = HashMap::new();
1111        if eids.is_empty() {
1112            return Ok(result);
1113        }
1114
1115        // Phase 1: L0 check per EID. Skip deleted; serve from L0 if it has
1116        // an accumulated property set; otherwise note for storage scan.
1117        let mut need_storage: Vec<Eid> = Vec::new();
1118        for &eid in eids {
1119            if l0_visibility::is_edge_deleted(eid, ctx) {
1120                continue;
1121            }
1122            let l0_props = l0_visibility::accumulate_edge_props(eid, ctx);
1123            // Edge L0 semantics: even an empty accumulator means "edge exists
1124            // in L0 with no user props yet" — we still go to storage to pick
1125            // up the persisted full row (mirrors get_all_edge_props_with_ctx).
1126            if let Some(props) = l0_props {
1127                result.insert(eid, props);
1128            }
1129            need_storage.push(eid);
1130        }
1131
1132        if need_storage.is_empty() {
1133            return Ok(result);
1134        }
1135
1136        // Phase 2: One scan with `eid IN (...)` on the delta table.
1137        let schema = self.schema_manager.schema();
1138        let type_props = schema.properties.get(type_name);
1139
1140        if self.storage.delta_dataset(type_name, "fwd").is_err() {
1141            return Ok(result);
1142        }
1143
1144        let table_name = table_names::delta_table_name(type_name, "fwd");
1145        let backend = self.storage.backend();
1146        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1147            return Ok(result);
1148        }
1149
1150        let eid_list: String = need_storage
1151            .iter()
1152            .map(|e| e.as_u64().to_string())
1153            .collect::<Vec<_>>()
1154            .join(", ");
1155        let base_filter = format!("eid IN ({})", eid_list);
1156        let filter_expr = self.storage.apply_version_filter(base_filter);
1157
1158        let batches = match backend
1159            .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
1160            .await
1161        {
1162            Ok(b) => b,
1163            Err(_) => return Ok(result), // Storage error: per-row fallback handles correctness
1164        };
1165
1166        // Collect (eid, version, op, props) tuples, then group + replay per EID.
1167        let mut per_eid_rows: HashMap<Eid, Vec<(u64, u8, Properties)>> = HashMap::new();
1168        for batch in batches {
1169            let eid_col = match batch
1170                .column_by_name("eid")
1171                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1172            {
1173                Some(c) => c,
1174                None => continue,
1175            };
1176            let op_col = match batch
1177                .column_by_name("op")
1178                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt8Array>())
1179            {
1180                Some(c) => c,
1181                None => continue,
1182            };
1183            let ver_col = match batch
1184                .column_by_name("_version")
1185                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1186            {
1187                Some(c) => c,
1188                None => continue,
1189            };
1190
1191            for row in 0..batch.num_rows() {
1192                let eid = Eid::from(eid_col.value(row));
1193                let ver = ver_col.value(row);
1194                let op = op_col.value(row);
1195                let mut props = Properties::new();
1196
1197                if op != 1
1198                    && let Some(tp) = type_props
1199                {
1200                    for (p_name, p_meta) in tp {
1201                        if let Some(col) = batch.column_by_name(p_name)
1202                            && !col.is_null(row)
1203                        {
1204                            let val = Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
1205                            props.insert(p_name.clone(), val);
1206                        }
1207                    }
1208                }
1209                per_eid_rows.entry(eid).or_default().push((ver, op, props));
1210            }
1211        }
1212
1213        for (eid, mut rows) in per_eid_rows {
1214            rows.sort_by_key(|(ver, _, _)| *ver);
1215
1216            let mut merged_props: Properties = Properties::new();
1217            let mut is_deleted = false;
1218
1219            for (_, op, props) in rows {
1220                if op == 1 {
1221                    is_deleted = true;
1222                    merged_props.clear();
1223                } else {
1224                    is_deleted = false;
1225                    for (p_name, p_val) in props {
1226                        let is_crdt = type_props
1227                            .and_then(|tp| tp.get(&p_name))
1228                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1229                            .unwrap_or(false);
1230                        if is_crdt {
1231                            if let Some(existing) = merged_props.get(&p_name) {
1232                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
1233                                    merged_props.insert(p_name, merged);
1234                                }
1235                            } else {
1236                                merged_props.insert(p_name, p_val);
1237                            }
1238                        } else {
1239                            merged_props.insert(p_name, p_val);
1240                        }
1241                    }
1242                }
1243            }
1244
1245            if is_deleted {
1246                // Deleted in storage; remove any L0 accumulation that may
1247                // have been recorded under this EID by Phase 1 (matches
1248                // is_edge_deleted single-EID semantics).
1249                result.remove(&eid);
1250                continue;
1251            }
1252
1253            // L0 takes precedence over storage for shared keys; insert
1254            // storage values only where L0 did not already provide them.
1255            let entry = result.entry(eid).or_default();
1256            for (k, v) in merged_props {
1257                entry.entry(k).or_insert(v);
1258            }
1259        }
1260
1261        Ok(result)
1262    }
1263
1264    /// Normalize CRDT properties by converting JSON strings to JSON objects.
1265    /// This handles the case where CRDT values come from Cypher CREATE statements
1266    /// as `Value::String("{\"t\": \"gc\", ...}")` and need to be parsed into objects.
1267    fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1268        let schema = self.schema_manager.schema();
1269        let label_props = match schema.properties.get(label) {
1270            Some(p) => p,
1271            None => return Ok(()),
1272        };
1273
1274        for (prop_name, prop_meta) in label_props {
1275            if let DataType::Crdt(_) = prop_meta.r#type
1276                && let Some(val) = props.get_mut(prop_name)
1277            {
1278                *val = Value::from(Self::parse_crdt_value(val)?);
1279            }
1280        }
1281
1282        Ok(())
1283    }
1284
1285    /// Extract properties from a single batch row.
1286    fn extract_row_properties(
1287        batch: &RecordBatch,
1288        row: usize,
1289        prop_names: &[&str],
1290        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1291    ) -> Result<Properties> {
1292        let mut props = Properties::new();
1293        for name in prop_names {
1294            let col = match batch.column_by_name(name) {
1295                Some(col) => col,
1296                None => continue,
1297            };
1298            if col.is_null(row) {
1299                continue;
1300            }
1301            if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1302                let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1303                props.insert((*name).to_string(), val);
1304            }
1305        }
1306        Ok(props)
1307    }
1308
1309    /// Extract overflow properties from the overflow_json column.
1310    ///
1311    /// Returns None if the column doesn't exist or the value is null,
1312    /// otherwise parses the JSON blob and returns the properties.
1313    fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
1314        use arrow_array::LargeBinaryArray;
1315
1316        let overflow_col = match batch.column_by_name("overflow_json") {
1317            Some(col) => col,
1318            None => return Ok(None), // Column doesn't exist (old schema)
1319        };
1320
1321        if overflow_col.is_null(row) {
1322            return Ok(None);
1323        }
1324
1325        let binary_array = overflow_col
1326            .as_any()
1327            .downcast_ref::<LargeBinaryArray>()
1328            .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;
1329
1330        let jsonb_bytes = binary_array.value(row);
1331
1332        // Decode CypherValue binary
1333        let uni_val = uni_common::cypher_value_codec::decode(jsonb_bytes)
1334            .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
1335        let json_val: serde_json::Value = uni_val.into();
1336
1337        // Parse to Properties
1338        let overflow_props: Properties = serde_json::from_value(json_val)
1339            .map_err(|e| anyhow!("Failed to parse overflow properties: {}", e))?;
1340
1341        Ok(Some(overflow_props))
1342    }
1343
1344    /// Merge overflow properties from the overflow_json column into an existing props map.
1345    ///
1346    /// Handles two concerns:
1347    /// 1. If `overflow_json` is explicitly requested in `properties`, stores the raw JSONB
1348    ///    bytes as a JSON array of u8 values.
1349    /// 2. Extracts individual overflow properties and merges those that are in `properties`.
1350    fn merge_overflow_into_props(
1351        batch: &RecordBatch,
1352        row: usize,
1353        properties: &[&str],
1354        props: &mut Properties,
1355    ) -> Result<()> {
1356        use arrow_array::LargeBinaryArray;
1357
1358        let overflow_col = match batch.column_by_name("overflow_json") {
1359            Some(col) if !col.is_null(row) => col,
1360            _ => return Ok(()),
1361        };
1362
1363        // Store raw JSONB bytes if explicitly requested
1364        if properties.contains(&"overflow_json")
1365            && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
1366        {
1367            let jsonb_bytes = binary_array.value(row);
1368            let bytes_list: Vec<Value> =
1369                jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
1370            props.insert("overflow_json".to_string(), Value::List(bytes_list));
1371        }
1372
1373        // Extract and merge individual overflow properties
1374        if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
1375            for (k, v) in overflow_props {
1376                if properties.contains(&k.as_str()) {
1377                    props.entry(k).or_insert(v);
1378                }
1379            }
1380        }
1381
1382        Ok(())
1383    }
1384
1385    /// Merge CRDT properties from source into target.
1386    fn merge_crdt_into(
1387        &self,
1388        target: &mut Properties,
1389        source: Properties,
1390        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1391        crdt_only: bool,
1392    ) -> Result<()> {
1393        for (k, v) in source {
1394            if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1395                if let DataType::Crdt(_) = prop_meta.r#type {
1396                    let existing_v = target.entry(k).or_insert(Value::Null);
1397                    *existing_v = self.merge_crdt_values(existing_v, &v)?;
1398                } else if !crdt_only {
1399                    target.insert(k, v);
1400                }
1401            }
1402        }
1403        Ok(())
1404    }
1405
1406    /// Handle version-based property merging for storage fetch.
1407    fn merge_versioned_props(
1408        &self,
1409        current_props: Properties,
1410        version: u64,
1411        best_version: &mut Option<u64>,
1412        best_props: &mut Option<Properties>,
1413        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1414    ) -> Result<()> {
1415        if best_version.is_none_or(|best| version > best) {
1416            // Newest version: strictly newer
1417            if let Some(mut existing_props) = best_props.take() {
1418                // Merge CRDTs from existing into current
1419                let mut merged = current_props;
1420                for (k, v) in merged.iter_mut() {
1421                    if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
1422                        && let DataType::Crdt(_) = prop_meta.r#type
1423                        && let Some(existing_val) = existing_props.remove(k)
1424                    {
1425                        *v = self.merge_crdt_values(v, &existing_val)?;
1426                    }
1427                }
1428                *best_props = Some(merged);
1429            } else {
1430                *best_props = Some(current_props);
1431            }
1432            *best_version = Some(version);
1433        } else if Some(version) == *best_version {
1434            // Same version: merge all properties
1435            if let Some(existing_props) = best_props.as_mut() {
1436                self.merge_crdt_into(existing_props, current_props, label_props, false)?;
1437            } else {
1438                *best_props = Some(current_props);
1439            }
1440        } else {
1441            // Older version: only merge CRDTs
1442            if let Some(existing_props) = best_props.as_mut() {
1443                self.merge_crdt_into(existing_props, current_props, label_props, true)?;
1444            }
1445        }
1446        Ok(())
1447    }
1448
1449    async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
1450        // In the new storage model, VID doesn't embed label info.
1451        // We need to scan all label datasets to find the vertex's properties.
1452        let schema = self.schema_manager.schema();
1453        let mut merged_props: Option<Properties> = None;
1454        let mut global_best_version: Option<u64> = None;
1455
1456        // Try VidLabelsIndex for O(1) label resolution
1457        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1458        {
1459            labels
1460        } else {
1461            schema.labels.keys().cloned().collect() // Fallback to full scan
1462        };
1463
1464        for label_name in &label_names {
1465            let label_props = schema.properties.get(label_name);
1466
1467            // Get property names from schema
1468            let mut prop_names: Vec<String> = Vec::new();
1469            if let Some(props) = label_props {
1470                prop_names = props.keys().cloned().collect();
1471            }
1472
1473            // Build column selection
1474            let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
1475            columns.extend(prop_names.iter().cloned());
1476            // Add overflow_json column to fetch non-schema properties
1477            columns.push("overflow_json".to_string());
1478
1479            // Query using backend scan API
1480            let base_filter = format!("_vid = {}", vid.as_u64());
1481
1482            let filter_expr = self.storage.apply_version_filter(base_filter);
1483
1484            let table_name = crate::backend::table_names::vertex_table_name(label_name);
1485            let batches: Vec<RecordBatch> = match self
1486                .storage
1487                .backend()
1488                .scan(
1489                    crate::backend::types::ScanRequest::all(&table_name)
1490                        .with_filter(&filter_expr)
1491                        .with_columns(columns.clone()),
1492                )
1493                .await
1494            {
1495                Ok(b) => b,
1496                Err(_) => continue,
1497            };
1498
1499            // Convert Vec<String> to Vec<&str> for downstream use
1500            let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
1501
1502            for batch in batches {
1503                let deleted_col = match batch
1504                    .column_by_name("_deleted")
1505                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1506                {
1507                    Some(c) => c,
1508                    None => continue,
1509                };
1510                let version_col = match batch
1511                    .column_by_name("_version")
1512                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1513                {
1514                    Some(c) => c,
1515                    None => continue,
1516                };
1517
1518                for row in 0..batch.num_rows() {
1519                    let version = version_col.value(row);
1520
1521                    if deleted_col.value(row) {
1522                        if global_best_version.is_none_or(|best| version >= best) {
1523                            global_best_version = Some(version);
1524                            merged_props = None;
1525                        }
1526                        continue;
1527                    }
1528
1529                    let mut current_props =
1530                        Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1531
1532                    // Also extract overflow properties from overflow_json column
1533                    if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1534                        // Merge overflow properties into current_props
1535                        for (k, v) in overflow_props {
1536                            current_props.entry(k).or_insert(v);
1537                        }
1538                    }
1539
1540                    self.merge_versioned_props(
1541                        current_props,
1542                        version,
1543                        &mut global_best_version,
1544                        &mut merged_props,
1545                        label_props,
1546                    )?;
1547                }
1548            }
1549        }
1550
1551        // Fallback to main table props_json for unknown/schemaless labels
1552        if merged_props.is_none()
1553            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1554                self.storage.backend(),
1555                vid,
1556                self.storage.version_high_water_mark(),
1557            )
1558            .await?
1559        {
1560            return Ok(Some(main_props));
1561        }
1562
1563        Ok(merged_props)
1564    }
1565
1566    pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
1567        self.get_vertex_prop_with_ctx(vid, prop, None).await
1568    }
1569
1570    #[instrument(skip(self, ctx), level = "trace")]
1571    pub async fn get_vertex_prop_with_ctx(
1572        &self,
1573        vid: Vid,
1574        prop: &str,
1575        ctx: Option<&QueryContext>,
1576    ) -> Result<Value> {
1577        // 1. Check if deleted in any L0 layer
1578        if l0_visibility::is_vertex_deleted(vid, ctx) {
1579            return Ok(Value::Null);
1580        }
1581
1582        // 2. Determine if property is CRDT type
1583        // First check labels from context/L0, then fall back to scanning all labels in schema
1584        let schema = self.schema_manager.schema();
1585        let labels = ctx
1586            .map(|c| l0_visibility::get_vertex_labels(vid, c))
1587            .unwrap_or_default();
1588
1589        let is_crdt = if !labels.is_empty() {
1590            // Check labels from context
1591            labels.iter().any(|ln| {
1592                schema
1593                    .properties
1594                    .get(ln)
1595                    .and_then(|lp| lp.get(prop))
1596                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1597                    .unwrap_or(false)
1598            })
1599        } else {
1600            // No labels from context - check if property is CRDT in ANY label
1601            schema.properties.values().any(|label_props| {
1602                label_props
1603                    .get(prop)
1604                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1605                    .unwrap_or(false)
1606            })
1607        };
1608
1609        // 3. Check L0 chain for property
1610        if is_crdt {
1611            // For CRDT, accumulate and merge values from all L0 layers
1612            let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
1613            return self.finalize_crdt_lookup(vid, prop, l0_val).await;
1614        }
1615
1616        // 4. Non-CRDT: Check L0 chain for property (returns first found)
1617        if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
1618            return Ok(val);
1619        }
1620
1621        // 5. Check Cache (if enabled)
1622        if let Some(ref cache) = self.vertex_cache {
1623            let mut cache = cache.lock().await;
1624            if let Some(val) = cache.get(&(vid, prop.to_string())) {
1625                debug!(vid = ?vid, prop, "Cache HIT");
1626                metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
1627                return Ok(val.clone());
1628            } else {
1629                debug!(vid = ?vid, prop, "Cache MISS");
1630                metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
1631                    .increment(1);
1632            }
1633        }
1634
1635        // 6. Fetch from Storage
1636        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1637
1638        // 7. Update Cache (if enabled)
1639        if let Some(ref cache) = self.vertex_cache {
1640            let mut cache = cache.lock().await;
1641            cache.put((vid, prop.to_string()), storage_val.clone());
1642        }
1643
1644        Ok(storage_val)
1645    }
1646
1647    /// Accumulate CRDT values from all L0 layers by merging them together.
1648    fn accumulate_crdt_from_l0(
1649        &self,
1650        vid: Vid,
1651        prop: &str,
1652        ctx: Option<&QueryContext>,
1653    ) -> Result<Value> {
1654        let mut merged = Value::Null;
1655        l0_visibility::visit_l0_buffers(ctx, |l0| {
1656            if let Some(props) = l0.vertex_properties.get(&vid)
1657                && let Some(val) = props.get(prop)
1658            {
1659                // Note: merge_crdt_values can't fail in practice for valid CRDTs
1660                if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1661                    merged = new_merged;
1662                }
1663            }
1664            false // Continue visiting all layers
1665        });
1666        Ok(merged)
1667    }
1668
1669    /// Finalize CRDT lookup by merging with cache/storage.
1670    async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1671        // Check Cache (if enabled)
1672        let cached_val = if let Some(ref cache) = self.vertex_cache {
1673            let mut cache = cache.lock().await;
1674            cache.get(&(vid, prop.to_string())).cloned()
1675        } else {
1676            None
1677        };
1678
1679        if let Some(val) = cached_val {
1680            let merged = self.merge_crdt_values(&val, &l0_val)?;
1681            return Ok(merged);
1682        }
1683
1684        // Fetch from Storage
1685        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1686
1687        // Update Cache (if enabled)
1688        if let Some(ref cache) = self.vertex_cache {
1689            let mut cache = cache.lock().await;
1690            cache.put((vid, prop.to_string()), storage_val.clone());
1691        }
1692
1693        // Merge L0 + Storage
1694        self.merge_crdt_values(&storage_val, &l0_val)
1695    }
1696
    /// Read one property of a vertex from the persisted label tables.
    ///
    /// For every candidate label the per-label vertex table is scanned with the
    /// filter `_vid = <vid>` (wrapped by the storage manager's version filter),
    /// and all returned rows are folded into a single result:
    ///
    /// * a `_deleted` row whose version is `>=` the best seen so far clears the
    ///   accumulated value and becomes the new best version;
    /// * a schema-declared property is read from its own column and merged via
    ///   [`Self::merge_prop_value`] (CRDT-aware);
    /// * otherwise the value is looked up in the row's overflow properties and
    ///   merged last-writer-wins by version.
    ///
    /// Returns `Value::Null` when no row contributes a value.
    ///
    /// # Errors
    ///
    /// Propagates errors from column decoding, overflow extraction and CRDT
    /// merging. Scan failures for a label are not errors: that label is skipped.
    async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
        // In the new storage model, VID doesn't embed label info.
        // We need to scan all label datasets to find the property.
        let schema = self.schema_manager.schema();
        // Running winner across all labels, batches and rows.
        let mut best_version: Option<u64> = None;
        let mut best_value: Option<Value> = None;

        // Prefer the labels index for label resolution; when it has no entry
        // for this vid, every label known to the schema is a candidate.
        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
        {
            labels
        } else {
            schema.labels.keys().cloned().collect() // Fallback to full scan
        };

        for label_name in &label_names {
            // Check if property is defined in schema for this label
            let prop_meta = schema
                .properties
                .get(label_name)
                .and_then(|props| props.get(prop));

            // Even if property is not in schema, we still check overflow_json

            // Query using backend scan API
            let base_filter = format!("_vid = {}", vid.as_u64());

            // Let the storage manager add its version constraint to the filter.
            let filter_expr = self.storage.apply_version_filter(base_filter);

            // Always request metadata columns and overflow_json
            let mut columns = vec![
                "_deleted".to_string(),
                "_version".to_string(),
                "overflow_json".to_string(),
            ];

            // Only request the property column if it's defined in schema
            if prop_meta.is_some() {
                columns.push(prop.to_string());
            }

            let table_name = crate::backend::table_names::vertex_table_name(label_name);
            // A failed scan skips this label instead of failing the lookup.
            // NOTE(review): presumably this tolerates labels whose table does
            // not exist yet — confirm, since other scan errors are hidden too.
            let batches: Vec<RecordBatch> = match self
                .storage
                .backend()
                .scan(
                    crate::backend::types::ScanRequest::all(&table_name)
                        .with_filter(&filter_expr)
                        .with_columns(columns),
                )
                .await
            {
                Ok(b) => b,
                Err(_) => continue,
            };

            for batch in batches {
                // Batches lacking correctly-typed metadata columns are ignored.
                let deleted_col = match batch
                    .column_by_name("_deleted")
                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
                {
                    Some(c) => c,
                    None => continue,
                };
                let version_col = match batch
                    .column_by_name("_version")
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
                {
                    Some(c) => c,
                    None => continue,
                };
                for row in 0..batch.num_rows() {
                    let version = version_col.value(row);

                    // Tombstone: if it is at least as new as the current best,
                    // it wipes the accumulated value and takes over as best.
                    if deleted_col.value(row) {
                        if best_version.is_none_or(|best| version >= best) {
                            best_version = Some(version);
                            best_value = None;
                        }
                        continue;
                    }

                    // First try schema column if property is in schema
                    let mut val = None;
                    if let Some(meta) = prop_meta
                        && let Some(col) = batch.column_by_name(prop)
                    {
                        // A null cell still counts as a found value
                        // (`Value::Null`), so overflow is not consulted for it.
                        val = Some(if col.is_null(row) {
                            Value::Null
                        } else {
                            Self::value_from_column(col, &meta.r#type, row)?
                        });
                    }

                    // If not in schema column, check overflow_json
                    if val.is_none()
                        && let Some(overflow_props) =
                            Self::extract_overflow_properties(&batch, row)?
                        && let Some(overflow_val) = overflow_props.get(prop)
                    {
                        val = Some(overflow_val.clone());
                    }

                    // If we found a value (from schema or overflow), merge it
                    if let Some(v) = val {
                        if let Some(meta) = prop_meta {
                            // Use schema type for merging (handles CRDT)
                            self.merge_prop_value(
                                v,
                                version,
                                &meta.r#type,
                                &mut best_version,
                                &mut best_value,
                            )?;
                        } else {
                            // Overflow property: use simple LWW merging
                            // (ties on version are won by the row seen later).
                            if best_version.is_none_or(|best| version >= best) {
                                best_version = Some(version);
                                best_value = Some(v);
                            }
                        }
                    }
                }
            }
        }
        // No surviving value (never present, or tombstoned) reads as Null.
        Ok(best_value.unwrap_or(Value::Null))
    }
1824
1825    /// Decode an Arrow column value with strict CRDT error handling.
1826    pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1827        crate::storage::value_codec::decode_column_value(
1828            col,
1829            data_type,
1830            row,
1831            CrdtDecodeMode::Strict,
1832        )
1833    }
1834
1835    /// Merge two `Value`-wrapped CRDT operands.
1836    ///
1837    /// Routes through [`uni_crdt::Crdt::merge_via_registry`] using the
1838    /// `PropertyManager`'s `plugin_registry`. With an empty registry
1839    /// (the legacy 3-arg [`Self::new`] default) `merge_via_registry`
1840    /// falls back to `Crdt::try_merge`, preserving native semantics.
1841    ///
1842    /// # Errors
1843    ///
1844    /// Returns an `anyhow::Error` when either operand is malformed
1845    /// CRDT JSON, the variants disagree, or the registry-dispatched
1846    /// merge surfaces a `CrdtError`.
1847    pub fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1848        // Handle the case where values are JSON strings containing CRDT JSON
1849        // (this happens when values come from Cypher CREATE statements)
1850        // Parse before checking for null to ensure proper format conversion
1851        if a.is_null() {
1852            return Self::parse_crdt_value(b).map(Value::from);
1853        }
1854        if b.is_null() {
1855            return Self::parse_crdt_value(a).map(Value::from);
1856        }
1857
1858        let a_parsed = Self::parse_crdt_value(a)?;
1859        let b_parsed = Self::parse_crdt_value(b)?;
1860
1861        let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1862        let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1863        // M10 follow-up: route through `merge_via_registry` so a
1864        // hot-reloaded `CrdtKindProvider` plugin can intercept the
1865        // merge. With an empty registry (the 3-arg `new()` default)
1866        // this falls back to `Crdt::try_merge`, preserving prior
1867        // behavior bit-for-bit.
1868        crdt_a
1869            .merge_via_registry(&crdt_b, &self.plugin_registry)
1870            .map_err(|e| anyhow::anyhow!("{e}"))?;
1871        Ok(Value::from(serde_json::to_value(crdt_a)?))
1872    }
1873
1874    /// Parse a CRDT value that may be either a JSON object or a JSON string containing JSON.
1875    /// Returns `serde_json::Value` for internal CRDT processing.
1876    fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1877        if let Value::String(s) = val {
1878            // Value is a JSON string - parse the string content as JSON
1879            serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1880        } else {
1881            // Convert uni_common::Value to serde_json::Value for CRDT processing
1882            Ok(serde_json::Value::from(val.clone()))
1883        }
1884    }
1885
1886    /// Merge a property value based on version, handling CRDT vs LWW semantics.
1887    fn merge_prop_value(
1888        &self,
1889        val: Value,
1890        version: u64,
1891        data_type: &DataType,
1892        best_version: &mut Option<u64>,
1893        best_value: &mut Option<Value>,
1894    ) -> Result<()> {
1895        if let DataType::Crdt(_) = data_type {
1896            self.merge_crdt_prop_value(val, version, best_version, best_value)
1897        } else {
1898            // Standard LWW
1899            if best_version.is_none_or(|best| version >= best) {
1900                *best_version = Some(version);
1901                *best_value = Some(val);
1902            }
1903            Ok(())
1904        }
1905    }
1906
1907    /// Merge CRDT property values across versions (CRDTs merge regardless of version).
1908    fn merge_crdt_prop_value(
1909        &self,
1910        val: Value,
1911        version: u64,
1912        best_version: &mut Option<u64>,
1913        best_value: &mut Option<Value>,
1914    ) -> Result<()> {
1915        if best_version.is_none_or(|best| version > best) {
1916            // Newer version: merge with existing if present
1917            if let Some(existing) = best_value.take() {
1918                *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1919            } else {
1920                *best_value = Some(val);
1921            }
1922            *best_version = Some(version);
1923        } else if Some(version) == *best_version {
1924            // Same version: merge
1925            let existing = best_value.get_or_insert(Value::Null);
1926            *existing = self.merge_crdt_values(existing, &val)?;
1927        } else {
1928            // Older version: still merge for CRDTs
1929            if let Some(existing) = best_value.as_mut() {
1930                *existing = self.merge_crdt_values(existing, &val)?;
1931            }
1932        }
1933        Ok(())
1934    }
1935}