Skip to main content

uni_store/runtime/
property_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use lru::LruCache;
13use metrics;
14use std::collections::HashMap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tracing::{debug, instrument, warn};
19use uni_common::Properties;
20use uni_common::Value;
21use uni_common::core::id::{Eid, Vid};
22use uni_common::core::schema::{DataType, SchemaManager};
23use uni_crdt::Crdt;
24
25pub struct PropertyManager {
26    storage: Arc<StorageManager>,
27    schema_manager: Arc<SchemaManager>,
28    /// Plugin registry consulted by CRDT merges via
29    /// [`uni_crdt::Crdt::merge_via_registry`]. The legacy 3-arg
30    /// [`Self::new`] passes an empty registry so callers that don't
31    /// wire plugin dispatch keep getting bit-identical native
32    /// behavior (empty registry → `merge_via_registry`'s native
33    /// fallback). Production paths in `UniInner` use
34    /// [`Self::with_plugin_registry`] to share the host's registry.
35    plugin_registry: Arc<uni_plugin::PluginRegistry>,
36    /// Cache is None when capacity=0 (caching disabled)
37    vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
38    edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
39    cache_capacity: usize,
40}
41
42impl PropertyManager {
43    /// Construct a `PropertyManager` with an empty plugin registry.
44    ///
45    /// Back-compat shim for the ~17 algorithm and test call sites that
46    /// don't need registry-dispatched CRDT merges. Equivalent to
47    /// [`Self::with_plugin_registry`] with `Arc::new(PluginRegistry::new())`.
48    pub fn new(
49        storage: Arc<StorageManager>,
50        schema_manager: Arc<SchemaManager>,
51        capacity: usize,
52    ) -> Self {
53        Self::with_plugin_registry(
54            storage,
55            schema_manager,
56            capacity,
57            Arc::new(uni_plugin::PluginRegistry::new()),
58        )
59    }
60
61    /// Construct a `PropertyManager` wired to a shared `PluginRegistry`.
62    ///
63    /// CRDT merges in this `PropertyManager` consult `plugin_registry`
64    /// for `CrdtKindProvider`s matching each `Crdt::kind()`; matched
65    /// kinds dispatch through the provider (so hot-reloaded plugins
66    /// take effect immediately), unmatched kinds fall back to the
67    /// native `Crdt::try_merge`.
68    pub fn with_plugin_registry(
69        storage: Arc<StorageManager>,
70        schema_manager: Arc<SchemaManager>,
71        capacity: usize,
72        plugin_registry: Arc<uni_plugin::PluginRegistry>,
73    ) -> Self {
74        // Capacity of 0 disables caching
75        let (vertex_cache, edge_cache) = if capacity == 0 {
76            (None, None)
77        } else {
78            let cap = NonZeroUsize::new(capacity).unwrap();
79            (
80                Some(Mutex::new(LruCache::new(cap))),
81                Some(Mutex::new(LruCache::new(cap))),
82            )
83        };
84
85        Self {
86            storage,
87            schema_manager,
88            plugin_registry,
89            vertex_cache,
90            edge_cache,
91            cache_capacity: capacity,
92        }
93    }
94
95    pub fn cache_size(&self) -> usize {
96        self.cache_capacity
97    }
98
99    /// Check if caching is enabled
100    pub fn caching_enabled(&self) -> bool {
101        self.cache_capacity > 0
102    }
103
104    /// Clear all caches.
105    /// Call this when L0 is rotated, flushed, or compaction occurs to prevent stale reads.
106    pub async fn clear_cache(&self) {
107        if let Some(ref cache) = self.vertex_cache {
108            cache.lock().await.clear();
109        }
110        if let Some(ref cache) = self.edge_cache {
111            cache.lock().await.clear();
112        }
113    }
114
115    /// Invalidate a specific vertex's cached properties.
116    pub async fn invalidate_vertex(&self, _vid: Vid) {
117        if let Some(ref cache) = self.vertex_cache {
118            let mut cache = cache.lock().await;
119            // LruCache doesn't have a way to iterate and remove, so we pop entries
120            // that match the vid. This is O(n) but necessary for targeted invalidation.
121            // For simplicity, clear the entire cache - LRU will repopulate as needed.
122            cache.clear();
123        }
124    }
125
126    /// Invalidate a specific edge's cached properties.
127    pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
128        if let Some(ref cache) = self.edge_cache {
129            let mut cache = cache.lock().await;
130            // Same approach as invalidate_vertex
131            cache.clear();
132        }
133    }
134
135    #[instrument(skip(self, ctx), level = "trace")]
136    pub async fn get_edge_prop(
137        &self,
138        eid: uni_common::core::id::Eid,
139        prop: &str,
140        ctx: Option<&QueryContext>,
141    ) -> Result<Value> {
142        // 1. Check if deleted in any L0 layer
143        if l0_visibility::is_edge_deleted(eid, ctx) {
144            return Ok(Value::Null);
145        }
146
147        // 2. Check L0 chain for property (transaction -> main -> pending)
148        if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
149            return Ok(val);
150        }
151
152        // 3. Check Cache (if enabled)
153        if let Some(ref cache) = self.edge_cache {
154            let mut cache = cache.lock().await;
155            if let Some(val) = cache.get(&(eid, prop.to_string())) {
156                debug!(eid = ?eid, prop, "Cache HIT");
157                metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
158                return Ok(val.clone());
159            } else {
160                debug!(eid = ?eid, prop, "Cache MISS");
161                metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
162            }
163        }
164
165        // 4. Fetch from Storage
166        let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
167        let val = all
168            .as_ref()
169            .and_then(|props| props.get(prop).cloned())
170            .unwrap_or(Value::Null);
171
172        // 5. Update Cache (if enabled) - Cache ALL fetched properties, not just requested one
173        if let Some(ref cache) = self.edge_cache {
174            let mut cache = cache.lock().await;
175            if let Some(ref props) = all {
176                for (prop_name, prop_val) in props {
177                    cache.put((eid, prop_name.clone()), prop_val.clone());
178                }
179            } else {
180                // No properties found, cache the null result for this property
181                cache.put((eid, prop.to_string()), Value::Null);
182            }
183        }
184
185        Ok(val)
186    }
187
188    pub async fn get_all_edge_props_with_ctx(
189        &self,
190        eid: uni_common::core::id::Eid,
191        ctx: Option<&QueryContext>,
192    ) -> Result<Option<Properties>> {
193        // 1. Check if deleted in any L0 layer
194        if l0_visibility::is_edge_deleted(eid, ctx) {
195            return Ok(None);
196        }
197
198        // 2. Accumulate properties from L0 layers (oldest to newest)
199        let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();
200
201        // 3. Fetch from storage runs
202        let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;
203
204        // 4. Handle case where edge exists but has no properties
205        if final_props.is_empty() && storage_props.is_none() {
206            if l0_visibility::edge_exists_in_l0(eid, ctx) {
207                return Ok(Some(Properties::new()));
208            }
209            return Ok(None);
210        }
211
212        // 5. Merge storage properties (L0 takes precedence)
213        if let Some(sp) = storage_props {
214            for (k, v) in sp {
215                final_props.entry(k).or_insert(v);
216            }
217        }
218
219        Ok(Some(final_props))
220    }
221
222    async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
223        // In the new design, we scan all edge types since EID doesn't embed type info
224        self.fetch_all_edge_props_from_storage_with_hint(eid, None)
225            .await
226    }
227
228    async fn fetch_all_edge_props_from_storage_with_hint(
229        &self,
230        eid: Eid,
231        type_name_hint: Option<&str>,
232    ) -> Result<Option<Properties>> {
233        let schema = self.schema_manager.schema();
234        let backend = self.storage.backend();
235
236        // If hint provided, use it directly
237        let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
238            vec![hint]
239        } else {
240            // Scan all edge types
241            schema.edge_types.keys().map(|s| s.as_str()).collect()
242        };
243
244        for type_name in type_names {
245            let type_props = schema.properties.get(type_name);
246
247            // For now, edges are primarily in Delta runs before compaction to L2 CSR.
248            // We check FWD delta runs.
249            if self.storage.delta_dataset(type_name, "fwd").is_err() {
250                continue; // Edge type doesn't exist, try next
251            }
252
253            // Use backend for edge property lookup
254            use crate::backend::table_names;
255            use crate::backend::types::ScanRequest;
256
257            let table_name = table_names::delta_table_name(type_name, "fwd");
258            if !backend.table_exists(&table_name).await.unwrap_or(false) {
259                continue; // No data for this type, try next
260            }
261
262            let base_filter = format!("eid = {}", eid.as_u64());
263            let filter_expr = self.storage.apply_version_filter(base_filter);
264
265            let batches = match backend
266                .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
267                .await
268            {
269                Ok(b) => b,
270                Err(_) => continue,
271            };
272
273            // Collect all rows for this edge, sorted by version
274            let mut rows: Vec<(u64, u8, Properties)> = Vec::new();
275
276            for batch in batches {
277                let op_col = match batch.column_by_name("op") {
278                    Some(c) => c
279                        .as_any()
280                        .downcast_ref::<arrow_array::UInt8Array>()
281                        .unwrap(),
282                    None => continue,
283                };
284                let ver_col = match batch.column_by_name("_version") {
285                    Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
286                    None => continue,
287                };
288
289                for row in 0..batch.num_rows() {
290                    let ver = ver_col.value(row);
291                    let op = op_col.value(row);
292                    let mut props = Properties::new();
293
294                    if op != 1 {
295                        // Not a delete - extract properties
296                        if let Some(tp) = type_props {
297                            for (p_name, p_meta) in tp {
298                                if let Some(col) = batch.column_by_name(p_name)
299                                    && !col.is_null(row)
300                                {
301                                    let val =
302                                        Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
303                                    props.insert(p_name.clone(), val);
304                                }
305                            }
306                        }
307                    }
308                    rows.push((ver, op, props));
309                }
310            }
311
312            if rows.is_empty() {
313                continue;
314            }
315
316            // Sort by version (ascending) so we merge in order
317            rows.sort_by_key(|(ver, _, _)| *ver);
318
319            // Merge properties across all versions
320            // For CRDT properties: merge values
321            // For non-CRDT properties: later versions overwrite earlier ones
322            let mut merged_props: Properties = Properties::new();
323            let mut is_deleted = false;
324
325            for (_, op, props) in rows {
326                if op == 1 {
327                    // Delete operation - mark as deleted
328                    is_deleted = true;
329                    merged_props.clear();
330                } else {
331                    is_deleted = false;
332                    for (p_name, p_val) in props {
333                        // Check if this is a CRDT property
334                        let is_crdt = type_props
335                            .and_then(|tp| tp.get(&p_name))
336                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
337                            .unwrap_or(false);
338
339                        if is_crdt {
340                            // Merge CRDT values
341                            if let Some(existing) = merged_props.get(&p_name) {
342                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
343                                    merged_props.insert(p_name, merged);
344                                }
345                            } else {
346                                merged_props.insert(p_name, p_val);
347                            }
348                        } else {
349                            // Non-CRDT: later version overwrites
350                            merged_props.insert(p_name, p_val);
351                        }
352                    }
353                }
354            }
355
356            if is_deleted {
357                return Ok(None);
358            }
359
360            if !merged_props.is_empty() {
361                return Ok(Some(merged_props));
362            }
363        }
364
365        // Fallback to main edges table props_json for unknown/schemaless types
366        use crate::storage::main_edge::MainEdgeDataset;
367        if let Some(props) = MainEdgeDataset::find_props_by_eid(self.storage.backend(), eid).await?
368        {
369            return Ok(Some(props));
370        }
371
372        Ok(None)
373    }
374
375    /// Batch load properties for multiple vertices
376    pub async fn get_batch_vertex_props(
377        &self,
378        vids: &[Vid],
379        properties: &[&str],
380        ctx: Option<&QueryContext>,
381    ) -> Result<HashMap<Vid, Properties>> {
382        let schema = self.schema_manager.schema();
383        let mut result = HashMap::new();
384        if vids.is_empty() {
385            return Ok(result);
386        }
387
388        // In the new storage model, VIDs are pure auto-increment and don't embed label info.
389        // We need to scan all label datasets to find the vertices.
390
391        // Try VidLabelsIndex for O(1) label resolution
392        let labels_to_scan: Vec<String> = {
393            let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
394            let mut all_resolved = true;
395            for &vid in vids {
396                if let Some(labels) = self.storage.get_labels_from_index(vid) {
397                    needed.extend(labels);
398                } else {
399                    all_resolved = false;
400                    break;
401                }
402            }
403            if all_resolved {
404                needed.into_iter().collect()
405            } else {
406                schema.labels.keys().cloned().collect() // Fallback to full scan
407            }
408        };
409
410        // 2. Fetch from storage - scan relevant label datasets
411        for label_name in &labels_to_scan {
412            // Filter to properties that exist in this label's schema
413            let label_schema_props = schema.properties.get(label_name);
414            let valid_props: Vec<&str> = properties
415                .iter()
416                .cloned()
417                .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
418                .collect();
419            // Note: don't skip when valid_props is empty; overflow_json may have the properties
420
421            let ds = self.storage.vertex_dataset(label_name)?;
422            let backend = self.storage.backend();
423            let vtable_name = ds.table_name();
424
425            if !backend.table_exists(&vtable_name).await.unwrap_or(false) {
426                continue; // Table doesn't exist yet — skip this label
427            }
428
429            // Construct filter: _vid IN (...)
430            let vid_list = vids
431                .iter()
432                .map(|v| v.as_u64().to_string())
433                .collect::<Vec<_>>()
434                .join(",");
435            let base_filter = format!("_vid IN ({})", vid_list);
436
437            let final_filter = self.storage.apply_version_filter(base_filter);
438
439            // Build column list for projection
440            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
441            columns.push("_vid".to_string());
442            columns.push("_version".to_string());
443            columns.push("_deleted".to_string());
444            columns.extend(valid_props.iter().map(|s| s.to_string()));
445            // Add overflow_json to fetch non-schema properties
446            columns.push("overflow_json".to_string());
447
448            use crate::backend::types::ScanRequest;
449            let request = ScanRequest::all(&vtable_name)
450                .with_filter(final_filter)
451                .with_columns(columns);
452
453            let batches: Vec<RecordBatch> = match backend.scan(request).await {
454                Ok(b) => b,
455                Err(e) => {
456                    warn!(
457                        label = %label_name,
458                        error = %e,
459                        "failed to scan label table, skipping"
460                    );
461                    continue;
462                }
463            };
464            for batch in batches {
465                let vid_col = match batch
466                    .column_by_name("_vid")
467                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
468                {
469                    Some(c) => c,
470                    None => continue,
471                };
472                let del_col = match batch
473                    .column_by_name("_deleted")
474                    .and_then(|col| col.as_any().downcast_ref::<BooleanArray>())
475                {
476                    Some(c) => c,
477                    None => continue,
478                };
479
480                for row in 0..batch.num_rows() {
481                    let vid = Vid::from(vid_col.value(row));
482
483                    if del_col.value(row) {
484                        result.remove(&vid);
485                        continue;
486                    }
487
488                    let label_props = schema.properties.get(label_name);
489                    let mut props =
490                        Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
491                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
492                    result.insert(vid, props);
493                }
494            }
495        }
496
497        // 3. Overlay L0 buffers in age order: pending (oldest to newest) -> current -> transaction
498        if let Some(ctx) = ctx {
499            // First, overlay pending flush L0s in order (oldest first, so iterate forward)
500            for pending_l0_arc in &ctx.pending_flush_l0s {
501                let pending_l0 = pending_l0_arc.read();
502                self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
503            }
504
505            // Then overlay current L0 (newer than pending)
506            let l0 = ctx.l0.read();
507            self.overlay_l0_batch(vids, &l0, properties, &mut result);
508
509            // Finally overlay transaction L0 (newest)
510            // Skip transaction L0 if querying a snapshot
511            // (Transaction changes are at current version, not in snapshot)
512            if self.storage.version_high_water_mark().is_none()
513                && let Some(tx_l0_arc) = &ctx.transaction_l0
514            {
515                let tx_l0 = tx_l0_arc.read();
516                self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
517            }
518        }
519
520        Ok(result)
521    }
522
523    fn overlay_l0_batch(
524        &self,
525        vids: &[Vid],
526        l0: &L0Buffer,
527        properties: &[&str],
528        result: &mut HashMap<Vid, Properties>,
529    ) {
530        let schema = self.schema_manager.schema();
531        for &vid in vids {
532            // If deleted in L0, remove from result
533            if l0.vertex_tombstones.contains(&vid) {
534                result.remove(&vid);
535                continue;
536            }
537            // If in L0, check version before merging
538            if let Some(l0_props) = l0.vertex_properties.get(&vid) {
539                // Skip entries beyond snapshot boundary
540                let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
541                if self
542                    .storage
543                    .version_high_water_mark()
544                    .is_some_and(|hwm| entry_version > hwm)
545                {
546                    continue;
547                }
548
549                let entry = result.entry(vid).or_default();
550                // In new storage model, get labels from L0Buffer
551                let labels = l0.get_vertex_labels(vid);
552
553                for (k, v) in l0_props {
554                    if properties.contains(&k.as_str()) {
555                        // Check if property is CRDT by looking up in any of the vertex's labels
556                        let is_crdt = labels
557                            .and_then(|label_list| {
558                                label_list.iter().find_map(|ln| {
559                                    schema
560                                        .properties
561                                        .get(ln)
562                                        .and_then(|lp| lp.get(k))
563                                        .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
564                                })
565                            })
566                            .is_some();
567
568                        if is_crdt {
569                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
570                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
571                        } else {
572                            entry.insert(k.clone(), v.clone());
573                        }
574                    }
575                }
576            }
577        }
578    }
579
580    /// Load properties as Arrow columns for vectorized processing
581    /// Batch load properties for multiple edges
582    pub async fn get_batch_edge_props(
583        &self,
584        eids: &[uni_common::core::id::Eid],
585        properties: &[&str],
586        ctx: Option<&QueryContext>,
587    ) -> Result<HashMap<Vid, Properties>> {
588        let schema = self.schema_manager.schema();
589        let mut result = HashMap::new();
590        if eids.is_empty() {
591            return Ok(result);
592        }
593
594        // In the new storage model, EIDs are pure auto-increment and don't embed type info.
595        // We need to scan all edge type datasets to find the edges.
596
597        // Try to resolve edge types from L0 context for O(1) lookup
598        let types_to_scan: Vec<String> = {
599            if let Some(ctx) = ctx {
600                let mut needed: std::collections::HashSet<String> =
601                    std::collections::HashSet::new();
602                let mut all_resolved = true;
603                for &eid in eids {
604                    if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
605                        needed.insert(etype.to_string());
606                    } else {
607                        all_resolved = false;
608                        break;
609                    }
610                }
611                if all_resolved {
612                    needed.into_iter().collect()
613                } else {
614                    schema.edge_types.keys().cloned().collect() // Fallback to full scan
615                }
616            } else {
617                schema.edge_types.keys().cloned().collect() // No context, full scan
618            }
619        };
620
621        // 2. Fetch from storage (Delta runs) - scan relevant edge types
622        for type_name in &types_to_scan {
623            let type_props = schema.properties.get(type_name);
624            let valid_props: Vec<&str> = properties
625                .iter()
626                .cloned()
627                .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
628                .collect();
629            // Note: don't skip when valid_props is empty; overflow_json may have the properties
630
631            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
632                Ok(ds) => ds,
633                Err(_) => continue,
634            };
635            let backend = self.storage.backend();
636            let dtable_name = delta_ds.table_name();
637
638            if !backend.table_exists(&dtable_name).await.unwrap_or(false) {
639                continue; // Table doesn't exist yet — skip this edge type
640            }
641
642            let eid_list = eids
643                .iter()
644                .map(|e| e.as_u64().to_string())
645                .collect::<Vec<_>>()
646                .join(",");
647            let base_filter = format!("eid IN ({})", eid_list);
648
649            let final_filter = self.storage.apply_version_filter(base_filter);
650
651            // Build column list for projection
652            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
653            columns.push("eid".to_string());
654            columns.push("_version".to_string());
655            columns.push("op".to_string());
656            columns.extend(valid_props.iter().map(|s| s.to_string()));
657            // Add overflow_json to fetch non-schema properties
658            columns.push("overflow_json".to_string());
659
660            use crate::backend::types::ScanRequest;
661            let request = ScanRequest::all(&dtable_name)
662                .with_filter(final_filter)
663                .with_columns(columns);
664
665            let batches: Vec<RecordBatch> = match backend.scan(request).await {
666                Ok(b) => b,
667                Err(e) => {
668                    warn!(
669                        edge_type = %type_name,
670                        error = %e,
671                        "failed to scan edge delta table, skipping"
672                    );
673                    continue;
674                }
675            };
676            for batch in batches {
677                let eid_col = match batch
678                    .column_by_name("eid")
679                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
680                {
681                    Some(c) => c,
682                    None => continue,
683                };
684                let op_col = match batch
685                    .column_by_name("op")
686                    .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
687                {
688                    Some(c) => c,
689                    None => continue,
690                };
691
692                for row in 0..batch.num_rows() {
693                    let eid = uni_common::core::id::Eid::from(eid_col.value(row));
694
695                    // op=1 is Delete
696                    if op_col.value(row) == 1 {
697                        result.remove(&Vid::from(eid.as_u64()));
698                        continue;
699                    }
700
701                    let mut props =
702                        Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
703                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
704                    // Reuse Vid as key for compatibility with materialized_property
705                    result.insert(Vid::from(eid.as_u64()), props);
706                }
707            }
708        }
709
710        // 3. Overlay L0 buffers in age order: pending (oldest to newest) -> current -> transaction
711        if let Some(ctx) = ctx {
712            // First, overlay pending flush L0s in order (oldest first, so iterate forward)
713            for pending_l0_arc in &ctx.pending_flush_l0s {
714                let pending_l0 = pending_l0_arc.read();
715                self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
716            }
717
718            // Then overlay current L0 (newer than pending)
719            let l0 = ctx.l0.read();
720            self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);
721
722            // Finally overlay transaction L0 (newest)
723            // Skip transaction L0 if querying a snapshot
724            // (Transaction changes are at current version, not in snapshot)
725            if self.storage.version_high_water_mark().is_none()
726                && let Some(tx_l0_arc) = &ctx.transaction_l0
727            {
728                let tx_l0 = tx_l0_arc.read();
729                self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
730            }
731        }
732
733        Ok(result)
734    }
735
736    fn overlay_l0_edge_batch(
737        &self,
738        eids: &[uni_common::core::id::Eid],
739        l0: &L0Buffer,
740        properties: &[&str],
741        result: &mut HashMap<Vid, Properties>,
742    ) {
743        let schema = self.schema_manager.schema();
744        for &eid in eids {
745            let vid_key = Vid::from(eid.as_u64());
746            if l0.tombstones.contains_key(&eid) {
747                result.remove(&vid_key);
748                continue;
749            }
750            if let Some(l0_props) = l0.edge_properties.get(&eid) {
751                // Skip entries beyond snapshot boundary
752                let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
753                if self
754                    .storage
755                    .version_high_water_mark()
756                    .is_some_and(|hwm| entry_version > hwm)
757                {
758                    continue;
759                }
760
761                let entry = result.entry(vid_key).or_default();
762                // In new storage model, get edge type from L0Buffer
763                let type_name = l0.get_edge_type(eid);
764
765                let include_all = properties.contains(&"_all_props");
766                for (k, v) in l0_props {
767                    if include_all || properties.contains(&k.as_str()) {
768                        // Check if property is CRDT
769                        let is_crdt = type_name
770                            .and_then(|tn| schema.properties.get(tn))
771                            .and_then(|tp| tp.get(k))
772                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
773                            .unwrap_or(false);
774
775                        if is_crdt {
776                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
777                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
778                        } else {
779                            entry.insert(k.clone(), v.clone());
780                        }
781                    }
782                }
783            }
784        }
785    }
786
787    /// Batch load labels for multiple vertices.
788    pub async fn get_batch_labels(
789        &self,
790        vids: &[Vid],
791        ctx: Option<&QueryContext>,
792    ) -> Result<HashMap<Vid, Vec<String>>> {
793        let mut result = HashMap::new();
794        if vids.is_empty() {
795            return Ok(result);
796        }
797
798        // Phase 1: Get from L0 layers (oldest to newest)
799        if let Some(ctx) = ctx {
800            let mut collect_labels = |l0: &L0Buffer| {
801                for &vid in vids {
802                    if let Some(labels) = l0.get_vertex_labels(vid) {
803                        result
804                            .entry(vid)
805                            .or_default()
806                            .extend(labels.iter().cloned());
807                    }
808                }
809            };
810
811            for l0_arc in &ctx.pending_flush_l0s {
812                collect_labels(&l0_arc.read());
813            }
814            collect_labels(&ctx.l0.read());
815            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
816                collect_labels(&tx_l0_arc.read());
817            }
818        }
819
820        // Phase 2: Get from storage (try VidLabelsIndex first, then LanceDB fallback)
821        let mut vids_needing_lancedb = Vec::new();
822
823        /// Merge new labels into an existing label list, skipping duplicates.
824        fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
825            for l in new_labels {
826                if !existing.contains(&l) {
827                    existing.push(l);
828                }
829            }
830        }
831
832        for &vid in vids {
833            if result.contains_key(&vid) {
834                continue; // Already have labels from L0
835            }
836
837            if let Some(labels) = self.storage.get_labels_from_index(vid) {
838                merge_labels(result.entry(vid).or_default(), labels);
839            } else {
840                vids_needing_lancedb.push(vid);
841            }
842        }
843
844        // Fallback to storage backend for VIDs not in the index
845        if !vids_needing_lancedb.is_empty() {
846            let backend = self.storage.backend();
847            let version = self.storage.version_high_water_mark();
848            let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
849                backend,
850                &vids_needing_lancedb,
851                version,
852            )
853            .await?;
854
855            for (vid, labels) in storage_labels {
856                merge_labels(result.entry(vid).or_default(), labels);
857            }
858        }
859
860        // Deduplicate and sort labels
861        for labels in result.values_mut() {
862            labels.sort();
863            labels.dedup();
864        }
865
866        Ok(result)
867    }
868
869    pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
870        Ok(self
871            .get_all_vertex_props_with_ctx(vid, None)
872            .await?
873            .unwrap_or_default())
874    }
875
876    pub async fn get_all_vertex_props_with_ctx(
877        &self,
878        vid: Vid,
879        ctx: Option<&QueryContext>,
880    ) -> Result<Option<Properties>> {
881        // 1. Check if deleted in any L0 layer
882        if l0_visibility::is_vertex_deleted(vid, ctx) {
883            return Ok(None);
884        }
885
886        // 2. Accumulate properties from L0 layers (oldest to newest)
887        let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
888
889        // 3. Fetch from storage
890        let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;
891
892        // 4. Handle case where vertex doesn't exist in either layer
893        if l0_props.is_none() && storage_props_opt.is_none() {
894            return Ok(None);
895        }
896
897        let mut final_props = l0_props.unwrap_or_default();
898
899        // 5. Merge storage properties (L0 takes precedence)
900        if let Some(storage_props) = storage_props_opt {
901            for (k, v) in storage_props {
902                final_props.entry(k).or_insert(v);
903            }
904        }
905
906        // 6. Normalize CRDT properties - convert JSON strings to JSON objects
907        // In the new storage model, we need to get labels from context/L0
908        if let Some(ctx) = ctx {
909            // Try to get labels from L0 layers
910            let labels = l0_visibility::get_vertex_labels(vid, ctx);
911            for label in &labels {
912                self.normalize_crdt_properties(&mut final_props, label)?;
913            }
914        }
915
916        Ok(Some(final_props))
917    }
918
919    /// Batch-fetch properties for multiple vertices of a known label.
920    ///
921    /// Queries L0 layers in-memory, then fetches remaining VIDs from LanceDB in
922    /// a single `_vid IN (...)` query on the label table. Much faster than
923    /// per-vertex `get_all_vertex_props_with_ctx` when many vertices need loading.
924    pub async fn get_batch_vertex_props_for_label(
925        &self,
926        vids: &[Vid],
927        label: &str,
928        ctx: Option<&QueryContext>,
929    ) -> Result<HashMap<Vid, Properties>> {
930        let mut result: HashMap<Vid, Properties> = HashMap::new();
931        let mut need_storage: Vec<Vid> = Vec::new();
932
933        // Phase 1: Check L0 layers for each VID (fast, in-memory).
934        for &vid in vids {
935            if l0_visibility::is_vertex_deleted(vid, ctx) {
936                continue;
937            }
938            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
939            if let Some(props) = l0_props {
940                result.insert(vid, props);
941            } else {
942                need_storage.push(vid);
943            }
944        }
945
946        // If everything was resolved from L0, skip storage entirely.
947        if need_storage.is_empty() {
948            // Normalize CRDT properties for L0-resolved vertices.
949            if ctx.is_some() {
950                for props in result.values_mut() {
951                    self.normalize_crdt_properties(props, label)?;
952                }
953            }
954            return Ok(result);
955        }
956
957        // Phase 2: Batch-fetch from LanceDB for remaining VIDs.
958        let schema = self.schema_manager.schema();
959        let label_props = schema.properties.get(label);
960
961        let mut prop_names: Vec<String> = Vec::new();
962        if let Some(props) = label_props {
963            prop_names = props.keys().cloned().collect();
964        }
965
966        let mut columns: Vec<String> = vec![
967            "_vid".to_string(),
968            "_deleted".to_string(),
969            "_version".to_string(),
970        ];
971        columns.extend(prop_names.iter().cloned());
972        columns.push("overflow_json".to_string());
973
974        // Build IN filter for all VIDs at once.
975        let vid_list: String = need_storage
976            .iter()
977            .map(|v| v.as_u64().to_string())
978            .collect::<Vec<_>>()
979            .join(", ");
980        let base_filter = format!("_vid IN ({})", vid_list);
981
982        let filter_expr = self.storage.apply_version_filter(base_filter);
983
984        let table_name = crate::backend::table_names::vertex_table_name(label);
985        let batches: Vec<RecordBatch> = self
986            .storage
987            .backend()
988            .scan(
989                crate::backend::types::ScanRequest::all(&table_name)
990                    .with_filter(&filter_expr)
991                    .with_columns(columns.clone()),
992            )
993            .await?;
994
995        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
996
997        // Track best version per VID for proper version-based merging.
998        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
999        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();
1000
1001        for batch in batches {
1002            let vid_col = match batch
1003                .column_by_name("_vid")
1004                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1005            {
1006                Some(c) => c,
1007                None => continue,
1008            };
1009            let deleted_col = match batch
1010                .column_by_name("_deleted")
1011                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1012            {
1013                Some(c) => c,
1014                None => continue,
1015            };
1016            let version_col = match batch
1017                .column_by_name("_version")
1018                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1019            {
1020                Some(c) => c,
1021                None => continue,
1022            };
1023
1024            for row in 0..batch.num_rows() {
1025                let vid = Vid::from(vid_col.value(row));
1026                let version = version_col.value(row);
1027
1028                if deleted_col.value(row) {
1029                    if per_vid_best_version
1030                        .get(&vid)
1031                        .is_none_or(|&best| version >= best)
1032                    {
1033                        per_vid_best_version.insert(vid, version);
1034                        per_vid_props.remove(&vid);
1035                    }
1036                    continue;
1037                }
1038
1039                let mut current_props =
1040                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1041
1042                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1043                    for (k, v) in overflow_props {
1044                        current_props.entry(k).or_insert(v);
1045                    }
1046                }
1047
1048                let best = per_vid_best_version.get(&vid).copied();
1049                let mut best_opt = best;
1050                let mut merged = per_vid_props.remove(&vid);
1051                self.merge_versioned_props(
1052                    current_props,
1053                    version,
1054                    &mut best_opt,
1055                    &mut merged,
1056                    label_props,
1057                )?;
1058                if let Some(v) = best_opt {
1059                    per_vid_best_version.insert(vid, v);
1060                }
1061                if let Some(p) = merged {
1062                    per_vid_props.insert(vid, p);
1063                }
1064            }
1065        }
1066
1067        // Merge storage results with any L0 partial props already in result.
1068        for (vid, storage_props) in per_vid_props {
1069            let entry = result.entry(vid).or_default();
1070            for (k, v) in storage_props {
1071                entry.entry(k).or_insert(v);
1072            }
1073        }
1074
1075        // Mark VIDs that had no data anywhere as absent (don't insert them).
1076        // VIDs not in `result` simply won't appear in the output.
1077
1078        // Phase 3: Normalize CRDT properties.
1079        if ctx.is_some() {
1080            for props in result.values_mut() {
1081                self.normalize_crdt_properties(props, label)?;
1082            }
1083        }
1084
1085        Ok(result)
1086    }
1087
1088    /// Batch-fetch properties for multiple edges of a known type.
1089    ///
1090    /// Mirrors `get_batch_vertex_props_for_label` (above) for the edge path.
1091    /// Issues one `eid IN (...)` scan against the delta table for the edge
1092    /// type, replaying per-EID version history (op-replay + CRDT merge) the
1093    /// same way `fetch_all_edge_props_from_storage_with_hint` does. Far
1094    /// faster than per-edge `get_all_edge_props_with_ctx` when many edges
1095    /// of the same type need loading (e.g., batched SET/REMOVE on edges
1096    /// matched by a MATCH).
1097    ///
1098    /// EIDs of deleted edges or those with no rows in delta storage are
1099    /// omitted from the returned map; callers can fall back to the per-EID
1100    /// path for misses.
1101    pub async fn get_batch_edge_props_for_type(
1102        &self,
1103        eids: &[Eid],
1104        type_name: &str,
1105        ctx: Option<&QueryContext>,
1106    ) -> Result<HashMap<Eid, Properties>> {
1107        use crate::backend::table_names;
1108        use crate::backend::types::ScanRequest;
1109
1110        let mut result: HashMap<Eid, Properties> = HashMap::new();
1111        if eids.is_empty() {
1112            return Ok(result);
1113        }
1114
1115        // Phase 1: L0 check per EID. Skip deleted; serve from L0 if it has
1116        // an accumulated property set; otherwise note for storage scan.
1117        let mut need_storage: Vec<Eid> = Vec::new();
1118        for &eid in eids {
1119            if l0_visibility::is_edge_deleted(eid, ctx) {
1120                continue;
1121            }
1122            let l0_props = l0_visibility::accumulate_edge_props(eid, ctx);
1123            // Edge L0 semantics: even an empty accumulator means "edge exists
1124            // in L0 with no user props yet" — we still go to storage to pick
1125            // up the persisted full row (mirrors get_all_edge_props_with_ctx).
1126            if let Some(props) = l0_props {
1127                result.insert(eid, props);
1128            }
1129            need_storage.push(eid);
1130        }
1131
1132        if need_storage.is_empty() {
1133            return Ok(result);
1134        }
1135
1136        // Phase 2: One scan with `eid IN (...)` on the delta table.
1137        let schema = self.schema_manager.schema();
1138        let type_props = schema.properties.get(type_name);
1139
1140        if self.storage.delta_dataset(type_name, "fwd").is_err() {
1141            return Ok(result);
1142        }
1143
1144        let table_name = table_names::delta_table_name(type_name, "fwd");
1145        let backend = self.storage.backend();
1146        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1147            return Ok(result);
1148        }
1149
1150        let eid_list: String = need_storage
1151            .iter()
1152            .map(|e| e.as_u64().to_string())
1153            .collect::<Vec<_>>()
1154            .join(", ");
1155        let base_filter = format!("eid IN ({})", eid_list);
1156        let filter_expr = self.storage.apply_version_filter(base_filter);
1157
1158        let batches = match backend
1159            .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
1160            .await
1161        {
1162            Ok(b) => b,
1163            Err(_) => return Ok(result), // Storage error: per-row fallback handles correctness
1164        };
1165
1166        // Collect (eid, version, op, props) tuples, then group + replay per EID.
1167        let mut per_eid_rows: HashMap<Eid, Vec<(u64, u8, Properties)>> = HashMap::new();
1168        for batch in batches {
1169            let eid_col = match batch
1170                .column_by_name("eid")
1171                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1172            {
1173                Some(c) => c,
1174                None => continue,
1175            };
1176            let op_col = match batch
1177                .column_by_name("op")
1178                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt8Array>())
1179            {
1180                Some(c) => c,
1181                None => continue,
1182            };
1183            let ver_col = match batch
1184                .column_by_name("_version")
1185                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1186            {
1187                Some(c) => c,
1188                None => continue,
1189            };
1190
1191            for row in 0..batch.num_rows() {
1192                let eid = Eid::from(eid_col.value(row));
1193                let ver = ver_col.value(row);
1194                let op = op_col.value(row);
1195                let mut props = Properties::new();
1196
1197                if op != 1
1198                    && let Some(tp) = type_props
1199                {
1200                    for (p_name, p_meta) in tp {
1201                        if let Some(col) = batch.column_by_name(p_name)
1202                            && !col.is_null(row)
1203                        {
1204                            let val = Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
1205                            props.insert(p_name.clone(), val);
1206                        }
1207                    }
1208                }
1209                per_eid_rows.entry(eid).or_default().push((ver, op, props));
1210            }
1211        }
1212
1213        for (eid, mut rows) in per_eid_rows {
1214            rows.sort_by_key(|(ver, _, _)| *ver);
1215
1216            let mut merged_props: Properties = Properties::new();
1217            let mut is_deleted = false;
1218
1219            for (_, op, props) in rows {
1220                if op == 1 {
1221                    is_deleted = true;
1222                    merged_props.clear();
1223                } else {
1224                    is_deleted = false;
1225                    for (p_name, p_val) in props {
1226                        let is_crdt = type_props
1227                            .and_then(|tp| tp.get(&p_name))
1228                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1229                            .unwrap_or(false);
1230                        if is_crdt {
1231                            if let Some(existing) = merged_props.get(&p_name) {
1232                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
1233                                    merged_props.insert(p_name, merged);
1234                                }
1235                            } else {
1236                                merged_props.insert(p_name, p_val);
1237                            }
1238                        } else {
1239                            merged_props.insert(p_name, p_val);
1240                        }
1241                    }
1242                }
1243            }
1244
1245            if is_deleted {
1246                // Deleted in storage; remove any L0 accumulation that may
1247                // have been recorded under this EID by Phase 1 (matches
1248                // is_edge_deleted single-EID semantics).
1249                result.remove(&eid);
1250                continue;
1251            }
1252
1253            // L0 takes precedence over storage for shared keys; insert
1254            // storage values only where L0 did not already provide them.
1255            let entry = result.entry(eid).or_default();
1256            for (k, v) in merged_props {
1257                entry.entry(k).or_insert(v);
1258            }
1259        }
1260
1261        // Schemaless / overflow edge props live in the main edges table's
1262        // `props_json`, NOT in the per-type delta columns — so the delta replay
1263        // above recovers only typed columns and leaves a schemaless edge (or any
1264        // prop absent from the type schema) empty here. Mirror the single-EID
1265        // `fetch_all_edge_props_from_storage` fallback: for any requested EID
1266        // still unresolved (no entry, or an empty one) fall back to the main
1267        // edges table. Without this, a fork SET/REMOVE on an inherited schemaless
1268        // relationship read an empty prefetch and wiped the edge's untouched
1269        // properties (#102). Only misses pay the per-EID lookup, so the batch
1270        // fast-path is preserved for fully-typed edges.
1271        use crate::storage::main_edge::MainEdgeDataset;
1272        for &eid in eids {
1273            if l0_visibility::is_edge_deleted(eid, ctx) {
1274                continue;
1275            }
1276            let needs_fallback = result.get(&eid).is_none_or(|p| p.is_empty());
1277            if !needs_fallback {
1278                continue;
1279            }
1280            if let Some(props) =
1281                MainEdgeDataset::find_props_by_eid(self.storage.backend(), eid).await?
1282            {
1283                let entry = result.entry(eid).or_default();
1284                for (k, v) in props {
1285                    entry.entry(k).or_insert(v);
1286                }
1287            }
1288        }
1289
1290        Ok(result)
1291    }
1292
1293    /// Normalize CRDT properties by converting JSON strings to JSON objects.
1294    /// This handles the case where CRDT values come from Cypher CREATE statements
1295    /// as `Value::String("{\"t\": \"gc\", ...}")` and need to be parsed into objects.
1296    fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1297        let schema = self.schema_manager.schema();
1298        let label_props = match schema.properties.get(label) {
1299            Some(p) => p,
1300            None => return Ok(()),
1301        };
1302
1303        for (prop_name, prop_meta) in label_props {
1304            if let DataType::Crdt(_) = prop_meta.r#type
1305                && let Some(val) = props.get_mut(prop_name)
1306            {
1307                *val = Value::from(Self::parse_crdt_value(val)?);
1308            }
1309        }
1310
1311        Ok(())
1312    }
1313
1314    /// Extract properties from a single batch row.
1315    fn extract_row_properties(
1316        batch: &RecordBatch,
1317        row: usize,
1318        prop_names: &[&str],
1319        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1320    ) -> Result<Properties> {
1321        let mut props = Properties::new();
1322        for name in prop_names {
1323            let col = match batch.column_by_name(name) {
1324                Some(col) => col,
1325                None => continue,
1326            };
1327            if col.is_null(row) {
1328                continue;
1329            }
1330            if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1331                let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1332                props.insert((*name).to_string(), val);
1333            }
1334        }
1335        Ok(props)
1336    }
1337
1338    /// Extract overflow properties from the overflow_json column.
1339    ///
1340    /// Returns None if the column doesn't exist or the value is null,
1341    /// otherwise parses the JSON blob and returns the properties.
1342    fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
1343        use arrow_array::LargeBinaryArray;
1344
1345        let overflow_col = match batch.column_by_name("overflow_json") {
1346            Some(col) => col,
1347            None => return Ok(None), // Column doesn't exist (old schema)
1348        };
1349
1350        if overflow_col.is_null(row) {
1351            return Ok(None);
1352        }
1353
1354        let binary_array = overflow_col
1355            .as_any()
1356            .downcast_ref::<LargeBinaryArray>()
1357            .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;
1358
1359        let jsonb_bytes = binary_array.value(row);
1360
1361        // Decode the CypherValue blob directly to `Value`. Routing through
1362        // `serde_json` would stringify temporal values (and is unnecessary —
1363        // the blob already decodes to a `Value::Map`).
1364        match uni_common::cypher_value_codec::decode(jsonb_bytes)
1365            .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?
1366        {
1367            Value::Map(map) => Ok(Some(map)),
1368            Value::Null => Ok(None),
1369            other => Err(anyhow!(
1370                "overflow_json decoded to a non-map value: {other:?}"
1371            )),
1372        }
1373    }
1374
1375    /// Merge overflow properties from the overflow_json column into an existing props map.
1376    ///
1377    /// Handles two concerns:
1378    /// 1. If `overflow_json` is explicitly requested in `properties`, stores the raw JSONB
1379    ///    bytes as a JSON array of u8 values.
1380    /// 2. Extracts individual overflow properties and merges those that are in `properties`.
1381    fn merge_overflow_into_props(
1382        batch: &RecordBatch,
1383        row: usize,
1384        properties: &[&str],
1385        props: &mut Properties,
1386    ) -> Result<()> {
1387        use arrow_array::LargeBinaryArray;
1388
1389        let overflow_col = match batch.column_by_name("overflow_json") {
1390            Some(col) if !col.is_null(row) => col,
1391            _ => return Ok(()),
1392        };
1393
1394        // Store raw JSONB bytes if explicitly requested
1395        if properties.contains(&"overflow_json")
1396            && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
1397        {
1398            let jsonb_bytes = binary_array.value(row);
1399            let bytes_list: Vec<Value> =
1400                jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
1401            props.insert("overflow_json".to_string(), Value::List(bytes_list));
1402        }
1403
1404        // Extract and merge individual overflow properties
1405        if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
1406            for (k, v) in overflow_props {
1407                if properties.contains(&k.as_str()) {
1408                    props.entry(k).or_insert(v);
1409                }
1410            }
1411        }
1412
1413        Ok(())
1414    }
1415
1416    /// Merge CRDT properties from source into target.
1417    fn merge_crdt_into(
1418        &self,
1419        target: &mut Properties,
1420        source: Properties,
1421        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1422        crdt_only: bool,
1423    ) -> Result<()> {
1424        for (k, v) in source {
1425            if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1426                if let DataType::Crdt(_) = prop_meta.r#type {
1427                    let existing_v = target.entry(k).or_insert(Value::Null);
1428                    *existing_v = self.merge_crdt_values(existing_v, &v)?;
1429                } else if !crdt_only {
1430                    target.insert(k, v);
1431                }
1432            }
1433        }
1434        Ok(())
1435    }
1436
1437    /// Handle version-based property merging for storage fetch.
1438    fn merge_versioned_props(
1439        &self,
1440        current_props: Properties,
1441        version: u64,
1442        best_version: &mut Option<u64>,
1443        best_props: &mut Option<Properties>,
1444        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1445    ) -> Result<()> {
1446        if best_version.is_none_or(|best| version > best) {
1447            // Newest version: strictly newer
1448            if let Some(mut existing_props) = best_props.take() {
1449                // Merge CRDTs from existing into current
1450                let mut merged = current_props;
1451                for (k, v) in merged.iter_mut() {
1452                    if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
1453                        && let DataType::Crdt(_) = prop_meta.r#type
1454                        && let Some(existing_val) = existing_props.remove(k)
1455                    {
1456                        *v = self.merge_crdt_values(v, &existing_val)?;
1457                    }
1458                }
1459                *best_props = Some(merged);
1460            } else {
1461                *best_props = Some(current_props);
1462            }
1463            *best_version = Some(version);
1464        } else if Some(version) == *best_version {
1465            // Same version: merge all properties
1466            if let Some(existing_props) = best_props.as_mut() {
1467                self.merge_crdt_into(existing_props, current_props, label_props, false)?;
1468            } else {
1469                *best_props = Some(current_props);
1470            }
1471        } else {
1472            // Older version: only merge CRDTs
1473            if let Some(existing_props) = best_props.as_mut() {
1474                self.merge_crdt_into(existing_props, current_props, label_props, true)?;
1475            }
1476        }
1477        Ok(())
1478    }
1479
1480    async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
1481        // In the new storage model, VID doesn't embed label info.
1482        // We need to scan all label datasets to find the vertex's properties.
1483        let schema = self.schema_manager.schema();
1484        let mut merged_props: Option<Properties> = None;
1485        let mut global_best_version: Option<u64> = None;
1486
1487        // Try VidLabelsIndex for O(1) label resolution
1488        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1489        {
1490            labels
1491        } else {
1492            schema.labels.keys().cloned().collect() // Fallback to full scan
1493        };
1494
1495        for label_name in &label_names {
1496            let label_props = schema.properties.get(label_name);
1497
1498            // Get property names from schema
1499            let mut prop_names: Vec<String> = Vec::new();
1500            if let Some(props) = label_props {
1501                prop_names = props.keys().cloned().collect();
1502            }
1503
1504            // Build column selection
1505            let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
1506            columns.extend(prop_names.iter().cloned());
1507            // Add overflow_json column to fetch non-schema properties
1508            columns.push("overflow_json".to_string());
1509
1510            // Query using backend scan API
1511            let base_filter = format!("_vid = {}", vid.as_u64());
1512
1513            let filter_expr = self.storage.apply_version_filter(base_filter);
1514
1515            let table_name = crate::backend::table_names::vertex_table_name(label_name);
1516            let batches: Vec<RecordBatch> = match self
1517                .storage
1518                .backend()
1519                .scan(
1520                    crate::backend::types::ScanRequest::all(&table_name)
1521                        .with_filter(&filter_expr)
1522                        .with_columns(columns.clone()),
1523                )
1524                .await
1525            {
1526                Ok(b) => b,
1527                Err(_) => continue,
1528            };
1529
1530            // Convert Vec<String> to Vec<&str> for downstream use
1531            let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
1532
1533            for batch in batches {
1534                let deleted_col = match batch
1535                    .column_by_name("_deleted")
1536                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1537                {
1538                    Some(c) => c,
1539                    None => continue,
1540                };
1541                let version_col = match batch
1542                    .column_by_name("_version")
1543                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1544                {
1545                    Some(c) => c,
1546                    None => continue,
1547                };
1548
1549                for row in 0..batch.num_rows() {
1550                    let version = version_col.value(row);
1551
1552                    if deleted_col.value(row) {
1553                        if global_best_version.is_none_or(|best| version >= best) {
1554                            global_best_version = Some(version);
1555                            merged_props = None;
1556                        }
1557                        continue;
1558                    }
1559
1560                    let mut current_props =
1561                        Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1562
1563                    // Also extract overflow properties from overflow_json column
1564                    if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1565                        // Merge overflow properties into current_props
1566                        for (k, v) in overflow_props {
1567                            current_props.entry(k).or_insert(v);
1568                        }
1569                    }
1570
1571                    self.merge_versioned_props(
1572                        current_props,
1573                        version,
1574                        &mut global_best_version,
1575                        &mut merged_props,
1576                        label_props,
1577                    )?;
1578                }
1579            }
1580        }
1581
1582        // Fallback to main table props_json for unknown/schemaless labels.
1583        // Gated on "no per-label verdict" (neither a live row nor a tombstone
1584        // was seen), so a per-label deletion tombstone is never overridden by
1585        // an older main-table row.
1586        if merged_props.is_none()
1587            && global_best_version.is_none()
1588            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1589                self.storage.backend(),
1590                vid,
1591                self.storage.version_high_water_mark(),
1592            )
1593            .await?
1594        {
1595            return Ok(Some(main_props));
1596        }
1597
1598        Ok(merged_props)
1599    }
1600
1601    pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
1602        self.get_vertex_prop_with_ctx(vid, prop, None).await
1603    }
1604
1605    #[instrument(skip(self, ctx), level = "trace")]
1606    pub async fn get_vertex_prop_with_ctx(
1607        &self,
1608        vid: Vid,
1609        prop: &str,
1610        ctx: Option<&QueryContext>,
1611    ) -> Result<Value> {
1612        // 1. Check if deleted in any L0 layer
1613        if l0_visibility::is_vertex_deleted(vid, ctx) {
1614            return Ok(Value::Null);
1615        }
1616
1617        // 2. Determine if property is CRDT type
1618        // First check labels from context/L0, then fall back to scanning all labels in schema
1619        let schema = self.schema_manager.schema();
1620        let labels = ctx
1621            .map(|c| l0_visibility::get_vertex_labels(vid, c))
1622            .unwrap_or_default();
1623
1624        let is_crdt = if !labels.is_empty() {
1625            // Check labels from context
1626            labels.iter().any(|ln| {
1627                schema
1628                    .properties
1629                    .get(ln)
1630                    .and_then(|lp| lp.get(prop))
1631                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1632                    .unwrap_or(false)
1633            })
1634        } else {
1635            // No labels from context - check if property is CRDT in ANY label
1636            schema.properties.values().any(|label_props| {
1637                label_props
1638                    .get(prop)
1639                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1640                    .unwrap_or(false)
1641            })
1642        };
1643
1644        // 3. Check L0 chain for property
1645        if is_crdt {
1646            // For CRDT, accumulate and merge values from all L0 layers
1647            let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
1648            return self.finalize_crdt_lookup(vid, prop, l0_val).await;
1649        }
1650
1651        // 4. Non-CRDT: Check L0 chain for property (returns first found)
1652        if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
1653            return Ok(val);
1654        }
1655
1656        // 5. Check Cache (if enabled)
1657        if let Some(ref cache) = self.vertex_cache {
1658            let mut cache = cache.lock().await;
1659            if let Some(val) = cache.get(&(vid, prop.to_string())) {
1660                debug!(vid = ?vid, prop, "Cache HIT");
1661                metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
1662                return Ok(val.clone());
1663            } else {
1664                debug!(vid = ?vid, prop, "Cache MISS");
1665                metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
1666                    .increment(1);
1667            }
1668        }
1669
1670        // 6. Fetch from Storage
1671        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1672
1673        // 7. Update Cache (if enabled)
1674        if let Some(ref cache) = self.vertex_cache {
1675            let mut cache = cache.lock().await;
1676            cache.put((vid, prop.to_string()), storage_val.clone());
1677        }
1678
1679        Ok(storage_val)
1680    }
1681
1682    /// Accumulate CRDT values from all L0 layers by merging them together.
1683    fn accumulate_crdt_from_l0(
1684        &self,
1685        vid: Vid,
1686        prop: &str,
1687        ctx: Option<&QueryContext>,
1688    ) -> Result<Value> {
1689        let mut merged = Value::Null;
1690        l0_visibility::visit_l0_buffers(ctx, |l0| {
1691            if let Some(props) = l0.vertex_properties.get(&vid)
1692                && let Some(val) = props.get(prop)
1693            {
1694                // Note: merge_crdt_values can't fail in practice for valid CRDTs
1695                if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1696                    merged = new_merged;
1697                }
1698            }
1699            false // Continue visiting all layers
1700        });
1701        Ok(merged)
1702    }
1703
1704    /// Finalize CRDT lookup by merging with cache/storage.
1705    async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1706        // Check Cache (if enabled)
1707        let cached_val = if let Some(ref cache) = self.vertex_cache {
1708            let mut cache = cache.lock().await;
1709            cache.get(&(vid, prop.to_string())).cloned()
1710        } else {
1711            None
1712        };
1713
1714        if let Some(val) = cached_val {
1715            let merged = self.merge_crdt_values(&val, &l0_val)?;
1716            return Ok(merged);
1717        }
1718
1719        // Fetch from Storage
1720        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1721
1722        // Update Cache (if enabled)
1723        if let Some(ref cache) = self.vertex_cache {
1724            let mut cache = cache.lock().await;
1725            cache.put((vid, prop.to_string()), storage_val.clone());
1726        }
1727
1728        // Merge L0 + Storage
1729        self.merge_crdt_values(&storage_val, &l0_val)
1730    }
1731
1732    async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
1733        // In the new storage model, VID doesn't embed label info.
1734        // We need to scan all label datasets to find the property.
1735        let schema = self.schema_manager.schema();
1736        let mut best_version: Option<u64> = None;
1737        let mut best_value: Option<Value> = None;
1738
1739        // Try VidLabelsIndex for O(1) label resolution
1740        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1741        {
1742            labels
1743        } else {
1744            schema.labels.keys().cloned().collect() // Fallback to full scan
1745        };
1746
1747        for label_name in &label_names {
1748            // Check if property is defined in schema for this label
1749            let prop_meta = schema
1750                .properties
1751                .get(label_name)
1752                .and_then(|props| props.get(prop));
1753
1754            // Even if property is not in schema, we still check overflow_json
1755
1756            // Query using backend scan API
1757            let base_filter = format!("_vid = {}", vid.as_u64());
1758
1759            let filter_expr = self.storage.apply_version_filter(base_filter);
1760
1761            // Always request metadata columns and overflow_json
1762            let mut columns = vec![
1763                "_deleted".to_string(),
1764                "_version".to_string(),
1765                "overflow_json".to_string(),
1766            ];
1767
1768            // Only request the property column if it's defined in schema
1769            if prop_meta.is_some() {
1770                columns.push(prop.to_string());
1771            }
1772
1773            let table_name = crate::backend::table_names::vertex_table_name(label_name);
1774            let batches: Vec<RecordBatch> = match self
1775                .storage
1776                .backend()
1777                .scan(
1778                    crate::backend::types::ScanRequest::all(&table_name)
1779                        .with_filter(&filter_expr)
1780                        .with_columns(columns),
1781                )
1782                .await
1783            {
1784                Ok(b) => b,
1785                Err(_) => continue,
1786            };
1787
1788            for batch in batches {
1789                let deleted_col = match batch
1790                    .column_by_name("_deleted")
1791                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1792                {
1793                    Some(c) => c,
1794                    None => continue,
1795                };
1796                let version_col = match batch
1797                    .column_by_name("_version")
1798                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1799                {
1800                    Some(c) => c,
1801                    None => continue,
1802                };
1803                for row in 0..batch.num_rows() {
1804                    let version = version_col.value(row);
1805
1806                    if deleted_col.value(row) {
1807                        if best_version.is_none_or(|best| version >= best) {
1808                            best_version = Some(version);
1809                            best_value = None;
1810                        }
1811                        continue;
1812                    }
1813
1814                    // First try schema column if property is in schema
1815                    let mut val = None;
1816                    if let Some(meta) = prop_meta
1817                        && let Some(col) = batch.column_by_name(prop)
1818                    {
1819                        val = Some(if col.is_null(row) {
1820                            Value::Null
1821                        } else {
1822                            Self::value_from_column(col, &meta.r#type, row)?
1823                        });
1824                    }
1825
1826                    // If not in schema column, check overflow_json
1827                    if val.is_none()
1828                        && let Some(overflow_props) =
1829                            Self::extract_overflow_properties(&batch, row)?
1830                        && let Some(overflow_val) = overflow_props.get(prop)
1831                    {
1832                        val = Some(overflow_val.clone());
1833                    }
1834
1835                    // If we found a value (from schema or overflow), merge it
1836                    if let Some(v) = val {
1837                        if let Some(meta) = prop_meta {
1838                            // Use schema type for merging (handles CRDT)
1839                            self.merge_prop_value(
1840                                v,
1841                                version,
1842                                &meta.r#type,
1843                                &mut best_version,
1844                                &mut best_value,
1845                            )?;
1846                        } else {
1847                            // Overflow property: use simple LWW merging
1848                            if best_version.is_none_or(|best| version >= best) {
1849                                best_version = Some(version);
1850                                best_value = Some(v);
1851                            }
1852                        }
1853                    }
1854                }
1855            }
1856        }
1857
1858        // Fallback to main-table props_json for unknown/schemaless labels —
1859        // their rows have no per-label table at all (mirrors
1860        // `fetch_all_props_from_storage`). Gated on "no per-label verdict"
1861        // (neither a live value nor a tombstone was seen), so a per-label
1862        // tombstone is never overridden by an older main-table row.
1863        if best_value.is_none()
1864            && best_version.is_none()
1865            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1866                self.storage.backend(),
1867                vid,
1868                self.storage.version_high_water_mark(),
1869            )
1870            .await?
1871        {
1872            return Ok(main_props.get(prop).cloned().unwrap_or(Value::Null));
1873        }
1874
1875        Ok(best_value.unwrap_or(Value::Null))
1876    }
1877
1878    /// Decode an Arrow column value with strict CRDT error handling.
1879    pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1880        crate::storage::value_codec::decode_column_value(
1881            col,
1882            data_type,
1883            row,
1884            CrdtDecodeMode::Strict,
1885        )
1886    }
1887
1888    /// Merge two `Value`-wrapped CRDT operands.
1889    ///
1890    /// Routes through [`uni_crdt::Crdt::merge_via_registry`] using the
1891    /// `PropertyManager`'s `plugin_registry`. With an empty registry
1892    /// (the legacy 3-arg [`Self::new`] default) `merge_via_registry`
1893    /// falls back to `Crdt::try_merge`, preserving native semantics.
1894    ///
1895    /// # Errors
1896    ///
1897    /// Returns an `anyhow::Error` when either operand is malformed
1898    /// CRDT JSON, the variants disagree, or the registry-dispatched
1899    /// merge surfaces a `CrdtError`.
1900    pub fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1901        // Handle the case where values are JSON strings containing CRDT JSON
1902        // (this happens when values come from Cypher CREATE statements)
1903        // Parse before checking for null to ensure proper format conversion
1904        if a.is_null() {
1905            return Self::parse_crdt_value(b).map(Value::from);
1906        }
1907        if b.is_null() {
1908            return Self::parse_crdt_value(a).map(Value::from);
1909        }
1910
1911        let a_parsed = Self::parse_crdt_value(a)?;
1912        let b_parsed = Self::parse_crdt_value(b)?;
1913
1914        let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1915        let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1916        // M10 follow-up: route through `merge_via_registry` so a
1917        // hot-reloaded `CrdtKindProvider` plugin can intercept the
1918        // merge. With an empty registry (the 3-arg `new()` default)
1919        // this falls back to `Crdt::try_merge`, preserving prior
1920        // behavior bit-for-bit.
1921        crdt_a
1922            .merge_via_registry(&crdt_b, &self.plugin_registry)
1923            .map_err(|e| anyhow::anyhow!("{e}"))?;
1924        Ok(Value::from(serde_json::to_value(crdt_a)?))
1925    }
1926
1927    /// Parse a CRDT value that may be either a JSON object or a JSON string containing JSON.
1928    /// Returns `serde_json::Value` for internal CRDT processing.
1929    fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1930        if let Value::String(s) = val {
1931            // Value is a JSON string - parse the string content as JSON
1932            serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1933        } else {
1934            // Convert uni_common::Value to serde_json::Value for CRDT processing
1935            Ok(serde_json::Value::from(val.clone()))
1936        }
1937    }
1938
1939    /// Merge a property value based on version, handling CRDT vs LWW semantics.
1940    fn merge_prop_value(
1941        &self,
1942        val: Value,
1943        version: u64,
1944        data_type: &DataType,
1945        best_version: &mut Option<u64>,
1946        best_value: &mut Option<Value>,
1947    ) -> Result<()> {
1948        if let DataType::Crdt(_) = data_type {
1949            self.merge_crdt_prop_value(val, version, best_version, best_value)
1950        } else {
1951            // Standard LWW
1952            if best_version.is_none_or(|best| version >= best) {
1953                *best_version = Some(version);
1954                *best_value = Some(val);
1955            }
1956            Ok(())
1957        }
1958    }
1959
1960    /// Merge CRDT property values across versions (CRDTs merge regardless of version).
1961    fn merge_crdt_prop_value(
1962        &self,
1963        val: Value,
1964        version: u64,
1965        best_version: &mut Option<u64>,
1966        best_value: &mut Option<Value>,
1967    ) -> Result<()> {
1968        if best_version.is_none_or(|best| version > best) {
1969            // Newer version: merge with existing if present
1970            if let Some(existing) = best_value.take() {
1971                *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1972            } else {
1973                *best_value = Some(val);
1974            }
1975            *best_version = Some(version);
1976        } else if Some(version) == *best_version {
1977            // Same version: merge
1978            let existing = best_value.get_or_insert(Value::Null);
1979            *existing = self.merge_crdt_values(existing, &val)?;
1980        } else {
1981            // Older version: still merge for CRDTs
1982            if let Some(existing) = best_value.as_mut() {
1983                *existing = self.merge_crdt_values(existing, &val)?;
1984            }
1985        }
1986        Ok(())
1987    }
1988}