Skip to main content

uni_store/runtime/
property_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::l0::L0Buffer;
6use crate::runtime::l0_visibility;
7use crate::storage::main_vertex::MainVertexDataset;
8use crate::storage::manager::StorageManager;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
12use lru::LruCache;
13use metrics;
14use std::collections::HashMap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use tracing::{debug, instrument, warn};
19use uni_common::Properties;
20use uni_common::Value;
21use uni_common::core::id::{Eid, Vid};
22use uni_common::core::schema::{DataType, SchemaManager};
23use uni_crdt::Crdt;
24
25pub struct PropertyManager {
26    storage: Arc<StorageManager>,
27    schema_manager: Arc<SchemaManager>,
28    /// Plugin registry consulted by CRDT merges via
29    /// [`uni_crdt::Crdt::merge_via_registry`]. The legacy 3-arg
30    /// [`Self::new`] passes an empty registry so callers that don't
31    /// wire plugin dispatch keep getting bit-identical native
32    /// behavior (empty registry → `merge_via_registry`'s native
33    /// fallback). Production paths in `UniInner` use
34    /// [`Self::with_plugin_registry`] to share the host's registry.
35    plugin_registry: Arc<uni_plugin::PluginRegistry>,
36    /// Cache is None when capacity=0 (caching disabled)
37    vertex_cache: Option<Mutex<LruCache<(Vid, String), Value>>>,
38    edge_cache: Option<Mutex<LruCache<(uni_common::core::id::Eid, String), Value>>>,
39    cache_capacity: usize,
40}
41
42impl PropertyManager {
43    /// Construct a `PropertyManager` with an empty plugin registry.
44    ///
45    /// Back-compat shim for the ~17 algorithm and test call sites that
46    /// don't need registry-dispatched CRDT merges. Equivalent to
47    /// [`Self::with_plugin_registry`] with `Arc::new(PluginRegistry::new())`.
48    pub fn new(
49        storage: Arc<StorageManager>,
50        schema_manager: Arc<SchemaManager>,
51        capacity: usize,
52    ) -> Self {
53        Self::with_plugin_registry(
54            storage,
55            schema_manager,
56            capacity,
57            Arc::new(uni_plugin::PluginRegistry::new()),
58        )
59    }
60
61    /// Construct a `PropertyManager` wired to a shared `PluginRegistry`.
62    ///
63    /// CRDT merges in this `PropertyManager` consult `plugin_registry`
64    /// for `CrdtKindProvider`s matching each `Crdt::kind()`; matched
65    /// kinds dispatch through the provider (so hot-reloaded plugins
66    /// take effect immediately), unmatched kinds fall back to the
67    /// native `Crdt::try_merge`.
68    pub fn with_plugin_registry(
69        storage: Arc<StorageManager>,
70        schema_manager: Arc<SchemaManager>,
71        capacity: usize,
72        plugin_registry: Arc<uni_plugin::PluginRegistry>,
73    ) -> Self {
74        // Capacity of 0 disables caching
75        let (vertex_cache, edge_cache) = if capacity == 0 {
76            (None, None)
77        } else {
78            let cap = NonZeroUsize::new(capacity).unwrap();
79            (
80                Some(Mutex::new(LruCache::new(cap))),
81                Some(Mutex::new(LruCache::new(cap))),
82            )
83        };
84
85        Self {
86            storage,
87            schema_manager,
88            plugin_registry,
89            vertex_cache,
90            edge_cache,
91            cache_capacity: capacity,
92        }
93    }
94
95    pub fn cache_size(&self) -> usize {
96        self.cache_capacity
97    }
98
99    /// Check if caching is enabled
100    pub fn caching_enabled(&self) -> bool {
101        self.cache_capacity > 0
102    }
103
104    /// Clear all caches.
105    /// Call this when L0 is rotated, flushed, or compaction occurs to prevent stale reads.
106    pub async fn clear_cache(&self) {
107        if let Some(ref cache) = self.vertex_cache {
108            cache.lock().await.clear();
109        }
110        if let Some(ref cache) = self.edge_cache {
111            cache.lock().await.clear();
112        }
113    }
114
115    /// Invalidate a specific vertex's cached properties.
116    pub async fn invalidate_vertex(&self, _vid: Vid) {
117        if let Some(ref cache) = self.vertex_cache {
118            let mut cache = cache.lock().await;
119            // LruCache doesn't have a way to iterate and remove, so we pop entries
120            // that match the vid. This is O(n) but necessary for targeted invalidation.
121            // For simplicity, clear the entire cache - LRU will repopulate as needed.
122            cache.clear();
123        }
124    }
125
126    /// Invalidate a specific edge's cached properties.
127    pub async fn invalidate_edge(&self, _eid: uni_common::core::id::Eid) {
128        if let Some(ref cache) = self.edge_cache {
129            let mut cache = cache.lock().await;
130            // Same approach as invalidate_vertex
131            cache.clear();
132        }
133    }
134
135    #[instrument(skip(self, ctx), level = "trace")]
136    pub async fn get_edge_prop(
137        &self,
138        eid: uni_common::core::id::Eid,
139        prop: &str,
140        ctx: Option<&QueryContext>,
141    ) -> Result<Value> {
142        // 1. Check if deleted in any L0 layer
143        if l0_visibility::is_edge_deleted(eid, ctx) {
144            return Ok(Value::Null);
145        }
146
147        // 2. Check L0 chain for property (transaction -> main -> pending)
148        if let Some(val) = l0_visibility::lookup_edge_prop(eid, prop, ctx) {
149            return Ok(val);
150        }
151
152        // 3. Check Cache (if enabled)
153        if let Some(ref cache) = self.edge_cache {
154            let mut cache = cache.lock().await;
155            if let Some(val) = cache.get(&(eid, prop.to_string())) {
156                debug!(eid = ?eid, prop, "Cache HIT");
157                metrics::counter!("uni_property_cache_hits_total", "type" => "edge").increment(1);
158                return Ok(val.clone());
159            } else {
160                debug!(eid = ?eid, prop, "Cache MISS");
161                metrics::counter!("uni_property_cache_misses_total", "type" => "edge").increment(1);
162            }
163        }
164
165        // 4. Fetch from Storage
166        let all = self.get_all_edge_props_with_ctx(eid, ctx).await?;
167        let val = all
168            .as_ref()
169            .and_then(|props| props.get(prop).cloned())
170            .unwrap_or(Value::Null);
171
172        // 5. Update Cache (if enabled) - Cache ALL fetched properties, not just requested one
173        if let Some(ref cache) = self.edge_cache {
174            let mut cache = cache.lock().await;
175            if let Some(ref props) = all {
176                for (prop_name, prop_val) in props {
177                    cache.put((eid, prop_name.clone()), prop_val.clone());
178                }
179            } else {
180                // No properties found, cache the null result for this property
181                cache.put((eid, prop.to_string()), Value::Null);
182            }
183        }
184
185        Ok(val)
186    }
187
188    pub async fn get_all_edge_props_with_ctx(
189        &self,
190        eid: uni_common::core::id::Eid,
191        ctx: Option<&QueryContext>,
192    ) -> Result<Option<Properties>> {
193        // 1. Check if deleted in any L0 layer
194        if l0_visibility::is_edge_deleted(eid, ctx) {
195            return Ok(None);
196        }
197
198        // 2. Accumulate properties from L0 layers (oldest to newest)
199        let mut final_props = l0_visibility::accumulate_edge_props(eid, ctx).unwrap_or_default();
200
201        // 3. Fetch from storage runs
202        let storage_props = self.fetch_all_edge_props_from_storage(eid).await?;
203
204        // 4. Handle case where edge exists but has no properties
205        if final_props.is_empty() && storage_props.is_none() {
206            if l0_visibility::edge_exists_in_l0(eid, ctx) {
207                return Ok(Some(Properties::new()));
208            }
209            return Ok(None);
210        }
211
212        // 5. Merge storage properties (L0 takes precedence)
213        if let Some(sp) = storage_props {
214            for (k, v) in sp {
215                final_props.entry(k).or_insert(v);
216            }
217        }
218
219        Ok(Some(final_props))
220    }
221
222    async fn fetch_all_edge_props_from_storage(&self, eid: Eid) -> Result<Option<Properties>> {
223        // In the new design, we scan all edge types since EID doesn't embed type info
224        self.fetch_all_edge_props_from_storage_with_hint(eid, None)
225            .await
226    }
227
228    async fn fetch_all_edge_props_from_storage_with_hint(
229        &self,
230        eid: Eid,
231        type_name_hint: Option<&str>,
232    ) -> Result<Option<Properties>> {
233        let schema = self.schema_manager.schema();
234        let backend = self.storage.backend();
235
236        // If hint provided, use it directly
237        let type_names: Vec<&str> = if let Some(hint) = type_name_hint {
238            vec![hint]
239        } else {
240            // Scan all edge types
241            schema.edge_types.keys().map(|s| s.as_str()).collect()
242        };
243
244        for type_name in type_names {
245            let type_props = schema.properties.get(type_name);
246
247            // For now, edges are primarily in Delta runs before compaction to L2 CSR.
248            // We check FWD delta runs.
249            if self.storage.delta_dataset(type_name, "fwd").is_err() {
250                continue; // Edge type doesn't exist, try next
251            }
252
253            // Use backend for edge property lookup
254            use crate::backend::table_names;
255            use crate::backend::types::ScanRequest;
256
257            let table_name = table_names::delta_table_name(type_name, "fwd");
258            if !backend.table_exists(&table_name).await.unwrap_or(false) {
259                continue; // No data for this type, try next
260            }
261
262            let base_filter = format!("eid = {}", eid.as_u64());
263            let filter_expr = self.storage.apply_version_filter(base_filter);
264
265            let batches = match backend
266                .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
267                .await
268            {
269                Ok(b) => b,
270                Err(_) => continue,
271            };
272
273            // Collect all rows for this edge, sorted by version
274            let mut rows: Vec<(u64, u8, Properties)> = Vec::new();
275
276            for batch in batches {
277                let op_col = match batch.column_by_name("op") {
278                    Some(c) => c
279                        .as_any()
280                        .downcast_ref::<arrow_array::UInt8Array>()
281                        .unwrap(),
282                    None => continue,
283                };
284                let ver_col = match batch.column_by_name("_version") {
285                    Some(c) => c.as_any().downcast_ref::<UInt64Array>().unwrap(),
286                    None => continue,
287                };
288
289                for row in 0..batch.num_rows() {
290                    let ver = ver_col.value(row);
291                    let op = op_col.value(row);
292                    let mut props = Properties::new();
293
294                    if op != 1 {
295                        // Not a delete - extract properties
296                        if let Some(tp) = type_props {
297                            for (p_name, p_meta) in tp {
298                                if let Some(col) = batch.column_by_name(p_name)
299                                    && !col.is_null(row)
300                                {
301                                    let val =
302                                        Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
303                                    props.insert(p_name.clone(), val);
304                                }
305                            }
306                        }
307                    }
308                    rows.push((ver, op, props));
309                }
310            }
311
312            if rows.is_empty() {
313                continue;
314            }
315
316            // Sort by version (ascending) so we merge in order
317            rows.sort_by_key(|(ver, _, _)| *ver);
318
319            // Merge properties across all versions
320            // For CRDT properties: merge values
321            // For non-CRDT properties: later versions overwrite earlier ones
322            let mut merged_props: Properties = Properties::new();
323            let mut is_deleted = false;
324
325            for (_, op, props) in rows {
326                if op == 1 {
327                    // Delete operation - mark as deleted
328                    is_deleted = true;
329                    merged_props.clear();
330                } else {
331                    is_deleted = false;
332                    for (p_name, p_val) in props {
333                        // Check if this is a CRDT property
334                        let is_crdt = type_props
335                            .and_then(|tp| tp.get(&p_name))
336                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
337                            .unwrap_or(false);
338
339                        if is_crdt {
340                            // Merge CRDT values
341                            if let Some(existing) = merged_props.get(&p_name) {
342                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
343                                    merged_props.insert(p_name, merged);
344                                }
345                            } else {
346                                merged_props.insert(p_name, p_val);
347                            }
348                        } else {
349                            // Non-CRDT: later version overwrites
350                            merged_props.insert(p_name, p_val);
351                        }
352                    }
353                }
354            }
355
356            if is_deleted {
357                return Ok(None);
358            }
359
360            if !merged_props.is_empty() {
361                return Ok(Some(merged_props));
362            }
363        }
364
365        // Fallback to main edges table props_json for unknown/schemaless types
366        use crate::storage::main_edge::MainEdgeDataset;
367        if let Some(props) = MainEdgeDataset::find_props_by_eid(self.storage.backend(), eid).await?
368        {
369            return Ok(Some(props));
370        }
371
372        Ok(None)
373    }
374
375    /// Batch load properties for multiple vertices
376    pub async fn get_batch_vertex_props(
377        &self,
378        vids: &[Vid],
379        properties: &[&str],
380        ctx: Option<&QueryContext>,
381    ) -> Result<HashMap<Vid, Properties>> {
382        let schema = self.schema_manager.schema();
383        let mut result = HashMap::new();
384        if vids.is_empty() {
385            return Ok(result);
386        }
387
388        // In the new storage model, VIDs are pure auto-increment and don't embed label info.
389        // We need to scan all label datasets to find the vertices.
390
391        // Try VidLabelsIndex for O(1) label resolution
392        let labels_to_scan: Vec<String> = {
393            let mut needed: std::collections::HashSet<String> = std::collections::HashSet::new();
394            let mut all_resolved = true;
395            for &vid in vids {
396                if let Some(labels) = self.storage.get_labels_from_index(vid) {
397                    needed.extend(labels);
398                } else {
399                    all_resolved = false;
400                    break;
401                }
402            }
403            if all_resolved {
404                needed.into_iter().collect()
405            } else {
406                schema.labels.keys().cloned().collect() // Fallback to full scan
407            }
408        };
409
410        // 2. Fetch from storage - scan relevant label datasets
411        for label_name in &labels_to_scan {
412            // Filter to properties that exist in this label's schema
413            let label_schema_props = schema.properties.get(label_name);
414            let valid_props: Vec<&str> = properties
415                .iter()
416                .cloned()
417                .filter(|p| label_schema_props.is_some_and(|props| props.contains_key(*p)))
418                .collect();
419            // Note: don't skip when valid_props is empty; overflow_json may have the properties
420
421            let ds = self.storage.vertex_dataset(label_name)?;
422            let backend = self.storage.backend();
423            let vtable_name = ds.table_name();
424
425            if !backend.table_exists(&vtable_name).await.unwrap_or(false) {
426                continue; // Table doesn't exist yet — skip this label
427            }
428
429            // Construct filter: _vid IN (...)
430            let vid_list = vids
431                .iter()
432                .map(|v| v.as_u64().to_string())
433                .collect::<Vec<_>>()
434                .join(",");
435            let base_filter = format!("_vid IN ({})", vid_list);
436
437            let final_filter = self.storage.apply_version_filter(base_filter);
438
439            // Build column list for projection
440            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
441            columns.push("_vid".to_string());
442            columns.push("_version".to_string());
443            columns.push("_deleted".to_string());
444            columns.extend(valid_props.iter().map(|s| s.to_string()));
445            // Add overflow_json to fetch non-schema properties
446            columns.push("overflow_json".to_string());
447
448            use crate::backend::types::ScanRequest;
449            let request = ScanRequest::all(&vtable_name)
450                .with_filter(final_filter)
451                .with_columns(columns);
452
453            let batches: Vec<RecordBatch> = match backend.scan(request).await {
454                Ok(b) => b,
455                Err(e) => {
456                    warn!(
457                        label = %label_name,
458                        error = %e,
459                        "failed to scan label table, skipping"
460                    );
461                    continue;
462                }
463            };
464            for batch in batches {
465                let vid_col = match batch
466                    .column_by_name("_vid")
467                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
468                {
469                    Some(c) => c,
470                    None => continue,
471                };
472                let del_col = match batch
473                    .column_by_name("_deleted")
474                    .and_then(|col| col.as_any().downcast_ref::<BooleanArray>())
475                {
476                    Some(c) => c,
477                    None => continue,
478                };
479
480                for row in 0..batch.num_rows() {
481                    let vid = Vid::from(vid_col.value(row));
482
483                    if del_col.value(row) {
484                        result.remove(&vid);
485                        continue;
486                    }
487
488                    let label_props = schema.properties.get(label_name);
489                    let mut props =
490                        Self::extract_row_properties(&batch, row, &valid_props, label_props)?;
491                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
492                    result.insert(vid, props);
493                }
494            }
495        }
496
497        // 3. Overlay L0 buffers in age order: pending (oldest to newest) -> current -> transaction
498        if let Some(ctx) = ctx {
499            // First, overlay pending flush L0s in order (oldest first, so iterate forward)
500            for pending_l0_arc in &ctx.pending_flush_l0s {
501                let pending_l0 = pending_l0_arc.read();
502                self.overlay_l0_batch(vids, &pending_l0, properties, &mut result);
503            }
504
505            // Then overlay current L0 (newer than pending)
506            let l0 = ctx.l0.read();
507            self.overlay_l0_batch(vids, &l0, properties, &mut result);
508
509            // Finally overlay transaction L0 (newest)
510            // Skip transaction L0 if querying a snapshot
511            // (Transaction changes are at current version, not in snapshot)
512            if self.storage.version_high_water_mark().is_none()
513                && let Some(tx_l0_arc) = &ctx.transaction_l0
514            {
515                let tx_l0 = tx_l0_arc.read();
516                self.overlay_l0_batch(vids, &tx_l0, properties, &mut result);
517            }
518        }
519
520        Ok(result)
521    }
522
523    fn overlay_l0_batch(
524        &self,
525        vids: &[Vid],
526        l0: &L0Buffer,
527        properties: &[&str],
528        result: &mut HashMap<Vid, Properties>,
529    ) {
530        let schema = self.schema_manager.schema();
531        for &vid in vids {
532            // If deleted in L0, remove from result
533            if l0.vertex_tombstones.contains(&vid) {
534                result.remove(&vid);
535                continue;
536            }
537            // If in L0, check version before merging
538            if let Some(l0_props) = l0.vertex_properties.get(&vid) {
539                // Skip entries beyond snapshot boundary
540                let entry_version = l0.vertex_versions.get(&vid).copied().unwrap_or(0);
541                if self
542                    .storage
543                    .version_high_water_mark()
544                    .is_some_and(|hwm| entry_version > hwm)
545                {
546                    continue;
547                }
548
549                let entry = result.entry(vid).or_default();
550                // In new storage model, get labels from L0Buffer
551                let labels = l0.get_vertex_labels(vid);
552
553                for (k, v) in l0_props {
554                    if properties.contains(&k.as_str()) {
555                        // Check if property is CRDT by looking up in any of the vertex's labels
556                        let is_crdt = labels
557                            .and_then(|label_list| {
558                                label_list.iter().find_map(|ln| {
559                                    schema
560                                        .properties
561                                        .get(ln)
562                                        .and_then(|lp| lp.get(k))
563                                        .filter(|pm| matches!(pm.r#type, DataType::Crdt(_)))
564                                })
565                            })
566                            .is_some();
567
568                        if is_crdt {
569                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
570                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
571                        } else {
572                            entry.insert(k.clone(), v.clone());
573                        }
574                    }
575                }
576            }
577        }
578    }
579
580    /// Load properties as Arrow columns for vectorized processing
581    /// Batch load properties for multiple edges
582    pub async fn get_batch_edge_props(
583        &self,
584        eids: &[uni_common::core::id::Eid],
585        properties: &[&str],
586        ctx: Option<&QueryContext>,
587    ) -> Result<HashMap<Vid, Properties>> {
588        let schema = self.schema_manager.schema();
589        let mut result = HashMap::new();
590        if eids.is_empty() {
591            return Ok(result);
592        }
593
594        // In the new storage model, EIDs are pure auto-increment and don't embed type info.
595        // We need to scan all edge type datasets to find the edges.
596
597        // Try to resolve edge types from L0 context for O(1) lookup
598        let types_to_scan: Vec<String> = {
599            if let Some(ctx) = ctx {
600                let mut needed: std::collections::HashSet<String> =
601                    std::collections::HashSet::new();
602                let mut all_resolved = true;
603                for &eid in eids {
604                    if let Some(etype) = ctx.l0.read().get_edge_type(eid) {
605                        needed.insert(etype.to_string());
606                    } else {
607                        all_resolved = false;
608                        break;
609                    }
610                }
611                if all_resolved {
612                    needed.into_iter().collect()
613                } else {
614                    schema.edge_types.keys().cloned().collect() // Fallback to full scan
615                }
616            } else {
617                schema.edge_types.keys().cloned().collect() // No context, full scan
618            }
619        };
620
621        // 2. Fetch from storage (Delta runs) - scan relevant edge types
622        for type_name in &types_to_scan {
623            let type_props = schema.properties.get(type_name);
624            let valid_props: Vec<&str> = properties
625                .iter()
626                .cloned()
627                .filter(|p| type_props.is_some_and(|props| props.contains_key(*p)))
628                .collect();
629            // Note: don't skip when valid_props is empty; overflow_json may have the properties
630
631            let delta_ds = match self.storage.delta_dataset(type_name, "fwd") {
632                Ok(ds) => ds,
633                Err(_) => continue,
634            };
635            let backend = self.storage.backend();
636            let dtable_name = delta_ds.table_name();
637
638            if !backend.table_exists(&dtable_name).await.unwrap_or(false) {
639                continue; // Table doesn't exist yet — skip this edge type
640            }
641
642            let eid_list = eids
643                .iter()
644                .map(|e| e.as_u64().to_string())
645                .collect::<Vec<_>>()
646                .join(",");
647            let base_filter = format!("eid IN ({})", eid_list);
648
649            let final_filter = self.storage.apply_version_filter(base_filter);
650
651            // Build column list for projection
652            let mut columns: Vec<String> = Vec::with_capacity(valid_props.len() + 4);
653            columns.push("eid".to_string());
654            columns.push("_version".to_string());
655            columns.push("op".to_string());
656            columns.extend(valid_props.iter().map(|s| s.to_string()));
657            // Add overflow_json to fetch non-schema properties
658            columns.push("overflow_json".to_string());
659
660            use crate::backend::types::ScanRequest;
661            let request = ScanRequest::all(&dtable_name)
662                .with_filter(final_filter)
663                .with_columns(columns);
664
665            let batches: Vec<RecordBatch> = match backend.scan(request).await {
666                Ok(b) => b,
667                Err(e) => {
668                    warn!(
669                        edge_type = %type_name,
670                        error = %e,
671                        "failed to scan edge delta table, skipping"
672                    );
673                    continue;
674                }
675            };
676            for batch in batches {
677                let eid_col = match batch
678                    .column_by_name("eid")
679                    .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
680                {
681                    Some(c) => c,
682                    None => continue,
683                };
684                let op_col = match batch
685                    .column_by_name("op")
686                    .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
687                {
688                    Some(c) => c,
689                    None => continue,
690                };
691
692                for row in 0..batch.num_rows() {
693                    let eid = uni_common::core::id::Eid::from(eid_col.value(row));
694
695                    // op=1 is Delete
696                    if op_col.value(row) == 1 {
697                        result.remove(&Vid::from(eid.as_u64()));
698                        continue;
699                    }
700
701                    let mut props =
702                        Self::extract_row_properties(&batch, row, &valid_props, type_props)?;
703                    Self::merge_overflow_into_props(&batch, row, properties, &mut props)?;
704                    // Reuse Vid as key for compatibility with materialized_property
705                    result.insert(Vid::from(eid.as_u64()), props);
706                }
707            }
708        }
709
710        // 3. Overlay L0 buffers in age order: pending (oldest to newest) -> current -> transaction
711        if let Some(ctx) = ctx {
712            // First, overlay pending flush L0s in order (oldest first, so iterate forward)
713            for pending_l0_arc in &ctx.pending_flush_l0s {
714                let pending_l0 = pending_l0_arc.read();
715                self.overlay_l0_edge_batch(eids, &pending_l0, properties, &mut result);
716            }
717
718            // Then overlay current L0 (newer than pending)
719            let l0 = ctx.l0.read();
720            self.overlay_l0_edge_batch(eids, &l0, properties, &mut result);
721
722            // Finally overlay transaction L0 (newest)
723            // Skip transaction L0 if querying a snapshot
724            // (Transaction changes are at current version, not in snapshot)
725            if self.storage.version_high_water_mark().is_none()
726                && let Some(tx_l0_arc) = &ctx.transaction_l0
727            {
728                let tx_l0 = tx_l0_arc.read();
729                self.overlay_l0_edge_batch(eids, &tx_l0, properties, &mut result);
730            }
731        }
732
733        Ok(result)
734    }
735
736    fn overlay_l0_edge_batch(
737        &self,
738        eids: &[uni_common::core::id::Eid],
739        l0: &L0Buffer,
740        properties: &[&str],
741        result: &mut HashMap<Vid, Properties>,
742    ) {
743        let schema = self.schema_manager.schema();
744        for &eid in eids {
745            let vid_key = Vid::from(eid.as_u64());
746            if l0.tombstones.contains_key(&eid) {
747                result.remove(&vid_key);
748                continue;
749            }
750            if let Some(l0_props) = l0.edge_properties.get(&eid) {
751                // Skip entries beyond snapshot boundary
752                let entry_version = l0.edge_versions.get(&eid).copied().unwrap_or(0);
753                if self
754                    .storage
755                    .version_high_water_mark()
756                    .is_some_and(|hwm| entry_version > hwm)
757                {
758                    continue;
759                }
760
761                let entry = result.entry(vid_key).or_default();
762                // In new storage model, get edge type from L0Buffer
763                let type_name = l0.get_edge_type(eid);
764
765                let include_all = properties.contains(&"_all_props");
766                for (k, v) in l0_props {
767                    if include_all || properties.contains(&k.as_str()) {
768                        // Check if property is CRDT
769                        let is_crdt = type_name
770                            .and_then(|tn| schema.properties.get(tn))
771                            .and_then(|tp| tp.get(k))
772                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
773                            .unwrap_or(false);
774
775                        if is_crdt {
776                            let existing = entry.entry(k.clone()).or_insert(Value::Null);
777                            *existing = self.merge_crdt_values(existing, v).unwrap_or(v.clone());
778                        } else {
779                            entry.insert(k.clone(), v.clone());
780                        }
781                    }
782                }
783            }
784        }
785    }
786
787    /// Batch load labels for multiple vertices.
788    pub async fn get_batch_labels(
789        &self,
790        vids: &[Vid],
791        ctx: Option<&QueryContext>,
792    ) -> Result<HashMap<Vid, Vec<String>>> {
793        let mut result = HashMap::new();
794        if vids.is_empty() {
795            return Ok(result);
796        }
797
798        // Phase 1: Get from L0 layers (oldest to newest)
799        if let Some(ctx) = ctx {
800            let mut collect_labels = |l0: &L0Buffer| {
801                for &vid in vids {
802                    if let Some(labels) = l0.get_vertex_labels(vid) {
803                        result
804                            .entry(vid)
805                            .or_default()
806                            .extend(labels.iter().cloned());
807                    }
808                }
809            };
810
811            for l0_arc in &ctx.pending_flush_l0s {
812                collect_labels(&l0_arc.read());
813            }
814            collect_labels(&ctx.l0.read());
815            if let Some(tx_l0_arc) = &ctx.transaction_l0 {
816                collect_labels(&tx_l0_arc.read());
817            }
818        }
819
820        // Phase 2: Get from storage (try VidLabelsIndex first, then LanceDB fallback)
821        let mut vids_needing_lancedb = Vec::new();
822
823        /// Merge new labels into an existing label list, skipping duplicates.
824        fn merge_labels(existing: &mut Vec<String>, new_labels: Vec<String>) {
825            for l in new_labels {
826                if !existing.contains(&l) {
827                    existing.push(l);
828                }
829            }
830        }
831
832        for &vid in vids {
833            if result.contains_key(&vid) {
834                continue; // Already have labels from L0
835            }
836
837            if let Some(labels) = self.storage.get_labels_from_index(vid) {
838                merge_labels(result.entry(vid).or_default(), labels);
839            } else {
840                vids_needing_lancedb.push(vid);
841            }
842        }
843
844        // Fallback to storage backend for VIDs not in the index
845        if !vids_needing_lancedb.is_empty() {
846            let backend = self.storage.backend();
847            let version = self.storage.version_high_water_mark();
848            let storage_labels = MainVertexDataset::find_batch_labels_by_vids(
849                backend,
850                &vids_needing_lancedb,
851                version,
852            )
853            .await?;
854
855            for (vid, labels) in storage_labels {
856                merge_labels(result.entry(vid).or_default(), labels);
857            }
858        }
859
860        // Deduplicate and sort labels
861        for labels in result.values_mut() {
862            labels.sort();
863            labels.dedup();
864        }
865
866        Ok(result)
867    }
868
869    pub async fn get_all_vertex_props(&self, vid: Vid) -> Result<Properties> {
870        Ok(self
871            .get_all_vertex_props_with_ctx(vid, None)
872            .await?
873            .unwrap_or_default())
874    }
875
876    pub async fn get_all_vertex_props_with_ctx(
877        &self,
878        vid: Vid,
879        ctx: Option<&QueryContext>,
880    ) -> Result<Option<Properties>> {
881        // 1. Check if deleted in any L0 layer
882        if l0_visibility::is_vertex_deleted(vid, ctx) {
883            return Ok(None);
884        }
885
886        // 2. Accumulate properties from L0 layers (oldest to newest)
887        let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
888
889        // 3. Fetch from storage
890        let storage_props_opt = self.fetch_all_props_from_storage(vid).await?;
891
892        // 4. Handle case where vertex doesn't exist in either layer
893        if l0_props.is_none() && storage_props_opt.is_none() {
894            return Ok(None);
895        }
896
897        let mut final_props = l0_props.unwrap_or_default();
898
899        // 5. Merge storage properties (L0 takes precedence)
900        if let Some(storage_props) = storage_props_opt {
901            for (k, v) in storage_props {
902                final_props.entry(k).or_insert(v);
903            }
904        }
905
906        // 6. Normalize CRDT properties - convert JSON strings to JSON objects
907        // In the new storage model, we need to get labels from context/L0
908        if let Some(ctx) = ctx {
909            // Try to get labels from L0 layers
910            let labels = l0_visibility::get_vertex_labels(vid, ctx);
911            for label in &labels {
912                self.normalize_crdt_properties(&mut final_props, label)?;
913            }
914        }
915
916        Ok(Some(final_props))
917    }
918
919    /// Batch-fetch properties for multiple vertices of a known label.
920    ///
921    /// Queries L0 layers in-memory, then fetches remaining VIDs from LanceDB in
922    /// a single `_vid IN (...)` query on the label table. Much faster than
923    /// per-vertex `get_all_vertex_props_with_ctx` when many vertices need loading.
924    pub async fn get_batch_vertex_props_for_label(
925        &self,
926        vids: &[Vid],
927        label: &str,
928        ctx: Option<&QueryContext>,
929    ) -> Result<HashMap<Vid, Properties>> {
930        let mut result: HashMap<Vid, Properties> = HashMap::new();
931        let mut need_storage: Vec<Vid> = Vec::new();
932
933        // Phase 1: Check L0 layers for each VID (fast, in-memory).
934        for &vid in vids {
935            if l0_visibility::is_vertex_deleted(vid, ctx) {
936                continue;
937            }
938            let l0_props = l0_visibility::accumulate_vertex_props(vid, ctx);
939            if let Some(props) = l0_props {
940                result.insert(vid, props);
941            } else {
942                need_storage.push(vid);
943            }
944        }
945
946        // If everything was resolved from L0, skip storage entirely.
947        if need_storage.is_empty() {
948            // Normalize CRDT properties for L0-resolved vertices.
949            if ctx.is_some() {
950                for props in result.values_mut() {
951                    self.normalize_crdt_properties(props, label)?;
952                }
953            }
954            return Ok(result);
955        }
956
957        // Phase 2: Batch-fetch from LanceDB for remaining VIDs.
958        let schema = self.schema_manager.schema();
959        let label_props = schema.properties.get(label);
960
961        let mut prop_names: Vec<String> = Vec::new();
962        if let Some(props) = label_props {
963            prop_names = props.keys().cloned().collect();
964        }
965
966        let mut columns: Vec<String> = vec![
967            "_vid".to_string(),
968            "_deleted".to_string(),
969            "_version".to_string(),
970        ];
971        columns.extend(prop_names.iter().cloned());
972        columns.push("overflow_json".to_string());
973
974        // Build IN filter for all VIDs at once.
975        let vid_list: String = need_storage
976            .iter()
977            .map(|v| v.as_u64().to_string())
978            .collect::<Vec<_>>()
979            .join(", ");
980        let base_filter = format!("_vid IN ({})", vid_list);
981
982        let filter_expr = self.storage.apply_version_filter(base_filter);
983
984        let table_name = crate::backend::table_names::vertex_table_name(label);
985        let batches: Vec<RecordBatch> = self
986            .storage
987            .backend()
988            .scan(
989                crate::backend::types::ScanRequest::all(&table_name)
990                    .with_filter(&filter_expr)
991                    .with_columns(columns.clone()),
992            )
993            .await?;
994
995        let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
996
997        // Track best version per VID for proper version-based merging.
998        let mut per_vid_best_version: HashMap<Vid, u64> = HashMap::new();
999        let mut per_vid_props: HashMap<Vid, Properties> = HashMap::new();
1000
1001        for batch in batches {
1002            let vid_col = match batch
1003                .column_by_name("_vid")
1004                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1005            {
1006                Some(c) => c,
1007                None => continue,
1008            };
1009            let deleted_col = match batch
1010                .column_by_name("_deleted")
1011                .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1012            {
1013                Some(c) => c,
1014                None => continue,
1015            };
1016            let version_col = match batch
1017                .column_by_name("_version")
1018                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1019            {
1020                Some(c) => c,
1021                None => continue,
1022            };
1023
1024            for row in 0..batch.num_rows() {
1025                let vid = Vid::from(vid_col.value(row));
1026                let version = version_col.value(row);
1027
1028                if deleted_col.value(row) {
1029                    if per_vid_best_version
1030                        .get(&vid)
1031                        .is_none_or(|&best| version >= best)
1032                    {
1033                        per_vid_best_version.insert(vid, version);
1034                        per_vid_props.remove(&vid);
1035                    }
1036                    continue;
1037                }
1038
1039                let mut current_props =
1040                    Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1041
1042                if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1043                    for (k, v) in overflow_props {
1044                        current_props.entry(k).or_insert(v);
1045                    }
1046                }
1047
1048                let best = per_vid_best_version.get(&vid).copied();
1049                let mut best_opt = best;
1050                let mut merged = per_vid_props.remove(&vid);
1051                self.merge_versioned_props(
1052                    current_props,
1053                    version,
1054                    &mut best_opt,
1055                    &mut merged,
1056                    label_props,
1057                )?;
1058                if let Some(v) = best_opt {
1059                    per_vid_best_version.insert(vid, v);
1060                }
1061                if let Some(p) = merged {
1062                    per_vid_props.insert(vid, p);
1063                }
1064            }
1065        }
1066
1067        // Merge storage results with any L0 partial props already in result.
1068        for (vid, storage_props) in per_vid_props {
1069            let entry = result.entry(vid).or_default();
1070            for (k, v) in storage_props {
1071                entry.entry(k).or_insert(v);
1072            }
1073        }
1074
1075        // Mark VIDs that had no data anywhere as absent (don't insert them).
1076        // VIDs not in `result` simply won't appear in the output.
1077
1078        // Phase 3: Normalize CRDT properties.
1079        if ctx.is_some() {
1080            for props in result.values_mut() {
1081                self.normalize_crdt_properties(props, label)?;
1082            }
1083        }
1084
1085        Ok(result)
1086    }
1087
1088    /// Batch-fetch properties for multiple edges of a known type.
1089    ///
1090    /// Mirrors `get_batch_vertex_props_for_label` (above) for the edge path.
1091    /// Issues one `eid IN (...)` scan against the delta table for the edge
1092    /// type, replaying per-EID version history (op-replay + CRDT merge) the
1093    /// same way `fetch_all_edge_props_from_storage_with_hint` does. Far
1094    /// faster than per-edge `get_all_edge_props_with_ctx` when many edges
1095    /// of the same type need loading (e.g., batched SET/REMOVE on edges
1096    /// matched by a MATCH).
1097    ///
1098    /// EIDs of deleted edges or those with no rows in delta storage are
1099    /// omitted from the returned map; callers can fall back to the per-EID
1100    /// path for misses.
1101    pub async fn get_batch_edge_props_for_type(
1102        &self,
1103        eids: &[Eid],
1104        type_name: &str,
1105        ctx: Option<&QueryContext>,
1106    ) -> Result<HashMap<Eid, Properties>> {
1107        use crate::backend::table_names;
1108        use crate::backend::types::ScanRequest;
1109
1110        let mut result: HashMap<Eid, Properties> = HashMap::new();
1111        if eids.is_empty() {
1112            return Ok(result);
1113        }
1114
1115        // Phase 1: L0 check per EID. Skip deleted; serve from L0 if it has
1116        // an accumulated property set; otherwise note for storage scan.
1117        let mut need_storage: Vec<Eid> = Vec::new();
1118        for &eid in eids {
1119            if l0_visibility::is_edge_deleted(eid, ctx) {
1120                continue;
1121            }
1122            let l0_props = l0_visibility::accumulate_edge_props(eid, ctx);
1123            // Edge L0 semantics: even an empty accumulator means "edge exists
1124            // in L0 with no user props yet" — we still go to storage to pick
1125            // up the persisted full row (mirrors get_all_edge_props_with_ctx).
1126            if let Some(props) = l0_props {
1127                result.insert(eid, props);
1128            }
1129            need_storage.push(eid);
1130        }
1131
1132        if need_storage.is_empty() {
1133            return Ok(result);
1134        }
1135
1136        // Phase 2: One scan with `eid IN (...)` on the delta table.
1137        let schema = self.schema_manager.schema();
1138        let type_props = schema.properties.get(type_name);
1139
1140        if self.storage.delta_dataset(type_name, "fwd").is_err() {
1141            return Ok(result);
1142        }
1143
1144        let table_name = table_names::delta_table_name(type_name, "fwd");
1145        let backend = self.storage.backend();
1146        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1147            return Ok(result);
1148        }
1149
1150        let eid_list: String = need_storage
1151            .iter()
1152            .map(|e| e.as_u64().to_string())
1153            .collect::<Vec<_>>()
1154            .join(", ");
1155        let base_filter = format!("eid IN ({})", eid_list);
1156        let filter_expr = self.storage.apply_version_filter(base_filter);
1157
1158        let batches = match backend
1159            .scan(ScanRequest::all(&table_name).with_filter(filter_expr))
1160            .await
1161        {
1162            Ok(b) => b,
1163            Err(_) => return Ok(result), // Storage error: per-row fallback handles correctness
1164        };
1165
1166        // Collect (eid, version, op, props) tuples, then group + replay per EID.
1167        let mut per_eid_rows: HashMap<Eid, Vec<(u64, u8, Properties)>> = HashMap::new();
1168        for batch in batches {
1169            let eid_col = match batch
1170                .column_by_name("eid")
1171                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1172            {
1173                Some(c) => c,
1174                None => continue,
1175            };
1176            let op_col = match batch
1177                .column_by_name("op")
1178                .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt8Array>())
1179            {
1180                Some(c) => c,
1181                None => continue,
1182            };
1183            let ver_col = match batch
1184                .column_by_name("_version")
1185                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1186            {
1187                Some(c) => c,
1188                None => continue,
1189            };
1190
1191            for row in 0..batch.num_rows() {
1192                let eid = Eid::from(eid_col.value(row));
1193                let ver = ver_col.value(row);
1194                let op = op_col.value(row);
1195                let mut props = Properties::new();
1196
1197                if op != 1
1198                    && let Some(tp) = type_props
1199                {
1200                    for (p_name, p_meta) in tp {
1201                        if let Some(col) = batch.column_by_name(p_name)
1202                            && !col.is_null(row)
1203                        {
1204                            let val = Self::value_from_column(col.as_ref(), &p_meta.r#type, row)?;
1205                            props.insert(p_name.clone(), val);
1206                        }
1207                    }
1208                }
1209                per_eid_rows.entry(eid).or_default().push((ver, op, props));
1210            }
1211        }
1212
1213        for (eid, mut rows) in per_eid_rows {
1214            rows.sort_by_key(|(ver, _, _)| *ver);
1215
1216            let mut merged_props: Properties = Properties::new();
1217            let mut is_deleted = false;
1218
1219            for (_, op, props) in rows {
1220                if op == 1 {
1221                    is_deleted = true;
1222                    merged_props.clear();
1223                } else {
1224                    is_deleted = false;
1225                    for (p_name, p_val) in props {
1226                        let is_crdt = type_props
1227                            .and_then(|tp| tp.get(&p_name))
1228                            .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1229                            .unwrap_or(false);
1230                        if is_crdt {
1231                            if let Some(existing) = merged_props.get(&p_name) {
1232                                if let Ok(merged) = self.merge_crdt_values(existing, &p_val) {
1233                                    merged_props.insert(p_name, merged);
1234                                }
1235                            } else {
1236                                merged_props.insert(p_name, p_val);
1237                            }
1238                        } else {
1239                            merged_props.insert(p_name, p_val);
1240                        }
1241                    }
1242                }
1243            }
1244
1245            if is_deleted {
1246                // Deleted in storage; remove any L0 accumulation that may
1247                // have been recorded under this EID by Phase 1 (matches
1248                // is_edge_deleted single-EID semantics).
1249                result.remove(&eid);
1250                continue;
1251            }
1252
1253            // L0 takes precedence over storage for shared keys; insert
1254            // storage values only where L0 did not already provide them.
1255            let entry = result.entry(eid).or_default();
1256            for (k, v) in merged_props {
1257                entry.entry(k).or_insert(v);
1258            }
1259        }
1260
1261        Ok(result)
1262    }
1263
1264    /// Normalize CRDT properties by converting JSON strings to JSON objects.
1265    /// This handles the case where CRDT values come from Cypher CREATE statements
1266    /// as `Value::String("{\"t\": \"gc\", ...}")` and need to be parsed into objects.
1267    fn normalize_crdt_properties(&self, props: &mut Properties, label: &str) -> Result<()> {
1268        let schema = self.schema_manager.schema();
1269        let label_props = match schema.properties.get(label) {
1270            Some(p) => p,
1271            None => return Ok(()),
1272        };
1273
1274        for (prop_name, prop_meta) in label_props {
1275            if let DataType::Crdt(_) = prop_meta.r#type
1276                && let Some(val) = props.get_mut(prop_name)
1277            {
1278                *val = Value::from(Self::parse_crdt_value(val)?);
1279            }
1280        }
1281
1282        Ok(())
1283    }
1284
1285    /// Extract properties from a single batch row.
1286    fn extract_row_properties(
1287        batch: &RecordBatch,
1288        row: usize,
1289        prop_names: &[&str],
1290        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1291    ) -> Result<Properties> {
1292        let mut props = Properties::new();
1293        for name in prop_names {
1294            let col = match batch.column_by_name(name) {
1295                Some(col) => col,
1296                None => continue,
1297            };
1298            if col.is_null(row) {
1299                continue;
1300            }
1301            if let Some(prop_meta) = label_props.and_then(|p| p.get(*name)) {
1302                let val = Self::value_from_column(col.as_ref(), &prop_meta.r#type, row)?;
1303                props.insert((*name).to_string(), val);
1304            }
1305        }
1306        Ok(props)
1307    }
1308
1309    /// Extract overflow properties from the overflow_json column.
1310    ///
1311    /// Returns None if the column doesn't exist or the value is null,
1312    /// otherwise parses the JSON blob and returns the properties.
1313    fn extract_overflow_properties(batch: &RecordBatch, row: usize) -> Result<Option<Properties>> {
1314        use arrow_array::LargeBinaryArray;
1315
1316        let overflow_col = match batch.column_by_name("overflow_json") {
1317            Some(col) => col,
1318            None => return Ok(None), // Column doesn't exist (old schema)
1319        };
1320
1321        if overflow_col.is_null(row) {
1322            return Ok(None);
1323        }
1324
1325        let binary_array = overflow_col
1326            .as_any()
1327            .downcast_ref::<LargeBinaryArray>()
1328            .ok_or_else(|| anyhow!("overflow_json is not LargeBinaryArray"))?;
1329
1330        let jsonb_bytes = binary_array.value(row);
1331
1332        // Decode the CypherValue blob directly to `Value`. Routing through
1333        // `serde_json` would stringify temporal values (and is unnecessary —
1334        // the blob already decodes to a `Value::Map`).
1335        match uni_common::cypher_value_codec::decode(jsonb_bytes)
1336            .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?
1337        {
1338            Value::Map(map) => Ok(Some(map)),
1339            Value::Null => Ok(None),
1340            other => Err(anyhow!(
1341                "overflow_json decoded to a non-map value: {other:?}"
1342            )),
1343        }
1344    }
1345
1346    /// Merge overflow properties from the overflow_json column into an existing props map.
1347    ///
1348    /// Handles two concerns:
1349    /// 1. If `overflow_json` is explicitly requested in `properties`, stores the raw JSONB
1350    ///    bytes as a JSON array of u8 values.
1351    /// 2. Extracts individual overflow properties and merges those that are in `properties`.
1352    fn merge_overflow_into_props(
1353        batch: &RecordBatch,
1354        row: usize,
1355        properties: &[&str],
1356        props: &mut Properties,
1357    ) -> Result<()> {
1358        use arrow_array::LargeBinaryArray;
1359
1360        let overflow_col = match batch.column_by_name("overflow_json") {
1361            Some(col) if !col.is_null(row) => col,
1362            _ => return Ok(()),
1363        };
1364
1365        // Store raw JSONB bytes if explicitly requested
1366        if properties.contains(&"overflow_json")
1367            && let Some(binary_array) = overflow_col.as_any().downcast_ref::<LargeBinaryArray>()
1368        {
1369            let jsonb_bytes = binary_array.value(row);
1370            let bytes_list: Vec<Value> =
1371                jsonb_bytes.iter().map(|&b| Value::Int(b as i64)).collect();
1372            props.insert("overflow_json".to_string(), Value::List(bytes_list));
1373        }
1374
1375        // Extract and merge individual overflow properties
1376        if let Some(overflow_props) = Self::extract_overflow_properties(batch, row)? {
1377            for (k, v) in overflow_props {
1378                if properties.contains(&k.as_str()) {
1379                    props.entry(k).or_insert(v);
1380                }
1381            }
1382        }
1383
1384        Ok(())
1385    }
1386
1387    /// Merge CRDT properties from source into target.
1388    fn merge_crdt_into(
1389        &self,
1390        target: &mut Properties,
1391        source: Properties,
1392        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1393        crdt_only: bool,
1394    ) -> Result<()> {
1395        for (k, v) in source {
1396            if let Some(prop_meta) = label_props.and_then(|p| p.get(&k)) {
1397                if let DataType::Crdt(_) = prop_meta.r#type {
1398                    let existing_v = target.entry(k).or_insert(Value::Null);
1399                    *existing_v = self.merge_crdt_values(existing_v, &v)?;
1400                } else if !crdt_only {
1401                    target.insert(k, v);
1402                }
1403            }
1404        }
1405        Ok(())
1406    }
1407
1408    /// Handle version-based property merging for storage fetch.
1409    fn merge_versioned_props(
1410        &self,
1411        current_props: Properties,
1412        version: u64,
1413        best_version: &mut Option<u64>,
1414        best_props: &mut Option<Properties>,
1415        label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1416    ) -> Result<()> {
1417        if best_version.is_none_or(|best| version > best) {
1418            // Newest version: strictly newer
1419            if let Some(mut existing_props) = best_props.take() {
1420                // Merge CRDTs from existing into current
1421                let mut merged = current_props;
1422                for (k, v) in merged.iter_mut() {
1423                    if let Some(prop_meta) = label_props.and_then(|p| p.get(k))
1424                        && let DataType::Crdt(_) = prop_meta.r#type
1425                        && let Some(existing_val) = existing_props.remove(k)
1426                    {
1427                        *v = self.merge_crdt_values(v, &existing_val)?;
1428                    }
1429                }
1430                *best_props = Some(merged);
1431            } else {
1432                *best_props = Some(current_props);
1433            }
1434            *best_version = Some(version);
1435        } else if Some(version) == *best_version {
1436            // Same version: merge all properties
1437            if let Some(existing_props) = best_props.as_mut() {
1438                self.merge_crdt_into(existing_props, current_props, label_props, false)?;
1439            } else {
1440                *best_props = Some(current_props);
1441            }
1442        } else {
1443            // Older version: only merge CRDTs
1444            if let Some(existing_props) = best_props.as_mut() {
1445                self.merge_crdt_into(existing_props, current_props, label_props, true)?;
1446            }
1447        }
1448        Ok(())
1449    }
1450
1451    async fn fetch_all_props_from_storage(&self, vid: Vid) -> Result<Option<Properties>> {
1452        // In the new storage model, VID doesn't embed label info.
1453        // We need to scan all label datasets to find the vertex's properties.
1454        let schema = self.schema_manager.schema();
1455        let mut merged_props: Option<Properties> = None;
1456        let mut global_best_version: Option<u64> = None;
1457
1458        // Try VidLabelsIndex for O(1) label resolution
1459        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1460        {
1461            labels
1462        } else {
1463            schema.labels.keys().cloned().collect() // Fallback to full scan
1464        };
1465
1466        for label_name in &label_names {
1467            let label_props = schema.properties.get(label_name);
1468
1469            // Get property names from schema
1470            let mut prop_names: Vec<String> = Vec::new();
1471            if let Some(props) = label_props {
1472                prop_names = props.keys().cloned().collect();
1473            }
1474
1475            // Build column selection
1476            let mut columns: Vec<String> = vec!["_deleted".to_string(), "_version".to_string()];
1477            columns.extend(prop_names.iter().cloned());
1478            // Add overflow_json column to fetch non-schema properties
1479            columns.push("overflow_json".to_string());
1480
1481            // Query using backend scan API
1482            let base_filter = format!("_vid = {}", vid.as_u64());
1483
1484            let filter_expr = self.storage.apply_version_filter(base_filter);
1485
1486            let table_name = crate::backend::table_names::vertex_table_name(label_name);
1487            let batches: Vec<RecordBatch> = match self
1488                .storage
1489                .backend()
1490                .scan(
1491                    crate::backend::types::ScanRequest::all(&table_name)
1492                        .with_filter(&filter_expr)
1493                        .with_columns(columns.clone()),
1494                )
1495                .await
1496            {
1497                Ok(b) => b,
1498                Err(_) => continue,
1499            };
1500
1501            // Convert Vec<String> to Vec<&str> for downstream use
1502            let prop_name_refs: Vec<&str> = prop_names.iter().map(|s| s.as_str()).collect();
1503
1504            for batch in batches {
1505                let deleted_col = match batch
1506                    .column_by_name("_deleted")
1507                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1508                {
1509                    Some(c) => c,
1510                    None => continue,
1511                };
1512                let version_col = match batch
1513                    .column_by_name("_version")
1514                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1515                {
1516                    Some(c) => c,
1517                    None => continue,
1518                };
1519
1520                for row in 0..batch.num_rows() {
1521                    let version = version_col.value(row);
1522
1523                    if deleted_col.value(row) {
1524                        if global_best_version.is_none_or(|best| version >= best) {
1525                            global_best_version = Some(version);
1526                            merged_props = None;
1527                        }
1528                        continue;
1529                    }
1530
1531                    let mut current_props =
1532                        Self::extract_row_properties(&batch, row, &prop_name_refs, label_props)?;
1533
1534                    // Also extract overflow properties from overflow_json column
1535                    if let Some(overflow_props) = Self::extract_overflow_properties(&batch, row)? {
1536                        // Merge overflow properties into current_props
1537                        for (k, v) in overflow_props {
1538                            current_props.entry(k).or_insert(v);
1539                        }
1540                    }
1541
1542                    self.merge_versioned_props(
1543                        current_props,
1544                        version,
1545                        &mut global_best_version,
1546                        &mut merged_props,
1547                        label_props,
1548                    )?;
1549                }
1550            }
1551        }
1552
1553        // Fallback to main table props_json for unknown/schemaless labels.
1554        // Gated on "no per-label verdict" (neither a live row nor a tombstone
1555        // was seen), so a per-label deletion tombstone is never overridden by
1556        // an older main-table row.
1557        if merged_props.is_none()
1558            && global_best_version.is_none()
1559            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1560                self.storage.backend(),
1561                vid,
1562                self.storage.version_high_water_mark(),
1563            )
1564            .await?
1565        {
1566            return Ok(Some(main_props));
1567        }
1568
1569        Ok(merged_props)
1570    }
1571
1572    pub async fn get_vertex_prop(&self, vid: Vid, prop: &str) -> Result<Value> {
1573        self.get_vertex_prop_with_ctx(vid, prop, None).await
1574    }
1575
1576    #[instrument(skip(self, ctx), level = "trace")]
1577    pub async fn get_vertex_prop_with_ctx(
1578        &self,
1579        vid: Vid,
1580        prop: &str,
1581        ctx: Option<&QueryContext>,
1582    ) -> Result<Value> {
1583        // 1. Check if deleted in any L0 layer
1584        if l0_visibility::is_vertex_deleted(vid, ctx) {
1585            return Ok(Value::Null);
1586        }
1587
1588        // 2. Determine if property is CRDT type
1589        // First check labels from context/L0, then fall back to scanning all labels in schema
1590        let schema = self.schema_manager.schema();
1591        let labels = ctx
1592            .map(|c| l0_visibility::get_vertex_labels(vid, c))
1593            .unwrap_or_default();
1594
1595        let is_crdt = if !labels.is_empty() {
1596            // Check labels from context
1597            labels.iter().any(|ln| {
1598                schema
1599                    .properties
1600                    .get(ln)
1601                    .and_then(|lp| lp.get(prop))
1602                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1603                    .unwrap_or(false)
1604            })
1605        } else {
1606            // No labels from context - check if property is CRDT in ANY label
1607            schema.properties.values().any(|label_props| {
1608                label_props
1609                    .get(prop)
1610                    .map(|pm| matches!(pm.r#type, DataType::Crdt(_)))
1611                    .unwrap_or(false)
1612            })
1613        };
1614
1615        // 3. Check L0 chain for property
1616        if is_crdt {
1617            // For CRDT, accumulate and merge values from all L0 layers
1618            let l0_val = self.accumulate_crdt_from_l0(vid, prop, ctx)?;
1619            return self.finalize_crdt_lookup(vid, prop, l0_val).await;
1620        }
1621
1622        // 4. Non-CRDT: Check L0 chain for property (returns first found)
1623        if let Some(val) = l0_visibility::lookup_vertex_prop(vid, prop, ctx) {
1624            return Ok(val);
1625        }
1626
1627        // 5. Check Cache (if enabled)
1628        if let Some(ref cache) = self.vertex_cache {
1629            let mut cache = cache.lock().await;
1630            if let Some(val) = cache.get(&(vid, prop.to_string())) {
1631                debug!(vid = ?vid, prop, "Cache HIT");
1632                metrics::counter!("uni_property_cache_hits_total", "type" => "vertex").increment(1);
1633                return Ok(val.clone());
1634            } else {
1635                debug!(vid = ?vid, prop, "Cache MISS");
1636                metrics::counter!("uni_property_cache_misses_total", "type" => "vertex")
1637                    .increment(1);
1638            }
1639        }
1640
1641        // 6. Fetch from Storage
1642        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1643
1644        // 7. Update Cache (if enabled)
1645        if let Some(ref cache) = self.vertex_cache {
1646            let mut cache = cache.lock().await;
1647            cache.put((vid, prop.to_string()), storage_val.clone());
1648        }
1649
1650        Ok(storage_val)
1651    }
1652
1653    /// Accumulate CRDT values from all L0 layers by merging them together.
1654    fn accumulate_crdt_from_l0(
1655        &self,
1656        vid: Vid,
1657        prop: &str,
1658        ctx: Option<&QueryContext>,
1659    ) -> Result<Value> {
1660        let mut merged = Value::Null;
1661        l0_visibility::visit_l0_buffers(ctx, |l0| {
1662            if let Some(props) = l0.vertex_properties.get(&vid)
1663                && let Some(val) = props.get(prop)
1664            {
1665                // Note: merge_crdt_values can't fail in practice for valid CRDTs
1666                if let Ok(new_merged) = self.merge_crdt_values(&merged, val) {
1667                    merged = new_merged;
1668                }
1669            }
1670            false // Continue visiting all layers
1671        });
1672        Ok(merged)
1673    }
1674
1675    /// Finalize CRDT lookup by merging with cache/storage.
1676    async fn finalize_crdt_lookup(&self, vid: Vid, prop: &str, l0_val: Value) -> Result<Value> {
1677        // Check Cache (if enabled)
1678        let cached_val = if let Some(ref cache) = self.vertex_cache {
1679            let mut cache = cache.lock().await;
1680            cache.get(&(vid, prop.to_string())).cloned()
1681        } else {
1682            None
1683        };
1684
1685        if let Some(val) = cached_val {
1686            let merged = self.merge_crdt_values(&val, &l0_val)?;
1687            return Ok(merged);
1688        }
1689
1690        // Fetch from Storage
1691        let storage_val = self.fetch_prop_from_storage(vid, prop).await?;
1692
1693        // Update Cache (if enabled)
1694        if let Some(ref cache) = self.vertex_cache {
1695            let mut cache = cache.lock().await;
1696            cache.put((vid, prop.to_string()), storage_val.clone());
1697        }
1698
1699        // Merge L0 + Storage
1700        self.merge_crdt_values(&storage_val, &l0_val)
1701    }
1702
1703    async fn fetch_prop_from_storage(&self, vid: Vid, prop: &str) -> Result<Value> {
1704        // In the new storage model, VID doesn't embed label info.
1705        // We need to scan all label datasets to find the property.
1706        let schema = self.schema_manager.schema();
1707        let mut best_version: Option<u64> = None;
1708        let mut best_value: Option<Value> = None;
1709
1710        // Try VidLabelsIndex for O(1) label resolution
1711        let label_names: Vec<String> = if let Some(labels) = self.storage.get_labels_from_index(vid)
1712        {
1713            labels
1714        } else {
1715            schema.labels.keys().cloned().collect() // Fallback to full scan
1716        };
1717
1718        for label_name in &label_names {
1719            // Check if property is defined in schema for this label
1720            let prop_meta = schema
1721                .properties
1722                .get(label_name)
1723                .and_then(|props| props.get(prop));
1724
1725            // Even if property is not in schema, we still check overflow_json
1726
1727            // Query using backend scan API
1728            let base_filter = format!("_vid = {}", vid.as_u64());
1729
1730            let filter_expr = self.storage.apply_version_filter(base_filter);
1731
1732            // Always request metadata columns and overflow_json
1733            let mut columns = vec![
1734                "_deleted".to_string(),
1735                "_version".to_string(),
1736                "overflow_json".to_string(),
1737            ];
1738
1739            // Only request the property column if it's defined in schema
1740            if prop_meta.is_some() {
1741                columns.push(prop.to_string());
1742            }
1743
1744            let table_name = crate::backend::table_names::vertex_table_name(label_name);
1745            let batches: Vec<RecordBatch> = match self
1746                .storage
1747                .backend()
1748                .scan(
1749                    crate::backend::types::ScanRequest::all(&table_name)
1750                        .with_filter(&filter_expr)
1751                        .with_columns(columns),
1752                )
1753                .await
1754            {
1755                Ok(b) => b,
1756                Err(_) => continue,
1757            };
1758
1759            for batch in batches {
1760                let deleted_col = match batch
1761                    .column_by_name("_deleted")
1762                    .and_then(|c| c.as_any().downcast_ref::<BooleanArray>())
1763                {
1764                    Some(c) => c,
1765                    None => continue,
1766                };
1767                let version_col = match batch
1768                    .column_by_name("_version")
1769                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1770                {
1771                    Some(c) => c,
1772                    None => continue,
1773                };
1774                for row in 0..batch.num_rows() {
1775                    let version = version_col.value(row);
1776
1777                    if deleted_col.value(row) {
1778                        if best_version.is_none_or(|best| version >= best) {
1779                            best_version = Some(version);
1780                            best_value = None;
1781                        }
1782                        continue;
1783                    }
1784
1785                    // First try schema column if property is in schema
1786                    let mut val = None;
1787                    if let Some(meta) = prop_meta
1788                        && let Some(col) = batch.column_by_name(prop)
1789                    {
1790                        val = Some(if col.is_null(row) {
1791                            Value::Null
1792                        } else {
1793                            Self::value_from_column(col, &meta.r#type, row)?
1794                        });
1795                    }
1796
1797                    // If not in schema column, check overflow_json
1798                    if val.is_none()
1799                        && let Some(overflow_props) =
1800                            Self::extract_overflow_properties(&batch, row)?
1801                        && let Some(overflow_val) = overflow_props.get(prop)
1802                    {
1803                        val = Some(overflow_val.clone());
1804                    }
1805
1806                    // If we found a value (from schema or overflow), merge it
1807                    if let Some(v) = val {
1808                        if let Some(meta) = prop_meta {
1809                            // Use schema type for merging (handles CRDT)
1810                            self.merge_prop_value(
1811                                v,
1812                                version,
1813                                &meta.r#type,
1814                                &mut best_version,
1815                                &mut best_value,
1816                            )?;
1817                        } else {
1818                            // Overflow property: use simple LWW merging
1819                            if best_version.is_none_or(|best| version >= best) {
1820                                best_version = Some(version);
1821                                best_value = Some(v);
1822                            }
1823                        }
1824                    }
1825                }
1826            }
1827        }
1828
1829        // Fallback to main-table props_json for unknown/schemaless labels —
1830        // their rows have no per-label table at all (mirrors
1831        // `fetch_all_props_from_storage`). Gated on "no per-label verdict"
1832        // (neither a live value nor a tombstone was seen), so a per-label
1833        // tombstone is never overridden by an older main-table row.
1834        if best_value.is_none()
1835            && best_version.is_none()
1836            && let Some(main_props) = MainVertexDataset::find_props_by_vid(
1837                self.storage.backend(),
1838                vid,
1839                self.storage.version_high_water_mark(),
1840            )
1841            .await?
1842        {
1843            return Ok(main_props.get(prop).cloned().unwrap_or(Value::Null));
1844        }
1845
1846        Ok(best_value.unwrap_or(Value::Null))
1847    }
1848
1849    /// Decode an Arrow column value with strict CRDT error handling.
1850    pub fn value_from_column(col: &dyn Array, data_type: &DataType, row: usize) -> Result<Value> {
1851        crate::storage::value_codec::decode_column_value(
1852            col,
1853            data_type,
1854            row,
1855            CrdtDecodeMode::Strict,
1856        )
1857    }
1858
1859    /// Merge two `Value`-wrapped CRDT operands.
1860    ///
1861    /// Routes through [`uni_crdt::Crdt::merge_via_registry`] using the
1862    /// `PropertyManager`'s `plugin_registry`. With an empty registry
1863    /// (the legacy 3-arg [`Self::new`] default) `merge_via_registry`
1864    /// falls back to `Crdt::try_merge`, preserving native semantics.
1865    ///
1866    /// # Errors
1867    ///
1868    /// Returns an `anyhow::Error` when either operand is malformed
1869    /// CRDT JSON, the variants disagree, or the registry-dispatched
1870    /// merge surfaces a `CrdtError`.
1871    pub fn merge_crdt_values(&self, a: &Value, b: &Value) -> Result<Value> {
1872        // Handle the case where values are JSON strings containing CRDT JSON
1873        // (this happens when values come from Cypher CREATE statements)
1874        // Parse before checking for null to ensure proper format conversion
1875        if a.is_null() {
1876            return Self::parse_crdt_value(b).map(Value::from);
1877        }
1878        if b.is_null() {
1879            return Self::parse_crdt_value(a).map(Value::from);
1880        }
1881
1882        let a_parsed = Self::parse_crdt_value(a)?;
1883        let b_parsed = Self::parse_crdt_value(b)?;
1884
1885        let mut crdt_a: Crdt = serde_json::from_value(a_parsed)?;
1886        let crdt_b: Crdt = serde_json::from_value(b_parsed)?;
1887        // M10 follow-up: route through `merge_via_registry` so a
1888        // hot-reloaded `CrdtKindProvider` plugin can intercept the
1889        // merge. With an empty registry (the 3-arg `new()` default)
1890        // this falls back to `Crdt::try_merge`, preserving prior
1891        // behavior bit-for-bit.
1892        crdt_a
1893            .merge_via_registry(&crdt_b, &self.plugin_registry)
1894            .map_err(|e| anyhow::anyhow!("{e}"))?;
1895        Ok(Value::from(serde_json::to_value(crdt_a)?))
1896    }
1897
1898    /// Parse a CRDT value that may be either a JSON object or a JSON string containing JSON.
1899    /// Returns `serde_json::Value` for internal CRDT processing.
1900    fn parse_crdt_value(val: &Value) -> Result<serde_json::Value> {
1901        if let Value::String(s) = val {
1902            // Value is a JSON string - parse the string content as JSON
1903            serde_json::from_str(s).map_err(|e| anyhow!("Failed to parse CRDT JSON string: {}", e))
1904        } else {
1905            // Convert uni_common::Value to serde_json::Value for CRDT processing
1906            Ok(serde_json::Value::from(val.clone()))
1907        }
1908    }
1909
1910    /// Merge a property value based on version, handling CRDT vs LWW semantics.
1911    fn merge_prop_value(
1912        &self,
1913        val: Value,
1914        version: u64,
1915        data_type: &DataType,
1916        best_version: &mut Option<u64>,
1917        best_value: &mut Option<Value>,
1918    ) -> Result<()> {
1919        if let DataType::Crdt(_) = data_type {
1920            self.merge_crdt_prop_value(val, version, best_version, best_value)
1921        } else {
1922            // Standard LWW
1923            if best_version.is_none_or(|best| version >= best) {
1924                *best_version = Some(version);
1925                *best_value = Some(val);
1926            }
1927            Ok(())
1928        }
1929    }
1930
1931    /// Merge CRDT property values across versions (CRDTs merge regardless of version).
1932    fn merge_crdt_prop_value(
1933        &self,
1934        val: Value,
1935        version: u64,
1936        best_version: &mut Option<u64>,
1937        best_value: &mut Option<Value>,
1938    ) -> Result<()> {
1939        if best_version.is_none_or(|best| version > best) {
1940            // Newer version: merge with existing if present
1941            if let Some(existing) = best_value.take() {
1942                *best_value = Some(self.merge_crdt_values(&val, &existing)?);
1943            } else {
1944                *best_value = Some(val);
1945            }
1946            *best_version = Some(version);
1947        } else if Some(version) == *best_version {
1948            // Same version: merge
1949            let existing = best_value.get_or_insert(Value::Null);
1950            *existing = self.merge_crdt_values(existing, &val)?;
1951        } else {
1952            // Older version: still merge for CRDTs
1953            if let Some(existing) = best_value.as_mut() {
1954                *existing = self.merge_crdt_values(existing, &val)?;
1955            }
1956        }
1957        Ok(())
1958    }
1959}