// uni_store/runtime/writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::id_allocator::IdAllocator;
6use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
7use crate::runtime::l0_manager::L0Manager;
8use crate::runtime::property_manager::PropertyManager;
9use crate::runtime::wal::WriteAheadLog;
10use crate::storage::adjacency_manager::AdjacencyManager;
11use crate::storage::delta::{L1Entry, Op};
12use crate::storage::main_edge::MainEdgeDataset;
13use crate::storage::main_vertex::MainVertexDataset;
14use crate::storage::manager::StorageManager;
15use anyhow::{Result, anyhow};
16use chrono::Utc;
17use futures::TryStreamExt;
18use metrics;
19use parking_lot::RwLock;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{debug, info, instrument};
23use uni_common::Properties;
24use uni_common::Value;
25use uni_common::config::UniConfig;
26use uni_common::core::id::{Eid, Vid};
27use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
28use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
29use uni_xervo::runtime::ModelRuntime;
30use uuid::Uuid;
31
/// Tunable limits for the write path.
///
/// Derives `Copy`/`PartialEq`/`Eq` in addition to the original
/// `Clone`/`Debug`: the struct holds a single `usize`, so these are free
/// and make the config easier to pass around and compare in tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WriterConfig {
    /// Maximum number of buffered mutations before a flush is expected.
    pub max_mutations: usize,
}

impl Default for WriterConfig {
    fn default() -> Self {
        Self {
            max_mutations: 10_000,
        }
    }
}
44
/// Orchestrates the write path: L0 buffering, WAL durability, transactions,
/// constraint validation, and flush/compaction bookkeeping.
pub struct Writer {
    /// Manages the current and pending in-memory L0 buffers.
    pub l0_manager: Arc<L0Manager>,
    /// Backing storage manager (datasets, object store).
    pub storage: Arc<StorageManager>,
    /// Schema source used for constraint validation.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocates auto-increment VIDs and EIDs.
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration.
    pub config: UniConfig,
    /// Optional model runtime, installed via `set_xervo_runtime`.
    pub xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Private L0 buffer for the currently open transaction, if any.
    /// `Some` means a transaction is active.
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation
    last_flush_time: std::time::Instant,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
}
62
63impl Writer {
    /// Creates a `Writer` with default config, no WAL, and a freshly loaded
    /// [`IdAllocator`].
    ///
    /// Convenience wrapper delegating to [`Writer::new_with_config`].
    pub async fn new(
        storage: Arc<StorageManager>,
        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
        start_version: u64,
    ) -> Result<Self> {
        Self::new_with_config(
            storage,
            schema_manager,
            start_version,
            UniConfig::default(),
            None, // no WAL
            None, // load a new IdAllocator from the store
        )
        .await
    }
79
80    pub async fn new_with_config(
81        storage: Arc<StorageManager>,
82        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
83        start_version: u64,
84        config: UniConfig,
85        wal: Option<Arc<WriteAheadLog>>,
86        allocator: Option<Arc<IdAllocator>>,
87    ) -> Result<Self> {
88        let allocator = if let Some(a) = allocator {
89            a
90        } else {
91            let store = storage.store();
92            let path = object_store::path::Path::from("id_allocator.json");
93            Arc::new(IdAllocator::new(store, path, 1000).await?)
94        };
95
96        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
97
98        let property_manager = Some(Arc::new(PropertyManager::new(
99            storage.clone(),
100            schema_manager.clone(),
101            1000,
102        )));
103
104        let adjacency_manager = storage.adjacency_manager();
105
106        Ok(Self {
107            l0_manager,
108            storage,
109            schema_manager,
110            allocator,
111            config,
112            xervo_runtime: None,
113            transaction_l0: None,
114            property_manager,
115            adjacency_manager,
116            last_flush_time: std::time::Instant::now(),
117            compaction_handle: Arc::new(RwLock::new(None)),
118        })
119    }
120
121    /// Replay WAL mutations into the current L0 buffer.
122    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
123        let l0 = self.l0_manager.get_current();
124        let wal = l0.read().wal.clone();
125
126        if let Some(wal) = wal {
127            wal.initialize().await?;
128            let mutations = wal.replay_since(wal_high_water_mark).await?;
129            let count = mutations.len();
130
131            if count > 0 {
132                log::info!(
133                    "Replaying {} mutations from WAL (LSN > {})",
134                    count,
135                    wal_high_water_mark
136                );
137                let mut l0_guard = l0.write();
138                l0_guard.replay_mutations(mutations)?;
139            }
140
141            Ok(count)
142        } else {
143            Ok(0)
144        }
145    }
146
    /// Allocates the next VID (pure auto-increment).
    ///
    /// Delegates to the shared [`IdAllocator`].
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
151
    /// Allocates multiple VIDs at once for bulk operations.
    /// This is more efficient than calling next_vid() in a loop.
    ///
    /// Delegates to [`IdAllocator::allocate_vids`]; returns `count` fresh VIDs.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
157
    /// Allocates the next EID (pure auto-increment).
    ///
    /// `_type_id` is currently unused: EIDs come from a single global
    /// sequence regardless of edge type.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
162
    /// Installs the optional model runtime handle.
    pub fn set_xervo_runtime(&mut self, runtime: Arc<ModelRuntime>) {
        self.xervo_runtime = Some(runtime);
    }
166
    /// Returns a clone of the optional model runtime handle, if one was set.
    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
        self.xervo_runtime.clone()
    }
170
171    pub fn begin_transaction(&mut self) -> Result<()> {
172        if self.transaction_l0.is_some() {
173            return Err(anyhow!("Transaction already active"));
174        }
175        let current_version = self.l0_manager.get_current().read().current_version;
176        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
177        self.transaction_l0 = Some(Arc::new(RwLock::new(L0Buffer::new(current_version, None))));
178        Ok(())
179    }
180
181    /// Returns the active L0 buffer: the transaction L0 if a transaction is open,
182    /// otherwise the current L0 from the manager.
183    fn active_l0(&self) -> Arc<RwLock<L0Buffer>> {
184        self.transaction_l0
185            .clone()
186            .unwrap_or_else(|| self.l0_manager.get_current())
187    }
188
189    fn update_metrics(&self) {
190        let l0 = self.l0_manager.get_current();
191        let size = l0.read().estimated_size;
192        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
193
194        if let Some(tx_l0) = &self.transaction_l0 {
195            metrics::gauge!("active_transactions").set(1.0);
196            let tx_size = tx_l0.read().estimated_size;
197            metrics::gauge!("transaction_l0_size_bytes").set(tx_size as f64);
198        } else {
199            metrics::gauge!("active_transactions").set(0.0);
200            metrics::gauge!("transaction_l0_size_bytes").set(0.0);
201        }
202    }
203
    /// Commits the active transaction.
    ///
    /// Ordering is load-bearing — do not reorder these steps:
    /// 1. WAL append of all tx mutations (vertices first, then edges),
    /// 2. WAL flush — the durability/commit point,
    /// 3. merge into the main L0 (visibility) + adjacency-overlay replay,
    /// 4. clear the transaction, then best-effort flush check.
    ///
    /// If the WAL flush fails, the transaction stays active so the caller can
    /// retry or roll back; nothing has been made visible yet.
    ///
    /// # Errors
    /// Returns an error if no transaction is active, or if WAL append/flush
    /// or the L0 merge fails.
    pub async fn commit_transaction(&mut self) -> Result<()> {
        // 1. Borrow transaction L0 - keep available for rollback if commit fails
        let tx_l0_arc = self
            .transaction_l0
            .as_ref()
            .ok_or_else(|| anyhow!("No active transaction"))?
            .clone();

        // 2. Write transaction mutations to WAL BEFORE merging into main L0
        // This ensures durability before visibility.
        // Note: WAL is optional - if present, we get durability; if absent, we skip this step.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            // If WAL exists, write mutations to it for durability
            if let Some(wal) = main_l0.wal.as_ref() {
                // Append all transaction mutations to WAL
                // Order: vertices first, then edges (to ensure src/dst exist on replay)

                // Vertex insertions (tombstoned vertices are logged as deletions below,
                // not as insertions)
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Edge insertions and deletions (an edge with a tombstone entry
                // is a deletion; otherwise it is an insertion)
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        // Edge deletion
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        // Edge insertion
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }
            }
        }

        // 3. Flush WAL to durable storage - THIS IS THE COMMIT POINT
        // If this fails, transaction remains active and can be retried or rolled back.
        self.flush_wal().await?;

        // 4. Now that mutations are durable, merge into main L0 and make visible
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            // Replay transaction edges into the AdjacencyManager overlay so they
            // become visible to queries that read from the AM (post-migration path).
            // Use per-edge versions from tx_l0.edge_versions, falling back to current version.
            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }
        }

        self.update_metrics();

        // 5. Clear transaction (all critical steps succeeded)
        self.transaction_l0 = None;

        // 6. check_flush is best-effort compaction, not critical
        if let Err(e) = self.check_flush().await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }

        Ok(())
    }
317
318    /// Flush the WAL buffer to durable storage.
319    pub async fn flush_wal(&self) -> Result<()> {
320        let l0 = self.l0_manager.get_current();
321        let wal = l0.read().wal.clone();
322
323        if let Some(wal) = wal {
324            wal.flush().await?;
325        }
326        Ok(())
327    }
328
329    pub fn rollback_transaction(&mut self) -> Result<()> {
330        // Idempotent: no error if already cleared (commit succeeded or Drop already cleaned up)
331        self.transaction_l0 = None;
332        Ok(())
333    }
334
335    /// Force-rollback any active transaction. Safe to call if no transaction active.
336    /// Used by Transaction::Drop for cleanup.
337    pub fn force_rollback(&mut self) {
338        if self.transaction_l0.take().is_some() {
339            tracing::warn!("Force-rolled back leaked transaction");
340        }
341    }
342
343    /// Validates vertex constraints for the given properties.
344    /// In the new design, label is passed as a parameter since VID no longer embeds label.
345    async fn validate_vertex_constraints_for_label(
346        &self,
347        vid: Vid,
348        properties: &Properties,
349        label: &str,
350    ) -> Result<()> {
351        let schema = self.schema_manager.schema();
352
353        {
354            // 1. Check NOT NULL constraints (from Property definitions)
355            if let Some(props_meta) = schema.properties.get(label) {
356                for (prop_name, meta) in props_meta {
357                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
358                        log::warn!(
359                            "Constraint violation: Property '{}' cannot be null for label '{}'",
360                            prop_name,
361                            label
362                        );
363                        return Err(anyhow!(
364                            "Constraint violation: Property '{}' cannot be null",
365                            prop_name
366                        ));
367                    }
368                }
369            }
370
371            // 2. Check Explicit Constraints (Unique, Check, etc.)
372            for constraint in &schema.constraints {
373                if !constraint.enabled {
374                    continue;
375                }
376                match &constraint.target {
377                    ConstraintTarget::Label(l) if l == label => {}
378                    _ => continue,
379                }
380
381                match &constraint.constraint_type {
382                    ConstraintType::Unique {
383                        properties: unique_props,
384                    } => {
385                        // Support single and multi-property unique constraints
386                        if !unique_props.is_empty() {
387                            let mut key_values = Vec::new();
388                            let mut missing = false;
389                            for prop in unique_props {
390                                if let Some(val) = properties.get(prop) {
391                                    key_values.push((prop.clone(), val.clone()));
392                                } else {
393                                    missing = true; // Can't enforce if property missing (partial update?)
394                                    // For INSERT, missing means null?
395                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
396                                    // For now, only check if ALL keys are present
397                                }
398                            }
399
400                            if !missing {
401                                self.check_unique_constraint_multi(label, &key_values, vid)
402                                    .await?;
403                            }
404                        }
405                    }
406                    ConstraintType::Exists { property } => {
407                        if properties.get(property).is_none_or(|v| v.is_null()) {
408                            log::warn!(
409                                "Constraint violation: Property '{}' must exist for label '{}'",
410                                property,
411                                label
412                            );
413                            return Err(anyhow!(
414                                "Constraint violation: Property '{}' must exist",
415                                property
416                            ));
417                        }
418                    }
419                    ConstraintType::Check { expression } => {
420                        if !self.evaluate_check_constraint(expression, properties)? {
421                            return Err(anyhow!(
422                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
423                                constraint.name,
424                                expression
425                            ));
426                        }
427                    }
428                    _ => {
429                        return Err(anyhow!("Unsupported constraint type"));
430                    }
431                }
432            }
433        }
434        Ok(())
435    }
436
437    /// Validates vertex constraints for a vertex with the given labels.
438    /// Labels must be passed explicitly since the vertex may not yet be in L0.
439    /// Unknown labels (not in schema) are skipped.
440    async fn validate_vertex_constraints(
441        &self,
442        vid: Vid,
443        properties: &Properties,
444        labels: &[String],
445    ) -> Result<()> {
446        let schema = self.schema_manager.schema();
447
448        // Validate constraints only for known labels
449        for label in labels {
450            // Skip unknown labels (schemaless support)
451            if schema.get_label_case_insensitive(label).is_none() {
452                continue;
453            }
454            self.validate_vertex_constraints_for_label(vid, properties, label)
455                .await?;
456        }
457
458        // Check global ext_id uniqueness if ext_id is provided
459        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
460            self.check_extid_globally_unique(ext_id, vid).await?;
461        }
462
463        Ok(())
464    }
465
466    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
467    ///
468    /// Used to build a constraint key index from L0 buffers for batch validation.
469    fn collect_constraint_keys_from_properties<'a>(
470        properties_iter: impl Iterator<Item = &'a Properties>,
471        label: &str,
472        constraints: &[uni_common::core::schema::Constraint],
473        existing_keys: &mut HashMap<String, HashSet<String>>,
474        existing_extids: &mut HashSet<String>,
475    ) {
476        for props in properties_iter {
477            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
478                existing_extids.insert(ext_id.to_string());
479            }
480
481            for constraint in constraints {
482                if !constraint.enabled {
483                    continue;
484                }
485                if let ConstraintTarget::Label(l) = &constraint.target {
486                    if l != label {
487                        continue;
488                    }
489                } else {
490                    continue;
491                }
492
493                if let ConstraintType::Unique {
494                    properties: unique_props,
495                } = &constraint.constraint_type
496                {
497                    let mut key_parts = Vec::new();
498                    let mut all_present = true;
499                    for prop in unique_props {
500                        if let Some(val) = props.get(prop) {
501                            key_parts.push(format!("{}:{}", prop, val));
502                        } else {
503                            all_present = false;
504                            break;
505                        }
506                    }
507                    if all_present {
508                        let key = key_parts.join("|");
509                        existing_keys
510                            .entry(constraint.name.clone())
511                            .or_default()
512                            .insert(key);
513                    }
514                }
515            }
516        }
517    }
518
    /// Validates constraints for a batch of vertices efficiently.
    ///
    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
    ///
    /// Four phases: (1) NOT NULL per vertex, (2) build a key index from the
    /// current and transaction L0 buffers, (3) check each batch vertex against
    /// that index AND against earlier vertices in the same batch, (4) check
    /// persisted storage with one filtered count query per unique constraint.
    ///
    /// # Arguments
    /// * `vids` - VIDs of vertices being inserted
    /// * `properties_batch` - Properties for each vertex
    /// * `label` - Label for all vertices (assumes single label for now)
    ///
    /// # Errors
    /// Returns the first violation found (NOT NULL, duplicate ext_id,
    /// duplicate unique key, missing `Exists` property, failed `Check`),
    /// or a length mismatch between `vids` and `properties_batch`.
    ///
    /// # Performance
    /// For N vertices with unique constraints:
    /// - Old approach: O(N²) - scan L0 buffer N times
    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
    async fn validate_vertex_batch_constraints(
        &self,
        vids: &[Vid],
        properties_batch: &[Properties],
        label: &str,
    ) -> Result<()> {
        if vids.len() != properties_batch.len() {
            return Err(anyhow!("VID/properties length mismatch"));
        }

        let schema = self.schema_manager.schema();

        // 1. Validate NOT NULL constraints for each vertex
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, properties) in properties_batch.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' cannot be null",
                            idx,
                            prop_name
                        ));
                    }
                }
            }
        }

        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
        let mut existing_extids: HashSet<String> = HashSet::new();

        // Scan current L0 buffer
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            Self::collect_constraint_keys_from_properties(
                l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Scan transaction L0 if present
        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            Self::collect_constraint_keys_from_properties(
                tx_l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // 3. Check batch vertices against index AND check for duplicates within batch
        // batch_keys: constraint name -> (composite key -> first batch index seen)
        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
        // batch_extids: ext_id -> first batch index seen
        let mut batch_extids: HashMap<String, usize> = HashMap::new();

        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
            // Check ext_id uniqueness
            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
                if existing_extids.contains(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation at index {}: ext_id '{}' already exists",
                        idx,
                        ext_id
                    ));
                }
                if let Some(first_idx) = batch_extids.get(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
                        ext_id,
                        first_idx,
                        idx
                    ));
                }
                batch_extids.insert(ext_id.to_string(), idx);
            }

            // Check unique constraints
            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                if let ConstraintTarget::Label(l) = &constraint.target {
                    if l != label {
                        continue;
                    }
                } else {
                    continue;
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        // Key format must match collect_constraint_keys_from_properties:
                        // "prop:value" parts joined by '|'.
                        let mut key_parts = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties.get(prop) {
                                key_parts.push(format!("{}:{}", prop, val));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = key_parts.join("|");

                            // Check against existing L0 keys
                            if let Some(keys) = existing_keys.get(&constraint.name)
                                && keys.contains(&key)
                            {
                                return Err(anyhow!(
                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
                                    idx,
                                    label,
                                    constraint.name
                                ));
                            }

                            // Check for duplicates within batch
                            let batch_constraint_keys =
                                batch_keys.entry(constraint.name.clone()).or_default();
                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
                                return Err(anyhow!(
                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
                                    key,
                                    first_idx,
                                    idx
                                ));
                            }
                            batch_constraint_keys.insert(key, idx);
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "Constraint violation at index {}: Property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "Constraint violation at index {}: CHECK constraint '{}' violated",
                                idx,
                                constraint.name
                            ));
                        }
                    }
                    _ => {}
                }
            }
        }

        // 4. Check storage for unique constraints (can batch this into a single query)
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            if let ConstraintTarget::Label(l) = &constraint.target {
                if l != label {
                    continue;
                }
            } else {
                continue;
            }

            if let ConstraintType::Unique {
                properties: unique_props,
            } = &constraint.constraint_type
            {
                // Build compound OR filter for all batch vertices.
                // Vertices whose key values are missing or of a non-filterable
                // type (not string/int/float/bool) are skipped here; they were
                // already handled (or skipped) by the in-memory phase above.
                let mut or_filters = Vec::new();
                for properties in properties_batch.iter() {
                    let mut and_parts = Vec::new();
                    let mut all_present = true;
                    for prop in unique_props {
                        if let Some(val) = properties.get(prop) {
                            let val_str = match val {
                                // Single quotes doubled to escape string literals
                                // in the dataset filter expression.
                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                                Value::Int(n) => n.to_string(),
                                Value::Float(f) => f.to_string(),
                                Value::Bool(b) => b.to_string(),
                                _ => {
                                    all_present = false;
                                    break;
                                }
                            };
                            and_parts.push(format!("{} = {}", prop, val_str));
                        } else {
                            all_present = false;
                            break;
                        }
                    }
                    if all_present {
                        or_filters.push(format!("({})", and_parts.join(" AND ")));
                    }
                }

                if !or_filters.is_empty() {
                    // Exclude the batch's own VIDs so re-inserts don't self-collide.
                    let vid_list: Vec<String> =
                        vids.iter().map(|v| v.as_u64().to_string()).collect();
                    let filter = format!(
                        "({}) AND _deleted = false AND _vid NOT IN ({})",
                        or_filters.join(" OR "),
                        vid_list.join(", ")
                    );

                    // Best-effort: if the dataset cannot be opened (e.g. label has
                    // no persisted data yet), the storage check is skipped.
                    if let Ok(ds) = self.storage.vertex_dataset(label)
                        && let Ok(lance_ds) = ds.open_raw().await
                    {
                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
                        if count > 0 {
                            return Err(anyhow!(
                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
                                label,
                                constraint.name
                            ));
                        }
                    }
                }
            }
        }

        Ok(())
    }
766
767    /// Checks that ext_id is globally unique across all vertices.
768    ///
769    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
770    /// to ensure no other vertex uses this ext_id.
771    ///
772    /// # Errors
773    ///
774    /// Returns error if another vertex with the same ext_id exists.
775    async fn check_extid_globally_unique(&self, ext_id: &str, current_vid: Vid) -> Result<()> {
776        // Check L0 buffers: current, transaction, and pending flush
777        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
778            let mut buffers = vec![self.l0_manager.get_current()];
779            if let Some(tx_l0) = &self.transaction_l0 {
780                buffers.push(tx_l0.clone());
781            }
782            buffers.extend(self.l0_manager.get_pending_flush());
783            buffers
784        };
785
786        for l0 in &l0_buffers_to_check {
787            if let Some(vid) =
788                Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
789            {
790                return Err(anyhow!(
791                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
792                    ext_id,
793                    vid
794                ));
795            }
796        }
797
798        // Check main vertices table (if it exists)
799        // Pass None for global uniqueness check (not snapshot-isolated)
800        let lancedb = self.storage.lancedb_store();
801        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(lancedb, ext_id, None).await
802            && found_vid != current_vid
803        {
804            return Err(anyhow!(
805                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
806                ext_id,
807                found_vid
808            ));
809        }
810
811        Ok(())
812    }
813
814    /// Search vertex properties for a duplicate ext_id, excluding `current_vid`.
815    fn find_extid_in_properties(
816        vertex_properties: &HashMap<Vid, Properties>,
817        ext_id: &str,
818        current_vid: Vid,
819    ) -> Option<Vid> {
820        vertex_properties.iter().find_map(|(&vid, props)| {
821            if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
822                Some(vid)
823            } else {
824                None
825            }
826        })
827    }
828
829    /// Helper to get vertex labels from L0 buffer.
830    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
831        let l0 = self.l0_manager.get_current();
832        let l0_guard = l0.read();
833        // Check if vertex is tombstoned (deleted) - if so, return None
834        if l0_guard.vertex_tombstones.contains(&vid) {
835            return None;
836        }
837        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
838    }
839
840    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
841    /// This is the proper way to read vertex labels after a flush, as it checks both
842    /// in-memory buffers and persisted storage.
843    pub async fn get_vertex_labels(&self, vid: Vid) -> Option<Vec<String>> {
844        // 1. Check current L0
845        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
846            return Some(labels);
847        }
848
849        // 2. Check transaction L0 if present
850        if let Some(tx_l0) = &self.transaction_l0 {
851            let guard = tx_l0.read();
852            if guard.vertex_tombstones.contains(&vid) {
853                return None;
854            }
855            if let Some(labels) = guard.get_vertex_labels(vid) {
856                return Some(labels.to_vec());
857            }
858        }
859
860        // 3. Check pending flush L0s
861        for pending_l0 in self.l0_manager.get_pending_flush() {
862            let guard = pending_l0.read();
863            if guard.vertex_tombstones.contains(&vid) {
864                return None;
865            }
866            if let Some(labels) = guard.get_vertex_labels(vid) {
867                return Some(labels.to_vec());
868            }
869        }
870
871        // 4. Check storage
872        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
873    }
874
875    /// Helper to get edge type from L0 buffer.
876    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
877        let l0 = self.l0_manager.get_current();
878        let l0_guard = l0.read();
879        l0_guard.get_edge_type(eid).map(|s| s.to_string())
880    }
881
882    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
883    /// Falls back to the transaction L0 if available.
884    pub fn get_edge_type_id_from_l0(&self, eid: Eid) -> Option<u32> {
885        // Check transaction L0 first
886        if let Some(tx_l0) = &self.transaction_l0 {
887            let guard = tx_l0.read();
888            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
889                return Some(etype);
890            }
891        }
892        // Fall back to main L0
893        let l0 = self.l0_manager.get_current();
894        let l0_guard = l0.read();
895        l0_guard
896            .get_edge_endpoint_full(eid)
897            .map(|(_, _, etype)| etype)
898    }
899
900    /// Set the type name for an edge (used for schemaless edge types).
901    /// This is called during CREATE for edge types not found in the schema.
902    pub fn set_edge_type(&self, eid: Eid, type_name: String) {
903        self.active_l0().write().set_edge_type(eid, type_name);
904    }
905
906    /// Evaluate a simple CHECK constraint expression.
907    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
908    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
909        let parts: Vec<&str> = expression.split_whitespace().collect();
910        if parts.len() != 3 {
911            // For now, only support "prop op val"
912            // Fallback to true if too complex to avoid breaking, but warn
913            log::warn!(
914                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
915                expression
916            );
917            return Ok(true);
918        }
919
920        let prop_part = parts[0].trim_start_matches('(');
921        // Handle "variable.property" format - take the part after the dot
922        let prop_name = if let Some(idx) = prop_part.find('.') {
923            &prop_part[idx + 1..]
924        } else {
925            prop_part
926        };
927
928        let op = parts[1];
929        let val_str = parts[2].trim_end_matches(')');
930
931        let prop_val = match properties.get(prop_name) {
932            Some(v) => v,
933            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
934        };
935
936        // Parse value string (handle quotes for strings)
937        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
938            || (val_str.starts_with('"') && val_str.ends_with('"'))
939        {
940            Value::String(val_str[1..val_str.len() - 1].to_string())
941        } else if let Ok(n) = val_str.parse::<i64>() {
942            Value::Int(n)
943        } else if let Ok(n) = val_str.parse::<f64>() {
944            Value::Float(n)
945        } else if let Ok(b) = val_str.parse::<bool>() {
946            Value::Bool(b)
947        } else {
948            // Check for internal format wrappers if they somehow leaked through
949            if val_str.starts_with("Number(") && val_str.ends_with(')') {
950                let n_str = &val_str[7..val_str.len() - 1];
951                if let Ok(n) = n_str.parse::<i64>() {
952                    Value::Int(n)
953                } else if let Ok(n) = n_str.parse::<f64>() {
954                    Value::Float(n)
955                } else {
956                    Value::String(val_str.to_string())
957                }
958            } else {
959                Value::String(val_str.to_string())
960            }
961        };
962
963        match op {
964            "=" | "==" => Ok(prop_val == &target_val),
965            "!=" | "<>" => Ok(prop_val != &target_val),
966            ">" => self
967                .compare_values(prop_val, &target_val)
968                .map(|o| o.is_gt()),
969            "<" => self
970                .compare_values(prop_val, &target_val)
971                .map(|o| o.is_lt()),
972            ">=" => self
973                .compare_values(prop_val, &target_val)
974                .map(|o| o.is_ge()),
975            "<=" => self
976                .compare_values(prop_val, &target_val)
977                .map(|o| o.is_le()),
978            _ => {
979                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
980                Ok(true)
981            }
982        }
983    }
984
985    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
986        use std::cmp::Ordering;
987
988        fn cmp_f64(x: f64, y: f64) -> Ordering {
989            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
990        }
991
992        match (a, b) {
993            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
994            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
995            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
996            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
997            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
998            _ => Err(anyhow!(
999                "Cannot compare incompatible types: {:?} vs {:?}",
1000                a,
1001                b
1002            )),
1003        }
1004    }
1005
    /// Enforce a composite (multi-property) UNIQUE constraint for `label`.
    ///
    /// Checks, in order: the current L0 buffer, the transaction L0 (if a
    /// transaction is open), then persisted storage (L1/L2) via a SQL-style
    /// filter on the label's vertex dataset. `current_vid` is excluded from
    /// every check so a vertex can be re-written with its own key.
    ///
    /// # Errors
    ///
    /// Returns an error when another vertex already holds the same composite
    /// key, or when the storage row-count query fails.
    async fn check_unique_constraint_multi(
        &self,
        label: &str,
        key_values: &[(String, Value)],
        current_vid: Vid,
    ) -> Result<()> {
        // Serialize constraint key once for O(1) lookups
        let key = serialize_constraint_key(label, key_values);

        // 1. Check L0 (in-memory) using O(1) constraint index
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            if l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}'",
                    label
                ));
            }
        }

        // Check Transaction L0
        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            if tx_l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
                    label
                ));
            }
        }

        // 2. Check Storage (L1/L2)
        // Build one "prop = value" predicate per key component; string values
        // are single-quote escaped (SQL style: ' -> '').
        let filters: Vec<String> = key_values
            .iter()
            .map(|(prop, val)| {
                let val_str = match val {
                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                    Value::Int(n) => n.to_string(),
                    Value::Float(f) => f.to_string(),
                    Value::Bool(b) => b.to_string(),
                    // NOTE(review): non-scalar values render as "NULL", and
                    // "prop = NULL" matches no rows under SQL semantics, so
                    // storage-side duplicates of such keys go undetected —
                    // confirm this is intentional.
                    _ => "NULL".to_string(),
                };
                format!("{} = {}", prop, val_str)
            })
            .collect();

        let mut filter = filters.join(" AND ");
        filter.push_str(&format!(
            " AND _deleted = false AND _vid != {}",
            current_vid.as_u64()
        ));

        // If the label's dataset doesn't exist or can't be opened, nothing is
        // persisted yet for this label — the storage check is skipped.
        if let Ok(ds) = self.storage.vertex_dataset(label)
            && let Ok(lance_ds) = ds.open_raw().await
        {
            let count = lance_ds.count_rows(Some(filter.clone())).await?;
            if count > 0 {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
                    label,
                    filter
                ));
            }
        }

        Ok(())
    }
1074
1075    async fn check_write_pressure(&self) -> Result<()> {
1076        let status = self
1077            .storage
1078            .compaction_status()
1079            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
1080        let l1_runs = status.l1_runs;
1081        let throttle = &self.config.throttle;
1082
1083        if l1_runs >= throttle.hard_limit {
1084            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
1085            // Simple polling for now
1086            while self
1087                .storage
1088                .compaction_status()
1089                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
1090                .l1_runs
1091                >= throttle.hard_limit
1092            {
1093                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1094            }
1095        } else if l1_runs >= throttle.soft_limit {
1096            let excess = l1_runs - throttle.soft_limit;
1097            // Cap multiplier to avoid overflow
1098            let excess = std::cmp::min(excess, 31);
1099            let multiplier = 2_u32.pow(excess as u32);
1100            let delay = throttle.base_delay * multiplier;
1101            tokio::time::sleep(delay).await;
1102        }
1103        Ok(())
1104    }
1105
1106    /// Check transaction memory limit to prevent OOM.
1107    /// No-op when no transaction is active.
1108    fn check_transaction_memory(&self) -> Result<()> {
1109        if let Some(tx_l0) = &self.transaction_l0 {
1110            let size = tx_l0.read().estimated_size;
1111            if size > self.config.max_transaction_memory {
1112                return Err(anyhow!(
1113                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1114                     Roll back or commit the current transaction.",
1115                    size,
1116                    self.config.max_transaction_memory
1117                ));
1118            }
1119        }
1120        Ok(())
1121    }
1122
1123    async fn get_query_context(&self) -> Option<QueryContext> {
1124        Some(QueryContext::new_with_pending(
1125            self.l0_manager.get_current(),
1126            self.transaction_l0.clone(),
1127            self.l0_manager.get_pending_flush(),
1128        ))
1129    }
1130
1131    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
1132    ///
1133    /// When `label` is provided, uses it directly to look up property metadata.
1134    /// Otherwise falls back to discovering the label from L0 buffers and storage.
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns an error if CRDT property merging fails.
1139    async fn prepare_vertex_upsert(
1140        &self,
1141        vid: Vid,
1142        properties: &mut Properties,
1143        label: Option<&str>,
1144    ) -> Result<()> {
1145        let Some(pm) = &self.property_manager else {
1146            return Ok(());
1147        };
1148
1149        let schema = self.schema_manager.schema();
1150
1151        // Resolve label: use provided label or discover from L0/storage
1152        let discovered_labels;
1153        let label_name = if let Some(l) = label {
1154            Some(l)
1155        } else {
1156            discovered_labels = self.get_vertex_labels(vid).await;
1157            discovered_labels
1158                .as_ref()
1159                .and_then(|l| l.first().map(|s| s.as_str()))
1160        };
1161
1162        let Some(label_str) = label_name else {
1163            return Ok(());
1164        };
1165        let Some(props_meta) = schema.properties.get(label_str) else {
1166            return Ok(());
1167        };
1168
1169        // Identify CRDT properties in the insert data
1170        let crdt_keys: Vec<String> = properties
1171            .keys()
1172            .filter(|key| {
1173                props_meta.get(*key).is_some_and(|meta| {
1174                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1175                })
1176            })
1177            .cloned()
1178            .collect();
1179
1180        if crdt_keys.is_empty() {
1181            return Ok(());
1182        }
1183
1184        let ctx = self.get_query_context().await;
1185        for key in crdt_keys {
1186            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
1187            if !existing.is_null()
1188                && let Some(val) = properties.get_mut(&key)
1189            {
1190                *val = pm.merge_crdt_values(&existing, val)?;
1191            }
1192        }
1193
1194        Ok(())
1195    }
1196
1197    async fn prepare_edge_upsert(&self, eid: Eid, properties: &mut Properties) -> Result<()> {
1198        if let Some(pm) = &self.property_manager {
1199            let schema = self.schema_manager.schema();
1200            // Get edge type from L0 buffer instead of from EID
1201            let type_name = self.get_edge_type_from_l0(eid);
1202
1203            if let Some(ref t_name) = type_name
1204                && let Some(props_meta) = schema.properties.get(t_name)
1205            {
1206                let mut crdt_keys = Vec::new();
1207                for (key, _) in properties.iter() {
1208                    if let Some(meta) = props_meta.get(key)
1209                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1210                    {
1211                        crdt_keys.push(key.clone());
1212                    }
1213                }
1214
1215                if !crdt_keys.is_empty() {
1216                    let ctx = self.get_query_context().await;
1217                    for key in crdt_keys {
1218                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1219
1220                        if !existing.is_null()
1221                            && let Some(val) = properties.get_mut(&key)
1222                        {
1223                            *val = pm.merge_crdt_values(&existing, val)?;
1224                        }
1225                    }
1226                }
1227            }
1228        }
1229        Ok(())
1230    }
1231
1232    #[instrument(skip(self, properties), level = "trace")]
1233    pub async fn insert_vertex(&mut self, vid: Vid, properties: Properties) -> Result<()> {
1234        self.insert_vertex_with_labels(vid, properties, &[]).await?;
1235        Ok(())
1236    }
1237
    /// Insert a single vertex with the given labels into the active L0 buffer.
    ///
    /// Pipeline: write-pressure throttling → transaction memory check →
    /// embedding generation → constraint validation → CRDT merge
    /// (`prepare_vertex_upsert`) → L0 insert + constraint-index population →
    /// metrics → flush check (flush only happens outside a transaction).
    ///
    /// Returns the property map actually written (after embedding generation
    /// and CRDT merging) so callers can observe merged values.
    ///
    /// # Errors
    ///
    /// Returns an error on write stall, memory-limit breach, embedding
    /// failure, constraint violation, CRDT merge failure, or flush failure.
    #[instrument(skip(self, properties, labels), level = "trace")]
    pub async fn insert_vertex_with_labels(
        &mut self,
        vid: Vid,
        mut properties: Properties,
        labels: &[String],
    ) -> Result<Properties> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        self.process_embeddings_for_labels(labels, &mut properties)
            .await?;
        self.validate_vertex_constraints(vid, &properties, labels)
            .await?;
        self.prepare_vertex_upsert(vid, &mut properties, labels.first().map(|s| s.as_str()))
            .await?;

        // Clone properties and labels before moving into L0 to return them and populate constraint index
        let properties_copy = properties.clone();
        let labels_copy = labels.to_vec();

        // The write lock is held across both the vertex insert and the
        // constraint-index updates so they land atomically in this buffer.
        {
            let l0 = self.active_l0();
            let mut l0_guard = l0.write();
            l0_guard.insert_vertex_with_labels(vid, properties, labels);

            // Populate constraint index for O(1) duplicate detection
            let schema = self.schema_manager.schema();
            for label in &labels_copy {
                // Skip unknown labels (schemaless support)
                if schema.get_label_case_insensitive(label).is_none() {
                    continue;
                }

                // For each unique constraint on this label, insert into constraint index
                for constraint in &schema.constraints {
                    if !constraint.enabled {
                        continue;
                    }
                    // Only constraints targeting exactly this label apply.
                    if let ConstraintTarget::Label(l) = &constraint.target {
                        if l != label {
                            continue;
                        }
                    } else {
                        continue;
                    }

                    if let ConstraintType::Unique {
                        properties: unique_props,
                    } = &constraint.constraint_type
                    {
                        // Index the key only when every constrained property
                        // is present on this vertex.
                        let mut key_values = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties_copy.get(prop) {
                                key_values.push((prop.clone(), val.clone()));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = serialize_constraint_key(label, &key_values);
                            l0_guard.insert_constraint_key(key, vid);
                        }
                    }
                }
            }
        }

        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Inside a transaction, flushing is deferred to commit.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(properties_copy)
    }
1320
    /// Insert multiple vertices with batched operations.
    ///
    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
    /// for bulk inserts with unique constraints.
    ///
    /// # Performance Improvements
    /// - Batch VID allocation: 1 call instead of N calls
    /// - Batch constraint validation: O(N) instead of O(N²)
    /// - Batch embedding generation: 1 API call per config instead of N calls
    /// - Transaction wrapping: Automatic flush deferral, atomicity
    ///
    /// # Arguments
    /// * `vids` - Pre-allocated VIDs for the vertices
    /// * `properties_batch` - Properties for each vertex
    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
    ///
    /// # Errors
    /// Returns error if:
    /// - VID/properties length mismatch
    /// - Constraint violation detected
    /// - Embedding generation fails
    /// - Transaction commit fails
    ///
    /// # Atomicity
    /// If this method fails, all changes are rolled back (if transaction was started here).
    pub async fn insert_vertices_batch(
        &mut self,
        vids: Vec<Vid>,
        mut properties_batch: Vec<Properties>,
        labels: Vec<String>,
    ) -> Result<Vec<Properties>> {
        let start = std::time::Instant::now();

        // Validate inputs
        if vids.len() != properties_batch.len() {
            return Err(anyhow!(
                "VID/properties size mismatch: {} vids, {} properties",
                vids.len(),
                properties_batch.len()
            ));
        }

        if vids.is_empty() {
            return Ok(Vec::new());
        }

        // Start transaction if not already in one
        let is_nested = self.transaction_l0.is_some();
        if !is_nested {
            self.begin_transaction()?;
        }

        // Batch operations
        // All fallible work runs inside this async block so the match below
        // can commit or roll back uniformly on its result.
        let result = async {
            self.check_write_pressure().await?;
            self.check_transaction_memory()?;

            // Batch embedding generation (1 API call per config)
            self.process_embeddings_for_batch(&labels, &mut properties_batch)
                .await?;

            // Batch constraint validation (O(N) instead of O(N²))
            // Only the first label is validated against batch constraints.
            let label = labels
                .first()
                .ok_or_else(|| anyhow!("No labels provided"))?;
            self.validate_vertex_batch_constraints(&vids, &properties_batch, label)
                .await?;

            // Batch prepare (CRDT merging if needed)
            // Check schema once: skip entirely if no CRDT properties for this label.
            // For new vertices (freshly allocated VIDs), there are no existing CRDT
            // values to merge, so the per-vertex lookup is unnecessary in that case.
            let has_crdt_fields = {
                let schema = self.schema_manager.schema();
                schema
                    .properties
                    .get(label.as_str())
                    .is_some_and(|props_meta| {
                        props_meta.values().any(|meta| {
                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                        })
                    })
            };

            if has_crdt_fields {
                // Batch fetch existing CRDT values: collect VIDs that need merging,
                // then query once via PropertyManager instead of per-vertex lookups.
                let schema = self.schema_manager.schema();
                let crdt_keys: Vec<String> = schema
                    .properties
                    .get(label.as_str())
                    .map(|props_meta| {
                        props_meta
                            .iter()
                            .filter(|(_, meta)| {
                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                            })
                            .map(|(key, _)| key.clone())
                            .collect()
                    })
                    .unwrap_or_default();

                if let Some(pm) = &self.property_manager {
                    let ctx = self.get_query_context().await;
                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
                        for key in &crdt_keys {
                            if props.contains_key(key) {
                                let existing =
                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
                                if !existing.is_null()
                                    && let Some(val) = props.get_mut(key)
                                {
                                    *val = pm.merge_crdt_values(&existing, val)?;
                                }
                            }
                        }
                    }
                }
            }

            // Batch L0 writes (WAL batched automatically via transaction)
            let tx_l0 = self
                .transaction_l0
                .as_ref()
                .ok_or_else(|| anyhow!("Transaction L0 missing"))?;

            let properties_result = properties_batch.clone();
            {
                let mut l0_guard = tx_l0.write();
                for (vid, props) in vids.iter().zip(properties_batch) {
                    l0_guard.insert_vertex_with_labels(*vid, props, &labels);
                }
            }

            // Update metrics (batch increment)
            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
            self.update_metrics();

            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
        }
        .await;

        // Handle transaction commit/rollback
        match result {
            Ok(props) => {
                // Commit if we started the transaction
                // NOTE(review): if commit_transaction itself fails here, `?`
                // propagates without a rollback — confirm that is handled upstream.
                if !is_nested {
                    self.commit_transaction().await?;
                }

                if start.elapsed().as_millis() > 100 {
                    log::warn!(
                        "Slow insert_vertices_batch ({} vertices): {}ms",
                        vids.len(),
                        start.elapsed().as_millis()
                    );
                }

                Ok(props)
            }
            Err(e) => {
                // Rollback if we started the transaction
                if !is_nested {
                    self.rollback_transaction()?;
                }
                Err(e)
            }
        }
    }
1490
    /// Delete a vertex by VID.
    ///
    /// When `labels` is provided, uses them directly to populate L0 for
    /// correct tombstone flushing. Otherwise discovers labels from L0
    /// buffers and storage (which can be slow for many vertices).
    ///
    /// # Errors
    ///
    /// Returns an error if write pressure stalls, label lookup fails, or
    /// the L0 delete operation fails.
    #[instrument(skip(self, labels), level = "trace")]
    pub async fn delete_vertex(&mut self, vid: Vid, labels: Option<Vec<String>>) -> Result<()> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        let l0 = self.active_l0();

        // Before deleting, ensure we have the vertex's labels stored in L0
        // so the tombstone can be properly flushed to the correct label datasets.
        let has_labels = {
            let l0_guard = l0.read();
            l0_guard.vertex_labels.contains_key(&vid)
        };

        // Note: caller-provided `labels` are only consulted when L0 does not
        // already know this vertex's labels.
        if !has_labels {
            let resolved_labels = if let Some(provided) = labels {
                // Caller provided labels — skip the lookup entirely
                Some(provided)
            } else {
                // Discover labels from pending flush L0s, then storage
                let mut found = None;
                for pending_l0 in self.l0_manager.get_pending_flush() {
                    let pending_guard = pending_l0.read();
                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
                        found = Some(l.to_vec());
                        break;
                    }
                }
                if found.is_none() {
                    found = self.find_vertex_labels_in_storage(vid).await?;
                }
                found
            };

            if let Some(found_labels) = resolved_labels {
                let mut l0_guard = l0.write();
                l0_guard.vertex_labels.insert(vid, found_labels);
            }
        }

        // Record the tombstone itself.
        l0.write().delete_vertex(vid)?;
        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Inside a transaction, flushing is deferred to commit.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(())
    }
1553
1554    /// Find vertex labels from storage by querying the main vertices table.
1555    /// Returns the labels from the latest non-deleted version of the vertex.
1556    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1557        use arrow_array::Array;
1558        use arrow_array::cast::AsArray;
1559        use lancedb::query::{ExecutableQuery, QueryBase, Select};
1560
1561        let lancedb_store = self.storage.lancedb_store();
1562        let table_name = MainVertexDataset::table_name();
1563
1564        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
1565        if !lancedb_store.table_exists(table_name).await? {
1566            return Ok(None);
1567        }
1568
1569        let table = lancedb_store.open_table(table_name).await?;
1570
1571        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
1572        let filter = format!("_vid = {}", vid.as_u64());
1573        let query = table.query().only_if(filter).select(Select::Columns(vec![
1574            "_vid".to_string(),
1575            "labels".to_string(),
1576            "_version".to_string(),
1577            "_deleted".to_string(),
1578        ]));
1579
1580        let stream = query.execute().await?;
1581        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
1582
1583        // Find the row with the highest version number
1584        let mut max_version: Option<u64> = None;
1585        let mut labels: Option<Vec<String>> = None;
1586        let mut is_deleted = false;
1587
1588        for batch in batches {
1589            if batch.num_rows() == 0 {
1590                continue;
1591            }
1592
1593            let version_array = batch
1594                .column_by_name("_version")
1595                .unwrap()
1596                .as_primitive::<arrow_array::types::UInt64Type>();
1597
1598            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
1599
1600            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
1601
1602            for row_idx in 0..batch.num_rows() {
1603                let version = version_array.value(row_idx);
1604
1605                if max_version.is_none_or(|mv| version > mv) {
1606                    is_deleted = deleted_array.value(row_idx);
1607
1608                    let labels_list = labels_array.value(row_idx);
1609                    let string_array = labels_list.as_string::<i32>();
1610                    let vertex_labels: Vec<String> = (0..string_array.len())
1611                        .filter(|&i| !string_array.is_null(i))
1612                        .map(|i| string_array.value(i).to_string())
1613                        .collect();
1614
1615                    max_version = Some(version);
1616                    labels = Some(vertex_labels);
1617                }
1618            }
1619        }
1620
1621        // If the latest version is deleted, return None
1622        if is_deleted { Ok(None) } else { Ok(labels) }
1623    }
1624
1625    #[instrument(skip(self, properties), level = "trace")]
1626    pub async fn insert_edge(
1627        &mut self,
1628        src_vid: Vid,
1629        dst_vid: Vid,
1630        edge_type: u32,
1631        eid: Eid,
1632        mut properties: Properties,
1633        edge_type_name: Option<String>,
1634    ) -> Result<()> {
1635        let start = std::time::Instant::now();
1636        self.check_write_pressure().await?;
1637        self.check_transaction_memory()?;
1638        self.prepare_edge_upsert(eid, &mut properties).await?;
1639
1640        let l0 = self.active_l0();
1641        l0.write()
1642            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
1643
1644        // Dual-write to AdjacencyManager overlay (survives flush).
1645        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
1646        if self.transaction_l0.is_none() {
1647            let version = l0.read().current_version;
1648            self.adjacency_manager
1649                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
1650        }
1651
1652        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1653        self.update_metrics();
1654
1655        if self.transaction_l0.is_none() {
1656            self.check_flush().await?;
1657        }
1658        if start.elapsed().as_millis() > 100 {
1659            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
1660        }
1661        Ok(())
1662    }
1663
1664    #[instrument(skip(self), level = "trace")]
1665    pub async fn delete_edge(
1666        &mut self,
1667        eid: Eid,
1668        src_vid: Vid,
1669        dst_vid: Vid,
1670        edge_type: u32,
1671    ) -> Result<()> {
1672        let start = std::time::Instant::now();
1673        self.check_write_pressure().await?;
1674        self.check_transaction_memory()?;
1675        let l0 = self.active_l0();
1676
1677        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
1678
1679        // Dual-write tombstone to AdjacencyManager overlay.
1680        if self.transaction_l0.is_none() {
1681            let version = l0.read().current_version;
1682            self.adjacency_manager
1683                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
1684        }
1685        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1686        self.update_metrics();
1687
1688        if self.transaction_l0.is_none() {
1689            self.check_flush().await?;
1690        }
1691        if start.elapsed().as_millis() > 100 {
1692            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
1693        }
1694        Ok(())
1695    }
1696
1697    /// Check if flush should be triggered based on mutation count or time elapsed.
1698    /// This method is called after each write operation and can also be called
1699    /// by a background task for time-based flushing.
1700    pub async fn check_flush(&mut self) -> Result<()> {
1701        let count = self.l0_manager.get_current().read().mutation_count;
1702
1703        // Skip if no mutations
1704        if count == 0 {
1705            return Ok(());
1706        }
1707
1708        // Flush on mutation count threshold (10,000 default)
1709        if count >= self.config.auto_flush_threshold {
1710            self.flush_to_l1(None).await?;
1711            return Ok(());
1712        }
1713
1714        // Flush on time interval IF minimum mutations met
1715        if let Some(interval) = self.config.auto_flush_interval
1716            && self.last_flush_time.elapsed() >= interval
1717            && count >= self.config.auto_flush_min_mutations
1718        {
1719            self.flush_to_l1(None).await?;
1720        }
1721
1722        Ok(())
1723    }
1724
1725    /// Process embeddings for a vertex using labels passed directly.
1726    /// Use this when labels haven't been stored to L0 yet.
1727    async fn process_embeddings_for_labels(
1728        &self,
1729        labels: &[String],
1730        properties: &mut Properties,
1731    ) -> Result<()> {
1732        let label_name = labels.first().map(|s| s.as_str());
1733        self.process_embeddings_impl(label_name, properties).await
1734    }
1735
    /// Process embeddings for a batch of vertices efficiently.
    ///
    /// Groups vertices by embedding config and makes batched API calls to the
    /// embedding service instead of calling once per vertex. Only the first
    /// label is consulted for embedding configuration. Vertices that already
    /// carry the target property, or lack all source properties, are skipped.
    ///
    /// # Performance
    /// For N vertices with embedding config:
    /// - Old approach: N API calls to embedding service
    /// - New approach: 1 API call per embedding config (usually 1 total)
    ///
    /// # Errors
    ///
    /// Returns an error if an embedding is required but the Uni-Xervo runtime
    /// is not configured, or if the embedding call fails.
    async fn process_embeddings_for_batch(
        &self,
        labels: &[String],
        properties_batch: &mut [Properties],
    ) -> Result<()> {
        let label_name = labels.first().map(|s| s.as_str());
        let schema = self.schema_manager.schema();

        if let Some(label) = label_name {
            // Find vector indexes with embedding config for this label
            let mut configs = Vec::new();
            for idx in &schema.indexes {
                if let IndexDefinition::Vector(v_config) = idx
                    && v_config.label == label
                    && let Some(emb_config) = &v_config.embedding_config
                {
                    configs.push((v_config.property.clone(), emb_config.clone()));
                }
            }

            // Fast exit: no vector index with auto-embedding for this label.
            if configs.is_empty() {
                return Ok(());
            }

            for (target_prop, emb_config) in configs {
                // Collect input texts from all vertices that need embeddings.
                // `input_texts[i]` corresponds to `properties_batch[needs_embedding[i]]`;
                // the two vectors are kept index-aligned for the fan-out below.
                let mut input_texts: Vec<String> = Vec::new();
                let mut needs_embedding: Vec<usize> = Vec::new();

                for (idx, properties) in properties_batch.iter().enumerate() {
                    // Skip if target property already exists (caller supplied it)
                    if properties.contains_key(&target_prop) {
                        continue;
                    }

                    // Gather string-valued source properties for this vertex.
                    let mut inputs = Vec::new();
                    for src_prop in &emb_config.source_properties {
                        if let Some(val) = properties.get(src_prop)
                            && let Some(s) = val.as_str()
                        {
                            inputs.push(s.to_string());
                        }
                    }

                    if !inputs.is_empty() {
                        // Simple space-joined concatenation of all sources.
                        let input_text = inputs.join(" ");
                        input_texts.push(input_text);
                        needs_embedding.push(idx);
                    }
                }

                if input_texts.is_empty() {
                    continue;
                }

                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
                })?;
                let embedder = runtime.embedding(&emb_config.alias).await?;

                // Batch generate embeddings (single API call)
                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
                let embeddings = embedder.embed(input_refs).await?;

                // Distribute results back to properties via the index mapping.
                // `.get()` guards against the service returning fewer vectors
                // than inputs; missing entries are silently skipped.
                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
                    if let Some(vec) = embeddings.get(embedding_idx) {
                        // Stored as a list of f64 floats under the target property.
                        let vals: Vec<Value> =
                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
                    }
                }
            }
        }

        Ok(())
    }
1823
1824    async fn process_embeddings_impl(
1825        &self,
1826        label_name: Option<&str>,
1827        properties: &mut Properties,
1828    ) -> Result<()> {
1829        let schema = self.schema_manager.schema();
1830
1831        if let Some(label) = label_name {
1832            // Find vector indexes with embedding config for this label
1833            let mut configs = Vec::new();
1834            for idx in &schema.indexes {
1835                if let IndexDefinition::Vector(v_config) = idx
1836                    && v_config.label == label
1837                    && let Some(emb_config) = &v_config.embedding_config
1838                {
1839                    configs.push((v_config.property.clone(), emb_config.clone()));
1840                }
1841            }
1842
1843            if configs.is_empty() {
1844                log::info!("No embedding config found for label {}", label);
1845            }
1846
1847            for (target_prop, emb_config) in configs {
1848                // If target property already exists, skip (assume user provided it)
1849                if properties.contains_key(&target_prop) {
1850                    continue;
1851                }
1852
1853                // Check if source properties exist
1854                let mut inputs = Vec::new();
1855                for src_prop in &emb_config.source_properties {
1856                    if let Some(val) = properties.get(src_prop)
1857                        && let Some(s) = val.as_str()
1858                    {
1859                        inputs.push(s.to_string());
1860                    }
1861                }
1862
1863                if inputs.is_empty() {
1864                    continue;
1865                }
1866
1867                let input_text = inputs.join(" "); // Simple concatenation
1868
1869                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1870                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1871                })?;
1872                let embedder = runtime.embedding(&emb_config.alias).await?;
1873
1874                // Generate
1875                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
1876                if let Some(vec) = embeddings.first() {
1877                    // Store as array of floats
1878                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
1879                    properties.insert(target_prop.clone(), Value::List(vals));
1880                }
1881            }
1882        }
1883        Ok(())
1884    }
1885
1886    /// Flushes the current in-memory L0 buffer to L1 storage.
1887    ///
1888    /// # Lock Ordering
1889    ///
1890    /// To prevent deadlocks, locks must be acquired in the following order:
1891    /// 1. `Writer` lock (held by caller)
1892    /// 2. `L0Manager` lock (via `begin_flush` / `get_current`)
1893    /// 3. `L0Buffer` lock (individual buffer RWLocks)
1894    /// 4. `Index` / `Storage` locks (during actual flush)
1895    #[instrument(
1896        skip(self),
1897        fields(snapshot_id, mutations_count, size_bytes),
1898        level = "info"
1899    )]
1900    pub async fn flush_to_l1(&mut self, name: Option<String>) -> Result<String> {
1901        let start = std::time::Instant::now();
1902        let schema = self.schema_manager.schema();
1903
1904        let (initial_size, initial_count) = {
1905            let l0_arc = self.l0_manager.get_current();
1906            let l0 = l0_arc.read();
1907            (l0.estimated_size, l0.mutation_count)
1908        };
1909        tracing::Span::current().record("size_bytes", initial_size);
1910        tracing::Span::current().record("mutations_count", initial_count);
1911
1912        debug!("Starting L0 flush to L1");
1913
1914        // 1. Flush WAL BEFORE rotating L0
1915        // This ensures that if WAL flush fails, the current L0 is still active
1916        // and mutations are retained in memory until restart/retry.
1917        // Capture the LSN of the flushed segment for the snapshot's wal_high_water_mark.
1918        let wal_for_truncate = {
1919            let current_l0 = self.l0_manager.get_current();
1920            let l0_guard = current_l0.read();
1921            l0_guard.wal.clone()
1922        };
1923
1924        let wal_lsn = if let Some(ref w) = wal_for_truncate {
1925            w.flush().await?
1926        } else {
1927            0
1928        };
1929
1930        // 2. Begin flush: rotate L0 and keep old L0 visible to reads
1931        // The old L0 stays in pending_flush list until complete_flush is called,
1932        // ensuring data remains visible even if L1 writes fail.
1933        let old_l0_arc = self.l0_manager.begin_flush(0, None);
1934        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
1935
1936        let current_version;
1937        {
1938            // Acquire Write lock to take WAL and version
1939            let mut old_l0_guard = old_l0_arc.write();
1940            current_version = old_l0_guard.current_version;
1941
1942            // Record the WAL LSN for this L0 so we don't truncate past it
1943            // if this flush fails and a subsequent flush succeeds.
1944            old_l0_guard.wal_lsn_at_flush = wal_lsn;
1945
1946            let wal = old_l0_guard.wal.take();
1947
1948            // Give WAL to new L0
1949            let new_l0_arc = self.l0_manager.get_current();
1950            let mut new_l0_guard = new_l0_arc.write();
1951            new_l0_guard.wal = wal;
1952            new_l0_guard.current_version = current_version;
1953        } // Drop locks
1954
1955        // 2. Acquire Read lock on Old L0 for flushing
1956        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
1957        // (Vid, labels, properties, deleted, version)
1958        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
1959        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
1960        // Collect vertex timestamps from L0 for flushing to storage
1961        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
1962        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
1963        // Track tombstones missing labels for storage query fallback
1964        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
1965
1966        {
1967            let old_l0 = old_l0_arc.read();
1968
1969            // 1. Collect all edges and tombstones from L0
1970            for edge in old_l0.graph.edges() {
1971                let properties = old_l0
1972                    .edge_properties
1973                    .get(&edge.eid)
1974                    .cloned()
1975                    .unwrap_or_default();
1976                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
1977
1978                // Get timestamps from L0 buffer (populated during insert)
1979                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
1980                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
1981
1982                entries_by_type
1983                    .entry(edge.edge_type)
1984                    .or_default()
1985                    .push(L1Entry {
1986                        src_vid: edge.src_vid,
1987                        dst_vid: edge.dst_vid,
1988                        eid: edge.eid,
1989                        op: Op::Insert,
1990                        version,
1991                        properties,
1992                        created_at,
1993                        updated_at,
1994                    });
1995            }
1996
1997            // From tombstones
1998            for tombstone in old_l0.tombstones.values() {
1999                let version = old_l0
2000                    .edge_versions
2001                    .get(&tombstone.eid)
2002                    .copied()
2003                    .unwrap_or(0);
2004                // Get timestamps - for deletes, updated_at reflects deletion time
2005                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
2006                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
2007
2008                entries_by_type
2009                    .entry(tombstone.edge_type)
2010                    .or_default()
2011                    .push(L1Entry {
2012                        src_vid: tombstone.src_vid,
2013                        dst_vid: tombstone.dst_vid,
2014                        eid: tombstone.eid,
2015                        op: Op::Delete,
2016                        version,
2017                        properties: HashMap::new(),
2018                        created_at,
2019                        updated_at,
2020                    });
2021            }
2022
2023            // 2.5 Flush Vertices - Collect by label (using vertex_labels from L0)
2024            //
2025            // Helper: fan-out a single vertex entry into per-label buckets.
2026            // Each per-label table row carries the full label set so multi-label
2027            // info is preserved after flush.
2028            let push_vertex_to_labels =
2029                |vid: Vid,
2030                 all_labels: &[String],
2031                 props: Properties,
2032                 deleted: bool,
2033                 version: u64,
2034                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
2035                    for label in all_labels {
2036                        if let Some(label_id) = schema.label_id_by_name(label) {
2037                            out.entry(label_id).or_default().push((
2038                                vid,
2039                                all_labels.to_vec(),
2040                                props.clone(),
2041                                deleted,
2042                                version,
2043                            ));
2044                        }
2045                    }
2046                };
2047
2048            for (vid, props) in &old_l0.vertex_properties {
2049                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
2050                // Collect timestamps for this vertex
2051                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
2052                    vertex_created_at.insert(*vid, ts);
2053                }
2054                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
2055                    vertex_updated_at.insert(*vid, ts);
2056                }
2057                if let Some(labels) = old_l0.vertex_labels.get(vid) {
2058                    push_vertex_to_labels(
2059                        *vid,
2060                        labels,
2061                        props.clone(),
2062                        false,
2063                        version,
2064                        &mut vertices_by_label,
2065                    );
2066                }
2067            }
2068            for &vid in &old_l0.vertex_tombstones {
2069                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
2070                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
2071                    push_vertex_to_labels(
2072                        vid,
2073                        labels,
2074                        HashMap::new(),
2075                        true,
2076                        version,
2077                        &mut vertices_by_label,
2078                    );
2079                } else {
2080                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
2081                    orphaned_tombstones.push((vid, version));
2082                }
2083            }
2084        } // Drop read lock
2085
2086        // Resolve orphaned tombstones (missing labels) from storage
2087        if !orphaned_tombstones.is_empty() {
2088            tracing::warn!(
2089                count = orphaned_tombstones.len(),
2090                "Tombstones missing labels in L0, querying storage as fallback"
2091            );
2092            for (vid, version) in orphaned_tombstones {
2093                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
2094                    && !labels.is_empty()
2095                {
2096                    for label in &labels {
2097                        if let Some(label_id) = schema.label_id_by_name(label) {
2098                            vertices_by_label.entry(label_id).or_default().push((
2099                                vid,
2100                                labels.clone(),
2101                                HashMap::new(),
2102                                true,
2103                                version,
2104                            ));
2105                        }
2106                    }
2107                }
2108            }
2109        }
2110
2111        // 0. Load previous snapshot or create new
2112        let mut manifest = self
2113            .storage
2114            .snapshot_manager()
2115            .load_latest_snapshot()
2116            .await?
2117            .unwrap_or_else(|| {
2118                SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
2119            });
2120
2121        // Update snapshot metadata
2122        // Save parent snapshot ID before generating new one (for lineage tracking)
2123        let parent_id = manifest.snapshot_id.clone();
2124        manifest.parent_snapshot = Some(parent_id);
2125        manifest.snapshot_id = Uuid::new_v4().to_string();
2126        manifest.name = name;
2127        manifest.created_at = Utc::now();
2128        manifest.version_high_water_mark = current_version;
2129        manifest.wal_high_water_mark = wal_lsn;
2130        let snapshot_id = manifest.snapshot_id.clone();
2131
2132        tracing::Span::current().record("snapshot_id", &snapshot_id);
2133
2134        // 2. For each edge type, write FWD and BWD runs
2135        let lancedb_store = self.storage.lancedb_store();
2136
2137        for (&edge_type_id, entries) in entries_by_type.iter() {
2138            // Get edge type name from unified lookup (handles both schema'd and schemaless)
2139            let edge_type_name = self
2140                .storage
2141                .schema_manager()
2142                .edge_type_name_by_id_unified(edge_type_id)
2143                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
2144
2145            // FWD Run (sorted by src_vid)
2146            let mut fwd_entries = entries.clone();
2147            fwd_entries.sort_by_key(|e| e.src_vid);
2148            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
2149            let fwd_batch = fwd_ds.build_record_batch(&fwd_entries, &schema)?;
2150
2151            // Write using LanceDB
2152            let table = fwd_ds.write_run_lancedb(lancedb_store, fwd_batch).await?;
2153            fwd_ds.ensure_eid_index_lancedb(&table).await?;
2154
2155            // BWD Run (sorted by dst_vid)
2156            let mut bwd_entries = entries.clone();
2157            bwd_entries.sort_by_key(|e| e.dst_vid);
2158            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
2159            let bwd_batch = bwd_ds.build_record_batch(&bwd_entries, &schema)?;
2160
2161            let bwd_table = bwd_ds.write_run_lancedb(lancedb_store, bwd_batch).await?;
2162            bwd_ds.ensure_eid_index_lancedb(&bwd_table).await?;
2163
2164            // Update Manifest
2165            let current_snap =
2166                manifest
2167                    .edges
2168                    .entry(edge_type_name.to_string())
2169                    .or_insert(EdgeSnapshot {
2170                        version: 0,
2171                        count: 0,
2172                        lance_version: 0,
2173                    });
2174            current_snap.version += 1;
2175            current_snap.count += entries.len() as u64;
2176            // LanceDB tables don't expose Lance version directly
2177            current_snap.lance_version = 0;
2178
2179            // Note: No CSR invalidation needed. AdjacencyManager's overlay
2180            // already has these edges via dual-write in insert_edge/delete_edge.
2181        }
2182
2183        // 2.5 Flush Vertices
2184        for (label_id, vertices) in vertices_by_label {
2185            let label_name = schema
2186                .label_name_by_id(label_id)
2187                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
2188
2189            let ds = self.storage.vertex_dataset(label_name)?;
2190
2191            // Collect inverted index updates before consuming vertices
2192            // Maps: cfg.property -> (added, removed)
2193            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
2194            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
2195
2196            for idx in &schema.indexes {
2197                if let IndexDefinition::Inverted(cfg) = idx
2198                    && cfg.label == label_name
2199                {
2200                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
2201                    let mut removed: HashSet<Vid> = HashSet::new();
2202
2203                    for (vid, _labels, props, deleted, _version) in &vertices {
2204                        if *deleted {
2205                            removed.insert(*vid);
2206                        } else if let Some(prop_value) = props.get(&cfg.property) {
2207                            // Extract terms from the property value (List<String>)
2208                            if let Some(arr) = prop_value.as_array() {
2209                                let terms: Vec<String> = arr
2210                                    .iter()
2211                                    .filter_map(|v| v.as_str().map(ToString::to_string))
2212                                    .collect();
2213                                if !terms.is_empty() {
2214                                    added.insert(*vid, terms);
2215                                }
2216                            }
2217                        }
2218                    }
2219
2220                    if !added.is_empty() || !removed.is_empty() {
2221                        inverted_updates.insert(cfg.property.clone(), (added, removed));
2222                    }
2223                }
2224            }
2225
2226            let mut v_data = Vec::new();
2227            let mut d_data = Vec::new();
2228            let mut ver_data = Vec::new();
2229            for (vid, labels, props, deleted, version) in vertices {
2230                v_data.push((vid, labels, props));
2231                d_data.push(deleted);
2232                ver_data.push(version);
2233            }
2234
2235            let batch = ds.build_record_batch_with_timestamps(
2236                &v_data,
2237                &d_data,
2238                &ver_data,
2239                &schema,
2240                Some(&vertex_created_at),
2241                Some(&vertex_updated_at),
2242            )?;
2243
2244            // Write using LanceDB
2245            let table = ds
2246                .write_batch_lancedb(lancedb_store, batch, &schema)
2247                .await?;
2248            ds.ensure_default_indexes_lancedb(&table).await?;
2249
2250            // Update VidLabelsIndex (if enabled)
2251            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
2252                if deleted {
2253                    self.storage.remove_from_vid_labels_index(*vid);
2254                } else {
2255                    self.storage.update_vid_labels_index(*vid, labels.clone());
2256                }
2257            }
2258
2259            // Update Manifest
2260            let current_snap =
2261                manifest
2262                    .vertices
2263                    .entry(label_name.to_string())
2264                    .or_insert(LabelSnapshot {
2265                        version: 0,
2266                        count: 0,
2267                        lance_version: 0,
2268                    });
2269            current_snap.version += 1;
2270            current_snap.count += v_data.len() as u64;
2271            // LanceDB tables don't expose Lance version directly
2272            current_snap.lance_version = 0;
2273
2274            // Invalidate table cache to ensure next read picks up new version
2275            self.storage.invalidate_table_cache(label_name);
2276
2277            // Apply inverted index updates incrementally
2278            for idx in &schema.indexes {
2279                if let IndexDefinition::Inverted(cfg) = idx
2280                    && cfg.label == label_name
2281                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
2282                {
2283                    self.storage
2284                        .index_manager()
2285                        .update_inverted_index_incremental(cfg, added, removed)
2286                        .await?;
2287                }
2288            }
2289
2290            // Update UID index with new vertex mappings
2291            // Collect (UniId, Vid) mappings from non-deleted vertices
2292            let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
2293            for (vid, _labels, props) in &v_data {
2294                let ext_id = props.get("ext_id").and_then(|v| v.as_str());
2295                let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
2296                    label_name, ext_id, props,
2297                );
2298                uid_mappings.push((uid, *vid));
2299            }
2300
2301            if !uid_mappings.is_empty()
2302                && let Ok(uid_index) = self.storage.uid_index(label_name)
2303            {
2304                // Issue #107: Check for UID collisions and FAIL instead of warning.
2305                // SHA3-256 collisions are astronomically unlikely (~2^256), but if one
2306                // occurs, we must reject the flush to prevent silent data corruption.
2307                // Changed from tracing::warn!() to anyhow::bail!().
2308                for (uid, vid) in &uid_mappings {
2309                    if let Ok(Some(existing_vid)) = uid_index.get_vid(uid).await
2310                        && existing_vid != *vid
2311                    {
2312                        anyhow::bail!(
2313                            "UID collision detected: UID {:?} maps to both VID {} and VID {}. \
2314                            This indicates either a hash collision (astronomically unlikely with SHA3-256) \
2315                            or data corruption. Cannot proceed with flush.",
2316                            uid,
2317                            existing_vid.as_u64(),
2318                            vid.as_u64()
2319                        );
2320                    }
2321                }
2322
2323                uid_index.write_mapping(&uid_mappings).await?;
2324            }
2325        }
2326
2327        // 3. Write to main unified tables (dual-write for fast ID-based lookups)
2328        // 3.1 Write to main edges table
2329        // Collect data while holding the lock, then release before async operations
2330        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
2331            let _old_l0 = old_l0_arc.read();
2332            let mut main_edges: Vec<(
2333                uni_common::core::id::Eid,
2334                Vid,
2335                Vid,
2336                String,
2337                Properties,
2338                bool,
2339                u64,
2340            )> = Vec::new();
2341            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
2342            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
2343
2344            for (&edge_type_id, entries) in entries_by_type.iter() {
2345                for entry in entries {
2346                    // Get edge type name from unified lookup (handles both schema'd and schemaless)
2347                    let edge_type_name = self
2348                        .storage
2349                        .schema_manager()
2350                        .edge_type_name_by_id_unified(edge_type_id)
2351                        .unwrap_or_else(|| "unknown".to_string());
2352
2353                    let deleted = matches!(entry.op, Op::Delete);
2354                    main_edges.push((
2355                        entry.eid,
2356                        entry.src_vid,
2357                        entry.dst_vid,
2358                        edge_type_name,
2359                        entry.properties.clone(),
2360                        deleted,
2361                        entry.version,
2362                    ));
2363
2364                    if let Some(ts) = entry.created_at {
2365                        edge_created_at_map.insert(entry.eid, ts);
2366                    }
2367                    if let Some(ts) = entry.updated_at {
2368                        edge_updated_at_map.insert(entry.eid, ts);
2369                    }
2370                }
2371            }
2372
2373            (main_edges, edge_created_at_map, edge_updated_at_map)
2374        }; // Lock released here
2375
2376        if !main_edges.is_empty() {
2377            let main_edge_batch = MainEdgeDataset::build_record_batch(
2378                &main_edges,
2379                Some(&edge_created_at_map),
2380                Some(&edge_updated_at_map),
2381            )?;
2382            let main_edge_table =
2383                MainEdgeDataset::write_batch_lancedb(lancedb_store, main_edge_batch).await?;
2384            MainEdgeDataset::ensure_default_indexes_lancedb(&main_edge_table).await?;
2385        }
2386
2387        // 3.2 Write to main vertices table
2388        // Collect data while holding the lock, then release before async operations
2389        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
2390            let old_l0 = old_l0_arc.read();
2391            let mut vertices = Vec::new();
2392
2393            // Collect all vertices from vertex_properties
2394            for (vid, props) in &old_l0.vertex_properties {
2395                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
2396                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
2397                vertices.push((*vid, labels, props.clone(), false, version));
2398            }
2399
2400            // Collect tombstones
2401            for &vid in &old_l0.vertex_tombstones {
2402                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
2403                let labels = old_l0.vertex_labels.get(&vid).cloned().unwrap_or_default();
2404                vertices.push((vid, labels, HashMap::new(), true, version));
2405            }
2406
2407            vertices
2408        }; // Lock released here
2409
2410        if !main_vertices.is_empty() {
2411            let main_vertex_batch = MainVertexDataset::build_record_batch(
2412                &main_vertices,
2413                Some(&vertex_created_at),
2414                Some(&vertex_updated_at),
2415            )?;
2416            let main_vertex_table =
2417                MainVertexDataset::write_batch_lancedb(lancedb_store, main_vertex_batch).await?;
2418            MainVertexDataset::ensure_default_indexes_lancedb(&main_vertex_table).await?;
2419        }
2420
2421        // Save Snapshot
2422        self.storage
2423            .snapshot_manager()
2424            .save_snapshot(&manifest)
2425            .await?;
2426        self.storage
2427            .snapshot_manager()
2428            .set_latest_snapshot(&manifest.snapshot_id)
2429            .await?;
2430
2431        // Complete flush: remove old L0 from pending list now that L1 writes succeeded.
2432        // This must happen BEFORE WAL truncation so min_pending_wal_lsn is accurate.
2433        self.l0_manager.complete_flush(&old_l0_arc);
2434
2435        // Truncate WAL segments, but only up to the minimum LSN of any remaining pending L0s.
2436        // This prevents data loss if earlier flushes failed and left L0s in pending_flush.
2437        if let Some(w) = wal_for_truncate {
2438            // Determine safe truncation point: the minimum of our LSN and any pending L0s
2439            let safe_lsn = self
2440                .l0_manager
2441                .min_pending_wal_lsn()
2442                .map(|min_pending| min_pending.min(wal_lsn))
2443                .unwrap_or(wal_lsn);
2444            w.truncate_before(safe_lsn).await?;
2445        }
2446
2447        // Invalidate property cache after flush to prevent stale reads.
2448        // Once L0 data moves to storage, cached values from storage may be outdated.
2449        if let Some(ref pm) = self.property_manager {
2450            pm.clear_cache().await;
2451        }
2452
2453        // Reset last flush time for time-based auto-flush
2454        self.last_flush_time = std::time::Instant::now();
2455
2456        info!(
2457            snapshot_id,
2458            mutations_count = initial_count,
2459            size_bytes = initial_size,
2460            "L0 flush to L1 completed successfully"
2461        );
2462        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
2463        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
2464        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
2465
2466        // Trigger CSR compaction if enough frozen segments have accumulated.
2467        // After flush, the old L0 data is now in L1; the overlay segments can be merged
2468        // into the Main CSR to reduce lookup overhead.
2469        let am = self.adjacency_manager.clone();
2470        if am.should_compact(4) {
2471            let previous_still_running = {
2472                let guard = self.compaction_handle.read();
2473                guard.as_ref().is_some_and(|h| !h.is_finished())
2474            };
2475
2476            if previous_still_running {
2477                info!("Skipping compaction: previous compaction still in progress");
2478            } else {
2479                let handle = tokio::spawn(async move {
2480                    am.compact();
2481                });
2482                *self.compaction_handle.write() = Some(handle);
2483            }
2484        }
2485
2486        Ok(snapshot_id)
2487    }
2488
2489    /// Set the property manager for cache invalidation.
2490    pub fn set_property_manager(&mut self, pm: Arc<PropertyManager>) {
2491        self.property_manager = Some(pm);
2492    }
2493}
2494
2495#[cfg(test)]
2496mod tests {
2497    use super::*;
2498    use tempfile::tempdir;
2499
    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    ///
    /// The test stages two vertices and one edge inside a transaction, then
    /// checks three things after commit:
    ///   1. the main-L0 WAL gained new mutation records,
    ///   2. within those new records every vertex precedes the edge, and
    ///   3. the staged data is visible in the main L0 buffer.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Scratch directory backing a local object store that holds the
        // schema file, the storage layout, and the WAL segments.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        // Minimal schema: a single "Test" label, persisted before storage opens.
        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction
        writer.begin_transaction()?;

        // Insert data in transaction: two vertices connected by one edge.
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()])
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(vid_a, vid_b, 1, eid, std::collections::HashMap::new(), None)
            .await?;

        // Get WAL before commit; its current replay length is the baseline
        // used below to isolate only the mutations appended by this commit.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first
        writer.commit_transaction().await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges.
        // Skip the baseline records so only this commit's output is inspected.
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0: commit merged the transaction
        // buffer into the live buffer, not only into the WAL.
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }
2632
2633    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
2634    #[tokio::test]
2635    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
2636        use crate::runtime::wal::WriteAheadLog;
2637        use crate::storage::manager::StorageManager;
2638        use object_store::local::LocalFileSystem;
2639        use object_store::path::Path as ObjectStorePath;
2640        use uni_common::core::schema::SchemaManager;
2641
2642        let dir = tempdir()?;
2643        let path = dir.path().to_str().unwrap();
2644        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2645        let schema_path = ObjectStorePath::from("schema.json");
2646
2647        let schema_manager =
2648            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2649        let _label_id = schema_manager.add_label("Test")?;
2650        let _baseline_label_id = schema_manager.add_label("Baseline")?;
2651        let _txdata_label_id = schema_manager.add_label("TxData")?;
2652        schema_manager.save().await?;
2653
2654        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2655
2656        // Create WAL for main L0
2657        let wal_path = ObjectStorePath::from("wal");
2658        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
2659
2660        let mut writer = Writer::new_with_config(
2661            storage.clone(),
2662            schema_manager.clone(),
2663            1,
2664            UniConfig::default(),
2665            Some(wal),
2666            None,
2667        )
2668        .await?;
2669
2670        // Insert baseline data (outside transaction)
2671        let baseline_vid = writer.next_vid().await?;
2672        writer
2673            .insert_vertex_with_labels(
2674                baseline_vid,
2675                [("baseline".to_string(), Value::Bool(true))]
2676                    .into_iter()
2677                    .collect(),
2678                &["Baseline".to_string()],
2679            )
2680            .await?;
2681
2682        // Begin transaction
2683        writer.begin_transaction()?;
2684
2685        // Insert data in transaction
2686        let tx_vid = writer.next_vid().await?;
2687        writer
2688            .insert_vertex_with_labels(
2689                tx_vid,
2690                [("tx_data".to_string(), Value::Bool(true))]
2691                    .into_iter()
2692                    .collect(),
2693                &["TxData".to_string()],
2694            )
2695            .await?;
2696
2697        // Capture main L0 state before rollback
2698        let l0 = writer.l0_manager.get_current();
2699        let vertex_count_before = l0.read().vertex_properties.len();
2700
2701        // Rollback transaction (simulating what would happen after WAL flush failure)
2702        writer.rollback_transaction()?;
2703
2704        // Verify main L0 is unchanged
2705        let vertex_count_after = l0.read().vertex_properties.len();
2706        assert_eq!(
2707            vertex_count_before, vertex_count_after,
2708            "Main L0 should not change after rollback"
2709        );
2710
2711        // Baseline should still be present
2712        assert!(
2713            l0.read().vertex_properties.contains_key(&baseline_vid),
2714            "Baseline data should remain"
2715        );
2716
2717        // Transaction data should NOT be in main L0
2718        assert!(
2719            !l0.read().vertex_properties.contains_key(&tx_vid),
2720            "Transaction data should not be in main L0 after rollback"
2721        );
2722
2723        Ok(())
2724    }
2725
2726    /// Test that batch insert with shared labels does not clone labels per vertex.
2727    /// This verifies fix for issue #161 (redundant label cloning).
2728    #[tokio::test]
2729    async fn test_batch_insert_shared_labels() -> Result<()> {
2730        use crate::storage::manager::StorageManager;
2731        use object_store::local::LocalFileSystem;
2732        use object_store::path::Path as ObjectStorePath;
2733        use uni_common::core::schema::SchemaManager;
2734
2735        let dir = tempdir()?;
2736        let path = dir.path().to_str().unwrap();
2737        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2738        let schema_path = ObjectStorePath::from("schema.json");
2739
2740        let schema_manager =
2741            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2742        let _label_id = schema_manager.add_label("Person")?;
2743        schema_manager.save().await?;
2744
2745        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2746
2747        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
2748
2749        // Shared labels - should not be cloned per vertex
2750        let labels = &["Person".to_string()];
2751
2752        // Insert batch of vertices with same labels
2753        let mut vids = Vec::new();
2754        for i in 0..100 {
2755            let vid = writer.next_vid().await?;
2756            let mut props = std::collections::HashMap::new();
2757            props.insert("id".to_string(), Value::Int(i));
2758            writer.insert_vertex_with_labels(vid, props, labels).await?;
2759            vids.push(vid);
2760        }
2761
2762        // Verify all vertices have the correct labels
2763        let l0 = writer.l0_manager.get_current();
2764        for vid in vids {
2765            let l0_guard = l0.read();
2766            let vertex_labels = l0_guard.vertex_labels.get(&vid);
2767            assert!(vertex_labels.is_some(), "Vertex should have labels");
2768            assert_eq!(
2769                vertex_labels.unwrap(),
2770                &vec!["Person".to_string()],
2771                "Labels should match"
2772            );
2773        }
2774
2775        Ok(())
2776    }
2777
2778    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
2779    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
2780    #[tokio::test]
2781    async fn test_estimated_size_tracks_mutations() -> Result<()> {
2782        use crate::storage::manager::StorageManager;
2783        use object_store::local::LocalFileSystem;
2784        use object_store::path::Path as ObjectStorePath;
2785        use uni_common::core::schema::SchemaManager;
2786
2787        let dir = tempdir()?;
2788        let path = dir.path().to_str().unwrap();
2789        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2790        let schema_path = ObjectStorePath::from("schema.json");
2791
2792        let schema_manager =
2793            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2794        let _label_id = schema_manager.add_label("Test")?;
2795        schema_manager.save().await?;
2796
2797        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2798
2799        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
2800
2801        let l0 = writer.l0_manager.get_current();
2802
2803        // Initial state should be empty
2804        let initial_estimated = l0.read().estimated_size;
2805        let initial_actual = l0.read().size_bytes();
2806        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
2807        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
2808
2809        // Insert vertices with properties
2810        let mut vids = Vec::new();
2811        for i in 0..10 {
2812            let vid = writer.next_vid().await?;
2813            let mut props = std::collections::HashMap::new();
2814            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
2815            props.insert("index".to_string(), Value::Int(i));
2816            writer.insert_vertex_with_labels(vid, props, &[]).await?;
2817            vids.push(vid);
2818        }
2819
2820        // Verify estimated_size grew
2821        let after_vertices_estimated = l0.read().estimated_size;
2822        let after_vertices_actual = l0.read().size_bytes();
2823        assert!(
2824            after_vertices_estimated > 0,
2825            "estimated_size should grow after insertions"
2826        );
2827
2828        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
2829        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
2830        assert!(
2831            (0.5..=2.0).contains(&ratio),
2832            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
2833            after_vertices_estimated,
2834            after_vertices_actual,
2835            ratio
2836        );
2837
2838        // Insert edges with a simple edge type
2839        let edge_type = 1u32;
2840        for i in 0..9 {
2841            let eid = writer.next_eid(edge_type).await?;
2842            writer
2843                .insert_edge(
2844                    vids[i],
2845                    vids[i + 1],
2846                    edge_type,
2847                    eid,
2848                    std::collections::HashMap::new(),
2849                    Some("NEXT".to_string()),
2850                )
2851                .await?;
2852        }
2853
2854        // Verify estimated_size grew further
2855        let after_edges_estimated = l0.read().estimated_size;
2856        let after_edges_actual = l0.read().size_bytes();
2857        assert!(
2858            after_edges_estimated > after_vertices_estimated,
2859            "estimated_size should grow after edge insertions"
2860        );
2861
2862        // Verify still within reasonable bounds
2863        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
2864        assert!(
2865            (0.5..=2.0).contains(&ratio),
2866            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
2867            after_edges_estimated,
2868            after_edges_actual,
2869            ratio
2870        );
2871
2872        Ok(())
2873    }
2874}