Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::id_allocator::IdAllocator;
6use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
7use crate::runtime::l0_manager::L0Manager;
8use crate::runtime::property_manager::PropertyManager;
9use crate::runtime::wal::WriteAheadLog;
10use crate::storage::adjacency_manager::AdjacencyManager;
11use crate::storage::delta::{L1Entry, Op};
12use crate::storage::main_edge::MainEdgeDataset;
13use crate::storage::main_vertex::MainVertexDataset;
14use crate::storage::manager::StorageManager;
15use anyhow::{Result, anyhow};
16use chrono::Utc;
17use futures::TryStreamExt;
18use metrics;
19use parking_lot::RwLock;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{debug, info, instrument};
23use uni_common::Properties;
24use uni_common::Value;
25use uni_common::config::UniConfig;
26use uni_common::core::id::{Eid, Vid};
27use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
28use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
29use uni_xervo::runtime::ModelRuntime;
30use uuid::Uuid;
31
/// Tunable limits for a `Writer`.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Maximum number of mutations (presumably per batch/flush cycle —
    /// enforced by consumers of this config; not referenced in this file chunk).
    pub max_mutations: usize,
}

impl Default for WriterConfig {
    /// Defaults to a 10,000-mutation limit.
    fn default() -> Self {
        WriterConfig { max_mutations: 10_000 }
    }
}
44
/// Coordinates writes: L0 in-memory buffering, WAL durability, constraint
/// validation, and dual-writes into the adjacency overlay.
pub struct Writer {
    /// Manages the current (and pending-flush) L0 buffers.
    pub l0_manager: Arc<L0Manager>,
    /// Storage backend (datasets, object store, adjacency manager).
    pub storage: Arc<StorageManager>,
    /// Schema source used for constraint validation.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// VID/EID allocator (auto-increment; state persisted via the object store).
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration.
    pub config: UniConfig,
    /// Optional Xervo model runtime; installed via `set_xervo_runtime`.
    pub xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Scratch L0 buffer for an open transaction; `None` when none is active.
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation
    last_flush_time: std::time::Instant,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling
    index_rebuild_manager: Option<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
}
64
65impl Writer {
66    pub async fn new(
67        storage: Arc<StorageManager>,
68        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
69        start_version: u64,
70    ) -> Result<Self> {
71        Self::new_with_config(
72            storage,
73            schema_manager,
74            start_version,
75            UniConfig::default(),
76            None,
77            None,
78        )
79        .await
80    }
81
82    pub async fn new_with_config(
83        storage: Arc<StorageManager>,
84        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
85        start_version: u64,
86        config: UniConfig,
87        wal: Option<Arc<WriteAheadLog>>,
88        allocator: Option<Arc<IdAllocator>>,
89    ) -> Result<Self> {
90        let allocator = if let Some(a) = allocator {
91            a
92        } else {
93            let store = storage.store();
94            let path = object_store::path::Path::from("id_allocator.json");
95            Arc::new(IdAllocator::new(store, path, 1000).await?)
96        };
97
98        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
99
100        let property_manager = Some(Arc::new(PropertyManager::new(
101            storage.clone(),
102            schema_manager.clone(),
103            1000,
104        )));
105
106        let adjacency_manager = storage.adjacency_manager();
107
108        Ok(Self {
109            l0_manager,
110            storage,
111            schema_manager,
112            allocator,
113            config,
114            xervo_runtime: None,
115            transaction_l0: None,
116            property_manager,
117            adjacency_manager,
118            last_flush_time: std::time::Instant::now(),
119            compaction_handle: Arc::new(RwLock::new(None)),
120            index_rebuild_manager: None,
121        })
122    }
123
124    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
125    pub fn set_index_rebuild_manager(
126        &mut self,
127        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
128    ) {
129        self.index_rebuild_manager = Some(manager);
130    }
131
132    /// Replay WAL mutations into the current L0 buffer.
133    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
134        let l0 = self.l0_manager.get_current();
135        let wal = l0.read().wal.clone();
136
137        if let Some(wal) = wal {
138            wal.initialize().await?;
139            let mutations = wal.replay_since(wal_high_water_mark).await?;
140            let count = mutations.len();
141
142            if count > 0 {
143                log::info!(
144                    "Replaying {} mutations from WAL (LSN > {})",
145                    count,
146                    wal_high_water_mark
147                );
148                let mut l0_guard = l0.write();
149                l0_guard.replay_mutations(mutations)?;
150            }
151
152            Ok(count)
153        } else {
154            Ok(0)
155        }
156    }
157
    /// Allocates the next VID (pure auto-increment).
    ///
    /// Thin delegation to the shared [`IdAllocator`].
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
162
    /// Allocates multiple VIDs at once for bulk operations.
    /// This is more efficient than calling next_vid() in a loop.
    ///
    /// Single allocator round-trip instead of `count` individual calls.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
168
    /// Allocates the next EID (pure auto-increment).
    ///
    /// `_type_id` is currently ignored (note the leading underscore) —
    /// presumably kept for interface compatibility with older call sites.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
173
174    pub fn set_xervo_runtime(&mut self, runtime: Arc<ModelRuntime>) {
175        self.xervo_runtime = Some(runtime);
176    }
177
178    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
179        self.xervo_runtime.clone()
180    }
181
182    pub fn begin_transaction(&mut self) -> Result<()> {
183        if self.transaction_l0.is_some() {
184            return Err(anyhow!("Transaction already active"));
185        }
186        let current_version = self.l0_manager.get_current().read().current_version;
187        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
188        self.transaction_l0 = Some(Arc::new(RwLock::new(L0Buffer::new(current_version, None))));
189        Ok(())
190    }
191
192    /// Returns the active L0 buffer: the transaction L0 if a transaction is open,
193    /// otherwise the current L0 from the manager.
194    fn active_l0(&self) -> Arc<RwLock<L0Buffer>> {
195        self.transaction_l0
196            .clone()
197            .unwrap_or_else(|| self.l0_manager.get_current())
198    }
199
200    fn update_metrics(&self) {
201        let l0 = self.l0_manager.get_current();
202        let size = l0.read().estimated_size;
203        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
204
205        if let Some(tx_l0) = &self.transaction_l0 {
206            metrics::gauge!("active_transactions").set(1.0);
207            let tx_size = tx_l0.read().estimated_size;
208            metrics::gauge!("transaction_l0_size_bytes").set(tx_size as f64);
209        } else {
210            metrics::gauge!("active_transactions").set(0.0);
211            metrics::gauge!("transaction_l0_size_bytes").set(0.0);
212        }
213    }
214
    /// Commit the active transaction.
    ///
    /// The ordering here is durability-critical:
    /// 1. append transaction mutations to the WAL (vertices before edges, so
    ///    src/dst exist on replay),
    /// 2. flush the WAL — this is the commit point,
    /// 3. merge the scratch L0 into the main L0 and mirror edges into the
    ///    adjacency overlay,
    /// 4. clear the transaction and run a best-effort flush check.
    ///
    /// If any step up to and including the WAL flush fails, the transaction
    /// remains active so the caller can retry or roll back.
    pub async fn commit_transaction(&mut self) -> Result<()> {
        // 1. Borrow transaction L0 - keep available for rollback if commit fails
        let tx_l0_arc = self
            .transaction_l0
            .as_ref()
            .ok_or_else(|| anyhow!("No active transaction"))?
            .clone();

        // 2. Write transaction mutations to WAL BEFORE merging into main L0
        // This ensures durability before visibility.
        // Note: WAL is optional - if present, we get durability; if absent, we skip this step.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            // If WAL exists, write mutations to it for durability
            if let Some(wal) = main_l0.wal.as_ref() {
                // Append all transaction mutations to WAL
                // Order: vertices first, then edges (to ensure src/dst exist on replay)

                // Vertex insertions
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Edge insertions and deletions
                // (an eid present in `tombstones` is logged as a delete,
                // otherwise as an insert)
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        // Edge deletion
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        // Edge insertion
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }
            }
        }

        // 3. Flush WAL to durable storage - THIS IS THE COMMIT POINT
        // If this fails, transaction remains active and can be retried or rolled back.
        self.flush_wal().await?;

        // 4. Now that mutations are durable, merge into main L0 and make visible
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            // Replay transaction edges into the AdjacencyManager overlay so they
            // become visible to queries that read from the AM (post-migration path).
            // Use per-edge versions from tx_l0.edge_versions, falling back to current version.
            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }
        }

        self.update_metrics();

        // 5. Clear transaction (all critical steps succeeded)
        self.transaction_l0 = None;

        // 6. check_flush is best-effort compaction, not critical
        if let Err(e) = self.check_flush().await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }

        Ok(())
    }
328
329    /// Flush the WAL buffer to durable storage.
330    pub async fn flush_wal(&self) -> Result<()> {
331        let l0 = self.l0_manager.get_current();
332        let wal = l0.read().wal.clone();
333
334        if let Some(wal) = wal {
335            wal.flush().await?;
336        }
337        Ok(())
338    }
339
340    pub fn rollback_transaction(&mut self) -> Result<()> {
341        // Idempotent: no error if already cleared (commit succeeded or Drop already cleaned up)
342        self.transaction_l0 = None;
343        Ok(())
344    }
345
346    /// Force-rollback any active transaction. Safe to call if no transaction active.
347    /// Used by Transaction::Drop for cleanup.
348    pub fn force_rollback(&mut self) {
349        if self.transaction_l0.take().is_some() {
350            tracing::warn!("Force-rolled back leaked transaction");
351        }
352    }
353
354    /// Validates vertex constraints for the given properties.
355    /// In the new design, label is passed as a parameter since VID no longer embeds label.
356    async fn validate_vertex_constraints_for_label(
357        &self,
358        vid: Vid,
359        properties: &Properties,
360        label: &str,
361    ) -> Result<()> {
362        let schema = self.schema_manager.schema();
363
364        {
365            // 1. Check NOT NULL constraints (from Property definitions)
366            if let Some(props_meta) = schema.properties.get(label) {
367                for (prop_name, meta) in props_meta {
368                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
369                        log::warn!(
370                            "Constraint violation: Property '{}' cannot be null for label '{}'",
371                            prop_name,
372                            label
373                        );
374                        return Err(anyhow!(
375                            "Constraint violation: Property '{}' cannot be null",
376                            prop_name
377                        ));
378                    }
379                }
380            }
381
382            // 2. Check Explicit Constraints (Unique, Check, etc.)
383            for constraint in &schema.constraints {
384                if !constraint.enabled {
385                    continue;
386                }
387                match &constraint.target {
388                    ConstraintTarget::Label(l) if l == label => {}
389                    _ => continue,
390                }
391
392                match &constraint.constraint_type {
393                    ConstraintType::Unique {
394                        properties: unique_props,
395                    } => {
396                        // Support single and multi-property unique constraints
397                        if !unique_props.is_empty() {
398                            let mut key_values = Vec::new();
399                            let mut missing = false;
400                            for prop in unique_props {
401                                if let Some(val) = properties.get(prop) {
402                                    key_values.push((prop.clone(), val.clone()));
403                                } else {
404                                    missing = true; // Can't enforce if property missing (partial update?)
405                                    // For INSERT, missing means null?
406                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
407                                    // For now, only check if ALL keys are present
408                                }
409                            }
410
411                            if !missing {
412                                self.check_unique_constraint_multi(label, &key_values, vid)
413                                    .await?;
414                            }
415                        }
416                    }
417                    ConstraintType::Exists { property } => {
418                        if properties.get(property).is_none_or(|v| v.is_null()) {
419                            log::warn!(
420                                "Constraint violation: Property '{}' must exist for label '{}'",
421                                property,
422                                label
423                            );
424                            return Err(anyhow!(
425                                "Constraint violation: Property '{}' must exist",
426                                property
427                            ));
428                        }
429                    }
430                    ConstraintType::Check { expression } => {
431                        if !self.evaluate_check_constraint(expression, properties)? {
432                            return Err(anyhow!(
433                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
434                                constraint.name,
435                                expression
436                            ));
437                        }
438                    }
439                    _ => {
440                        return Err(anyhow!("Unsupported constraint type"));
441                    }
442                }
443            }
444        }
445        Ok(())
446    }
447
448    /// Validates vertex constraints for a vertex with the given labels.
449    /// Labels must be passed explicitly since the vertex may not yet be in L0.
450    /// Unknown labels (not in schema) are skipped.
451    async fn validate_vertex_constraints(
452        &self,
453        vid: Vid,
454        properties: &Properties,
455        labels: &[String],
456    ) -> Result<()> {
457        let schema = self.schema_manager.schema();
458
459        // Validate constraints only for known labels
460        for label in labels {
461            // Skip unknown labels (schemaless support)
462            if schema.get_label_case_insensitive(label).is_none() {
463                continue;
464            }
465            self.validate_vertex_constraints_for_label(vid, properties, label)
466                .await?;
467        }
468
469        // Check global ext_id uniqueness if ext_id is provided
470        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
471            self.check_extid_globally_unique(ext_id, vid).await?;
472        }
473
474        Ok(())
475    }
476
477    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
478    ///
479    /// Used to build a constraint key index from L0 buffers for batch validation.
480    fn collect_constraint_keys_from_properties<'a>(
481        properties_iter: impl Iterator<Item = &'a Properties>,
482        label: &str,
483        constraints: &[uni_common::core::schema::Constraint],
484        existing_keys: &mut HashMap<String, HashSet<String>>,
485        existing_extids: &mut HashSet<String>,
486    ) {
487        for props in properties_iter {
488            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
489                existing_extids.insert(ext_id.to_string());
490            }
491
492            for constraint in constraints {
493                if !constraint.enabled {
494                    continue;
495                }
496                if let ConstraintTarget::Label(l) = &constraint.target {
497                    if l != label {
498                        continue;
499                    }
500                } else {
501                    continue;
502                }
503
504                if let ConstraintType::Unique {
505                    properties: unique_props,
506                } = &constraint.constraint_type
507                {
508                    let mut key_parts = Vec::new();
509                    let mut all_present = true;
510                    for prop in unique_props {
511                        if let Some(val) = props.get(prop) {
512                            key_parts.push(format!("{}:{}", prop, val));
513                        } else {
514                            all_present = false;
515                            break;
516                        }
517                    }
518                    if all_present {
519                        let key = key_parts.join("|");
520                        existing_keys
521                            .entry(constraint.name.clone())
522                            .or_default()
523                            .insert(key);
524                    }
525                }
526            }
527        }
528    }
529
    /// Validates constraints for a batch of vertices efficiently.
    ///
    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
    ///
    /// # Arguments
    /// * `vids` - VIDs of vertices being inserted
    /// * `properties_batch` - Properties for each vertex
    /// * `label` - Label for all vertices (assumes single label for now)
    ///
    /// # Performance
    /// For N vertices with unique constraints:
    /// - Old approach: O(N²) - scan L0 buffer N times
    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
    ///
    /// NOTE(review): unlike the single-vertex path (which calls
    /// `check_extid_globally_unique`), this batch path checks ext_id
    /// uniqueness only against L0 buffers (step 3), not against main
    /// storage — confirm storage-level ext_id collisions are handled
    /// elsewhere for bulk inserts.
    async fn validate_vertex_batch_constraints(
        &self,
        vids: &[Vid],
        properties_batch: &[Properties],
        label: &str,
    ) -> Result<()> {
        if vids.len() != properties_batch.len() {
            return Err(anyhow!("VID/properties length mismatch"));
        }

        let schema = self.schema_manager.schema();

        // 1. Validate NOT NULL constraints for each vertex
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, properties) in properties_batch.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' cannot be null",
                            idx,
                            prop_name
                        ));
                    }
                }
            }
        }

        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
        let mut existing_extids: HashSet<String> = HashSet::new();

        // Scan current L0 buffer
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            Self::collect_constraint_keys_from_properties(
                l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Scan transaction L0 if present
        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            Self::collect_constraint_keys_from_properties(
                tx_l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // 3. Check batch vertices against index AND check for duplicates within batch
        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
        let mut batch_extids: HashMap<String, usize> = HashMap::new();

        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
            // Check ext_id uniqueness
            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
                if existing_extids.contains(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation at index {}: ext_id '{}' already exists",
                        idx,
                        ext_id
                    ));
                }
                if let Some(first_idx) = batch_extids.get(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
                        ext_id,
                        first_idx,
                        idx
                    ));
                }
                batch_extids.insert(ext_id.to_string(), idx);
            }

            // Check unique constraints
            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                if let ConstraintTarget::Label(l) = &constraint.target {
                    if l != label {
                        continue;
                    }
                } else {
                    continue;
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        let mut key_parts = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties.get(prop) {
                                key_parts.push(format!("{}:{}", prop, val));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = key_parts.join("|");

                            // Check against existing L0 keys
                            if let Some(keys) = existing_keys.get(&constraint.name)
                                && keys.contains(&key)
                            {
                                return Err(anyhow!(
                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
                                    idx,
                                    label,
                                    constraint.name
                                ));
                            }

                            // Check for duplicates within batch
                            let batch_constraint_keys =
                                batch_keys.entry(constraint.name.clone()).or_default();
                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
                                return Err(anyhow!(
                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
                                    key,
                                    first_idx,
                                    idx
                                ));
                            }
                            batch_constraint_keys.insert(key, idx);
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "Constraint violation at index {}: Property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "Constraint violation at index {}: CHECK constraint '{}' violated",
                                idx,
                                constraint.name
                            ));
                        }
                    }
                    // NOTE(review): unsupported constraint types are silently
                    // skipped here, while the single-vertex path returns an
                    // error for them — confirm which behavior is intended.
                    _ => {}
                }
            }
        }

        // 4. Check storage for unique constraints (can batch this into a single query)
        // NOTE(review): values are interpolated into a textual filter below;
        // strings are single-quote-escaped, but Float formatting assumes
        // finite values — confirm NaN/inf cannot reach a unique-key property.
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            if let ConstraintTarget::Label(l) = &constraint.target {
                if l != label {
                    continue;
                }
            } else {
                continue;
            }

            if let ConstraintType::Unique {
                properties: unique_props,
            } = &constraint.constraint_type
            {
                // Build compound OR filter for all batch vertices
                let mut or_filters = Vec::new();
                for properties in properties_batch.iter() {
                    let mut and_parts = Vec::new();
                    let mut all_present = true;
                    for prop in unique_props {
                        if let Some(val) = properties.get(prop) {
                            let val_str = match val {
                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                                Value::Int(n) => n.to_string(),
                                Value::Float(f) => f.to_string(),
                                Value::Bool(b) => b.to_string(),
                                _ => {
                                    all_present = false;
                                    break;
                                }
                            };
                            and_parts.push(format!("{} = {}", prop, val_str));
                        } else {
                            all_present = false;
                            break;
                        }
                    }
                    if all_present {
                        or_filters.push(format!("({})", and_parts.join(" AND ")));
                    }
                }

                if !or_filters.is_empty() {
                    // Exclude the batch's own VIDs so re-inserts don't
                    // collide with themselves.
                    let vid_list: Vec<String> =
                        vids.iter().map(|v| v.as_u64().to_string()).collect();
                    let filter = format!(
                        "({}) AND _deleted = false AND _vid NOT IN ({})",
                        or_filters.join(" OR "),
                        vid_list.join(", ")
                    );

                    if let Ok(ds) = self.storage.vertex_dataset(label)
                        && let Ok(lance_ds) = ds.open_raw().await
                    {
                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
                        if count > 0 {
                            return Err(anyhow!(
                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
                                label,
                                constraint.name
                            ));
                        }
                    }
                }
            }
        }

        Ok(())
    }
777
778    /// Checks that ext_id is globally unique across all vertices.
779    ///
780    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
781    /// to ensure no other vertex uses this ext_id.
782    ///
783    /// # Errors
784    ///
785    /// Returns error if another vertex with the same ext_id exists.
786    async fn check_extid_globally_unique(&self, ext_id: &str, current_vid: Vid) -> Result<()> {
787        // Check L0 buffers: current, transaction, and pending flush
788        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
789            let mut buffers = vec![self.l0_manager.get_current()];
790            if let Some(tx_l0) = &self.transaction_l0 {
791                buffers.push(tx_l0.clone());
792            }
793            buffers.extend(self.l0_manager.get_pending_flush());
794            buffers
795        };
796
797        for l0 in &l0_buffers_to_check {
798            if let Some(vid) =
799                Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
800            {
801                return Err(anyhow!(
802                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
803                    ext_id,
804                    vid
805                ));
806            }
807        }
808
809        // Check main vertices table (if it exists)
810        // Pass None for global uniqueness check (not snapshot-isolated)
811        let lancedb = self.storage.lancedb_store();
812        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(lancedb, ext_id, None).await
813            && found_vid != current_vid
814        {
815            return Err(anyhow!(
816                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
817                ext_id,
818                found_vid
819            ));
820        }
821
822        Ok(())
823    }
824
825    /// Search vertex properties for a duplicate ext_id, excluding `current_vid`.
826    fn find_extid_in_properties(
827        vertex_properties: &HashMap<Vid, Properties>,
828        ext_id: &str,
829        current_vid: Vid,
830    ) -> Option<Vid> {
831        vertex_properties.iter().find_map(|(&vid, props)| {
832            if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
833                Some(vid)
834            } else {
835                None
836            }
837        })
838    }
839
840    /// Helper to get vertex labels from L0 buffer.
841    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
842        let l0 = self.l0_manager.get_current();
843        let l0_guard = l0.read();
844        // Check if vertex is tombstoned (deleted) - if so, return None
845        if l0_guard.vertex_tombstones.contains(&vid) {
846            return None;
847        }
848        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
849    }
850
851    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
852    /// This is the proper way to read vertex labels after a flush, as it checks both
853    /// in-memory buffers and persisted storage.
854    pub async fn get_vertex_labels(&self, vid: Vid) -> Option<Vec<String>> {
855        // 1. Check current L0
856        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
857            return Some(labels);
858        }
859
860        // 2. Check transaction L0 if present
861        if let Some(tx_l0) = &self.transaction_l0 {
862            let guard = tx_l0.read();
863            if guard.vertex_tombstones.contains(&vid) {
864                return None;
865            }
866            if let Some(labels) = guard.get_vertex_labels(vid) {
867                return Some(labels.to_vec());
868            }
869        }
870
871        // 3. Check pending flush L0s
872        for pending_l0 in self.l0_manager.get_pending_flush() {
873            let guard = pending_l0.read();
874            if guard.vertex_tombstones.contains(&vid) {
875                return None;
876            }
877            if let Some(labels) = guard.get_vertex_labels(vid) {
878                return Some(labels.to_vec());
879            }
880        }
881
882        // 4. Check storage
883        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
884    }
885
886    /// Helper to get edge type from L0 buffer.
887    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
888        let l0 = self.l0_manager.get_current();
889        let l0_guard = l0.read();
890        l0_guard.get_edge_type(eid).map(|s| s.to_string())
891    }
892
893    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
894    /// Falls back to the transaction L0 if available.
895    pub fn get_edge_type_id_from_l0(&self, eid: Eid) -> Option<u32> {
896        // Check transaction L0 first
897        if let Some(tx_l0) = &self.transaction_l0 {
898            let guard = tx_l0.read();
899            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
900                return Some(etype);
901            }
902        }
903        // Fall back to main L0
904        let l0 = self.l0_manager.get_current();
905        let l0_guard = l0.read();
906        l0_guard
907            .get_edge_endpoint_full(eid)
908            .map(|(_, _, etype)| etype)
909    }
910
911    /// Set the type name for an edge (used for schemaless edge types).
912    /// This is called during CREATE for edge types not found in the schema.
913    pub fn set_edge_type(&self, eid: Eid, type_name: String) {
914        self.active_l0().write().set_edge_type(eid, type_name);
915    }
916
917    /// Evaluate a simple CHECK constraint expression.
918    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
919    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
920        let parts: Vec<&str> = expression.split_whitespace().collect();
921        if parts.len() != 3 {
922            // For now, only support "prop op val"
923            // Fallback to true if too complex to avoid breaking, but warn
924            log::warn!(
925                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
926                expression
927            );
928            return Ok(true);
929        }
930
931        let prop_part = parts[0].trim_start_matches('(');
932        // Handle "variable.property" format - take the part after the dot
933        let prop_name = if let Some(idx) = prop_part.find('.') {
934            &prop_part[idx + 1..]
935        } else {
936            prop_part
937        };
938
939        let op = parts[1];
940        let val_str = parts[2].trim_end_matches(')');
941
942        let prop_val = match properties.get(prop_name) {
943            Some(v) => v,
944            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
945        };
946
947        // Parse value string (handle quotes for strings)
948        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
949            || (val_str.starts_with('"') && val_str.ends_with('"'))
950        {
951            Value::String(val_str[1..val_str.len() - 1].to_string())
952        } else if let Ok(n) = val_str.parse::<i64>() {
953            Value::Int(n)
954        } else if let Ok(n) = val_str.parse::<f64>() {
955            Value::Float(n)
956        } else if let Ok(b) = val_str.parse::<bool>() {
957            Value::Bool(b)
958        } else {
959            // Check for internal format wrappers if they somehow leaked through
960            if val_str.starts_with("Number(") && val_str.ends_with(')') {
961                let n_str = &val_str[7..val_str.len() - 1];
962                if let Ok(n) = n_str.parse::<i64>() {
963                    Value::Int(n)
964                } else if let Ok(n) = n_str.parse::<f64>() {
965                    Value::Float(n)
966                } else {
967                    Value::String(val_str.to_string())
968                }
969            } else {
970                Value::String(val_str.to_string())
971            }
972        };
973
974        match op {
975            "=" | "==" => Ok(prop_val == &target_val),
976            "!=" | "<>" => Ok(prop_val != &target_val),
977            ">" => self
978                .compare_values(prop_val, &target_val)
979                .map(|o| o.is_gt()),
980            "<" => self
981                .compare_values(prop_val, &target_val)
982                .map(|o| o.is_lt()),
983            ">=" => self
984                .compare_values(prop_val, &target_val)
985                .map(|o| o.is_ge()),
986            "<=" => self
987                .compare_values(prop_val, &target_val)
988                .map(|o| o.is_le()),
989            _ => {
990                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
991                Ok(true)
992            }
993        }
994    }
995
996    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
997        use std::cmp::Ordering;
998
999        fn cmp_f64(x: f64, y: f64) -> Ordering {
1000            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1001        }
1002
1003        match (a, b) {
1004            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1005            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1006            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1007            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1008            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1009            _ => Err(anyhow!(
1010                "Cannot compare incompatible types: {:?} vs {:?}",
1011                a,
1012                b
1013            )),
1014        }
1015    }
1016
    /// Enforce a composite UNIQUE constraint for `label` over `key_values`,
    /// excluding `current_vid` so an upsert of the same vertex passes.
    ///
    /// Checks, in order: the current L0 buffer, the transaction L0 (if any),
    /// then persisted storage via a Lance filter query.
    ///
    /// # Errors
    ///
    /// Returns an error when a duplicate composite key is found, or when the
    /// storage count query fails.
    async fn check_unique_constraint_multi(
        &self,
        label: &str,
        key_values: &[(String, Value)],
        current_vid: Vid,
    ) -> Result<()> {
        // Serialize constraint key once for O(1) lookups
        let key = serialize_constraint_key(label, key_values);

        // 1. Check L0 (in-memory) using O(1) constraint index
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            if l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}'",
                    label
                ));
            }
        }

        // Check Transaction L0
        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            if tx_l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
                    label
                ));
            }
        }

        // 2. Check Storage (L1/L2)
        // Build one "prop = value" predicate per key column.
        // NOTE(review): non-scalar values render as "prop = NULL", which never
        // matches under SQL NULL semantics — confirm that effectively skipping
        // such keys from the storage check is intended.
        let filters: Vec<String> = key_values
            .iter()
            .map(|(prop, val)| {
                let val_str = match val {
                    // Escape single quotes to keep the filter string well-formed.
                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                    Value::Int(n) => n.to_string(),
                    Value::Float(f) => f.to_string(),
                    Value::Bool(b) => b.to_string(),
                    _ => "NULL".to_string(),
                };
                format!("{} = {}", prop, val_str)
            })
            .collect();

        // Exclude soft-deleted rows and the vertex being written itself.
        let mut filter = filters.join(" AND ");
        filter.push_str(&format!(
            " AND _deleted = false AND _vid != {}",
            current_vid.as_u64()
        ));

        // A missing dataset or failed open is treated as "no duplicates".
        if let Ok(ds) = self.storage.vertex_dataset(label)
            && let Ok(lance_ds) = ds.open_raw().await
        {
            let count = lance_ds.count_rows(Some(filter.clone())).await?;
            if count > 0 {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
                    label,
                    filter
                ));
            }
        }

        Ok(())
    }
1085
1086    async fn check_write_pressure(&self) -> Result<()> {
1087        let status = self
1088            .storage
1089            .compaction_status()
1090            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
1091        let l1_runs = status.l1_runs;
1092        let throttle = &self.config.throttle;
1093
1094        if l1_runs >= throttle.hard_limit {
1095            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
1096            // Simple polling for now
1097            while self
1098                .storage
1099                .compaction_status()
1100                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
1101                .l1_runs
1102                >= throttle.hard_limit
1103            {
1104                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1105            }
1106        } else if l1_runs >= throttle.soft_limit {
1107            let excess = l1_runs - throttle.soft_limit;
1108            // Cap multiplier to avoid overflow
1109            let excess = std::cmp::min(excess, 31);
1110            let multiplier = 2_u32.pow(excess as u32);
1111            let delay = throttle.base_delay * multiplier;
1112            tokio::time::sleep(delay).await;
1113        }
1114        Ok(())
1115    }
1116
1117    /// Check transaction memory limit to prevent OOM.
1118    /// No-op when no transaction is active.
1119    fn check_transaction_memory(&self) -> Result<()> {
1120        if let Some(tx_l0) = &self.transaction_l0 {
1121            let size = tx_l0.read().estimated_size;
1122            if size > self.config.max_transaction_memory {
1123                return Err(anyhow!(
1124                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1125                     Roll back or commit the current transaction.",
1126                    size,
1127                    self.config.max_transaction_memory
1128                ));
1129            }
1130        }
1131        Ok(())
1132    }
1133
1134    async fn get_query_context(&self) -> Option<QueryContext> {
1135        Some(QueryContext::new_with_pending(
1136            self.l0_manager.get_current(),
1137            self.transaction_l0.clone(),
1138            self.l0_manager.get_pending_flush(),
1139        ))
1140    }
1141
1142    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
1143    ///
1144    /// When `label` is provided, uses it directly to look up property metadata.
1145    /// Otherwise falls back to discovering the label from L0 buffers and storage.
1146    ///
1147    /// # Errors
1148    ///
1149    /// Returns an error if CRDT property merging fails.
1150    async fn prepare_vertex_upsert(
1151        &self,
1152        vid: Vid,
1153        properties: &mut Properties,
1154        label: Option<&str>,
1155    ) -> Result<()> {
1156        let Some(pm) = &self.property_manager else {
1157            return Ok(());
1158        };
1159
1160        let schema = self.schema_manager.schema();
1161
1162        // Resolve label: use provided label or discover from L0/storage
1163        let discovered_labels;
1164        let label_name = if let Some(l) = label {
1165            Some(l)
1166        } else {
1167            discovered_labels = self.get_vertex_labels(vid).await;
1168            discovered_labels
1169                .as_ref()
1170                .and_then(|l| l.first().map(|s| s.as_str()))
1171        };
1172
1173        let Some(label_str) = label_name else {
1174            return Ok(());
1175        };
1176        let Some(props_meta) = schema.properties.get(label_str) else {
1177            return Ok(());
1178        };
1179
1180        // Identify CRDT properties in the insert data
1181        let crdt_keys: Vec<String> = properties
1182            .keys()
1183            .filter(|key| {
1184                props_meta.get(*key).is_some_and(|meta| {
1185                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1186                })
1187            })
1188            .cloned()
1189            .collect();
1190
1191        if crdt_keys.is_empty() {
1192            return Ok(());
1193        }
1194
1195        let ctx = self.get_query_context().await;
1196        for key in crdt_keys {
1197            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
1198            if !existing.is_null()
1199                && let Some(val) = properties.get_mut(&key)
1200            {
1201                *val = pm.merge_crdt_values(&existing, val)?;
1202            }
1203        }
1204
1205        Ok(())
1206    }
1207
1208    async fn prepare_edge_upsert(&self, eid: Eid, properties: &mut Properties) -> Result<()> {
1209        if let Some(pm) = &self.property_manager {
1210            let schema = self.schema_manager.schema();
1211            // Get edge type from L0 buffer instead of from EID
1212            let type_name = self.get_edge_type_from_l0(eid);
1213
1214            if let Some(ref t_name) = type_name
1215                && let Some(props_meta) = schema.properties.get(t_name)
1216            {
1217                let mut crdt_keys = Vec::new();
1218                for (key, _) in properties.iter() {
1219                    if let Some(meta) = props_meta.get(key)
1220                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1221                    {
1222                        crdt_keys.push(key.clone());
1223                    }
1224                }
1225
1226                if !crdt_keys.is_empty() {
1227                    let ctx = self.get_query_context().await;
1228                    for key in crdt_keys {
1229                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1230
1231                        if !existing.is_null()
1232                            && let Some(val) = properties.get_mut(&key)
1233                        {
1234                            *val = pm.merge_crdt_values(&existing, val)?;
1235                        }
1236                    }
1237                }
1238            }
1239        }
1240        Ok(())
1241    }
1242
1243    #[instrument(skip(self, properties), level = "trace")]
1244    pub async fn insert_vertex(&mut self, vid: Vid, properties: Properties) -> Result<()> {
1245        self.insert_vertex_with_labels(vid, properties, &[]).await?;
1246        Ok(())
1247    }
1248
    /// Insert a vertex with its labels into the active L0 buffer.
    ///
    /// Pipeline: write-pressure throttle → transaction memory check →
    /// embedding generation → constraint validation → CRDT upsert merge →
    /// L0 write plus constraint-index population → metrics → flush check
    /// (outside transactions only).
    ///
    /// Returns the final property map (with embeddings / CRDT merges applied)
    /// so the caller can observe what was actually stored.
    ///
    /// # Errors
    ///
    /// Returns an error on write-pressure failure, memory-limit breach,
    /// embedding failure, constraint violation, or flush failure.
    #[instrument(skip(self, properties, labels), level = "trace")]
    pub async fn insert_vertex_with_labels(
        &mut self,
        vid: Vid,
        mut properties: Properties,
        labels: &[String],
    ) -> Result<Properties> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        self.process_embeddings_for_labels(labels, &mut properties)
            .await?;
        self.validate_vertex_constraints(vid, &properties, labels)
            .await?;
        self.prepare_vertex_upsert(vid, &mut properties, labels.first().map(|s| s.as_str()))
            .await?;

        // Clone properties and labels before moving into L0 to return them and populate constraint index
        let properties_copy = properties.clone();
        let labels_copy = labels.to_vec();

        // The write lock is held for both the vertex insert and the
        // constraint-index updates so the two stay consistent.
        {
            let l0 = self.active_l0();
            let mut l0_guard = l0.write();
            l0_guard.insert_vertex_with_labels(vid, properties, labels);

            // Populate constraint index for O(1) duplicate detection
            let schema = self.schema_manager.schema();
            for label in &labels_copy {
                // Skip unknown labels (schemaless support)
                if schema.get_label_case_insensitive(label).is_none() {
                    continue;
                }

                // For each unique constraint on this label, insert into constraint index
                for constraint in &schema.constraints {
                    if !constraint.enabled {
                        continue;
                    }
                    // Only label-targeted constraints matching this label apply.
                    if let ConstraintTarget::Label(l) = &constraint.target {
                        if l != label {
                            continue;
                        }
                    } else {
                        continue;
                    }

                    if let ConstraintType::Unique {
                        properties: unique_props,
                    } = &constraint.constraint_type
                    {
                        // Only index the key when every constrained property is
                        // present; a partial key cannot collide.
                        let mut key_values = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties_copy.get(prop) {
                                key_values.push((prop.clone(), val.clone()));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = serialize_constraint_key(label, &key_values);
                            l0_guard.insert_constraint_key(key, vid);
                        }
                    }
                }
            }
        }

        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Outside a transaction each write may trigger an L0 flush.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(properties_copy)
    }
1331
    /// Insert multiple vertices with batched operations.
    ///
    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
    /// for bulk inserts with unique constraints.
    ///
    /// # Performance Improvements
    /// - Batch VID allocation: 1 call instead of N calls
    /// - Batch constraint validation: O(N) instead of O(N²)
    /// - Batch embedding generation: 1 API call per config instead of N calls
    /// - Transaction wrapping: Automatic flush deferral, atomicity
    ///
    /// # Arguments
    /// * `vids` - Pre-allocated VIDs for the vertices
    /// * `properties_batch` - Properties for each vertex
    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
    ///
    /// # Errors
    /// Returns error if:
    /// - VID/properties length mismatch
    /// - Constraint violation detected
    /// - Embedding generation fails
    /// - Transaction commit fails
    ///
    /// # Atomicity
    /// If this method fails, all changes are rolled back (if transaction was started here).
    pub async fn insert_vertices_batch(
        &mut self,
        vids: Vec<Vid>,
        mut properties_batch: Vec<Properties>,
        labels: Vec<String>,
    ) -> Result<Vec<Properties>> {
        let start = std::time::Instant::now();

        // Validate inputs
        if vids.len() != properties_batch.len() {
            return Err(anyhow!(
                "VID/properties size mismatch: {} vids, {} properties",
                vids.len(),
                properties_batch.len()
            ));
        }

        if vids.is_empty() {
            return Ok(Vec::new());
        }

        // Start transaction if not already in one; `is_nested` records whether
        // this call owns the transaction and is responsible for commit/rollback.
        let is_nested = self.transaction_l0.is_some();
        if !is_nested {
            self.begin_transaction()?;
        }

        // Batch operations run inside an async block so a single `?` failure
        // can be intercepted below for the rollback path.
        let result = async {
            self.check_write_pressure().await?;
            self.check_transaction_memory()?;

            // Batch embedding generation (1 API call per config)
            self.process_embeddings_for_batch(&labels, &mut properties_batch)
                .await?;

            // Batch constraint validation (O(N) instead of O(N²))
            let label = labels
                .first()
                .ok_or_else(|| anyhow!("No labels provided"))?;
            self.validate_vertex_batch_constraints(&vids, &properties_batch, label)
                .await?;

            // Batch prepare (CRDT merging if needed)
            // Check schema once: skip entirely if no CRDT properties for this label.
            // For new vertices (freshly allocated VIDs), there are no existing CRDT
            // values to merge, so the per-vertex lookup is unnecessary in that case.
            let has_crdt_fields = {
                let schema = self.schema_manager.schema();
                schema
                    .properties
                    .get(label.as_str())
                    .is_some_and(|props_meta| {
                        props_meta.values().any(|meta| {
                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                        })
                    })
            };

            if has_crdt_fields {
                // Batch fetch existing CRDT values: collect VIDs that need merging,
                // then query once via PropertyManager instead of per-vertex lookups.
                let schema = self.schema_manager.schema();
                let crdt_keys: Vec<String> = schema
                    .properties
                    .get(label.as_str())
                    .map(|props_meta| {
                        props_meta
                            .iter()
                            .filter(|(_, meta)| {
                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                            })
                            .map(|(key, _)| key.clone())
                            .collect()
                    })
                    .unwrap_or_default();

                if let Some(pm) = &self.property_manager {
                    let ctx = self.get_query_context().await;
                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
                        for key in &crdt_keys {
                            if props.contains_key(key) {
                                let existing =
                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
                                if !existing.is_null()
                                    && let Some(val) = props.get_mut(key)
                                {
                                    *val = pm.merge_crdt_values(&existing, val)?;
                                }
                            }
                        }
                    }
                }
            }

            // Batch L0 writes (WAL batched automatically via transaction)
            let tx_l0 = self
                .transaction_l0
                .as_ref()
                .ok_or_else(|| anyhow!("Transaction L0 missing"))?;

            // Cloned before the move into L0 so the final values can be returned.
            let properties_result = properties_batch.clone();
            {
                let mut l0_guard = tx_l0.write();
                for (vid, props) in vids.iter().zip(properties_batch) {
                    l0_guard.insert_vertex_with_labels(*vid, props, &labels);
                }
            }

            // Update metrics (batch increment)
            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
            self.update_metrics();

            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
        }
        .await;

        // Handle transaction commit/rollback
        match result {
            Ok(props) => {
                // Commit if we started the transaction
                // NOTE(review): if commit_transaction fails here, the error
                // propagates without an explicit rollback — confirm that
                // commit_transaction cleans up its own state on failure.
                if !is_nested {
                    self.commit_transaction().await?;
                }

                if start.elapsed().as_millis() > 100 {
                    log::warn!(
                        "Slow insert_vertices_batch ({} vertices): {}ms",
                        vids.len(),
                        start.elapsed().as_millis()
                    );
                }

                Ok(props)
            }
            Err(e) => {
                // Rollback if we started the transaction
                if !is_nested {
                    self.rollback_transaction()?;
                }
                Err(e)
            }
        }
    }
1501
    /// Delete a vertex by VID.
    ///
    /// When `labels` is provided, uses them directly to populate L0 for
    /// correct tombstone flushing. Otherwise discovers labels from L0
    /// buffers and storage (which can be slow for many vertices).
    ///
    /// # Errors
    ///
    /// Returns an error if write pressure stalls, label lookup fails, or
    /// the L0 delete operation fails.
    #[instrument(skip(self, labels), level = "trace")]
    pub async fn delete_vertex(&mut self, vid: Vid, labels: Option<Vec<String>>) -> Result<()> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        // Active L0 is the transaction buffer when a transaction is open,
        // otherwise the main current buffer.
        let l0 = self.active_l0();

        // Before deleting, ensure we have the vertex's labels stored in L0
        // so the tombstone can be properly flushed to the correct label datasets.
        let has_labels = {
            let l0_guard = l0.read();
            l0_guard.vertex_labels.contains_key(&vid)
        };

        if !has_labels {
            let resolved_labels = if let Some(provided) = labels {
                // Caller provided labels — skip the lookup entirely
                Some(provided)
            } else {
                // Discover labels from pending flush L0s, then storage
                // NOTE(review): when a transaction is active, the *main*
                // current L0 is not consulted here, so a vertex inserted
                // outside the transaction but not yet flushed may resolve no
                // labels — confirm whether that case can occur.
                let mut found = None;
                for pending_l0 in self.l0_manager.get_pending_flush() {
                    let pending_guard = pending_l0.read();
                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
                        found = Some(l.to_vec());
                        break;
                    }
                }
                if found.is_none() {
                    found = self.find_vertex_labels_in_storage(vid).await?;
                }
                found
            };

            if let Some(found_labels) = resolved_labels {
                let mut l0_guard = l0.write();
                l0_guard.vertex_labels.insert(vid, found_labels);
            }
        }

        l0.write().delete_vertex(vid)?;
        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Outside a transaction each write may trigger an L0 flush.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(())
    }
1564
1565    /// Find vertex labels from storage by querying the main vertices table.
1566    /// Returns the labels from the latest non-deleted version of the vertex.
1567    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1568        use arrow_array::Array;
1569        use arrow_array::cast::AsArray;
1570        use lancedb::query::{ExecutableQuery, QueryBase, Select};
1571
1572        let lancedb_store = self.storage.lancedb_store();
1573        let table_name = MainVertexDataset::table_name();
1574
1575        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
1576        if !lancedb_store.table_exists(table_name).await? {
1577            return Ok(None);
1578        }
1579
1580        let table = lancedb_store.open_table(table_name).await?;
1581
1582        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
1583        let filter = format!("_vid = {}", vid.as_u64());
1584        let query = table.query().only_if(filter).select(Select::Columns(vec![
1585            "_vid".to_string(),
1586            "labels".to_string(),
1587            "_version".to_string(),
1588            "_deleted".to_string(),
1589        ]));
1590
1591        let stream = query.execute().await?;
1592        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
1593
1594        // Find the row with the highest version number
1595        let mut max_version: Option<u64> = None;
1596        let mut labels: Option<Vec<String>> = None;
1597        let mut is_deleted = false;
1598
1599        for batch in batches {
1600            if batch.num_rows() == 0 {
1601                continue;
1602            }
1603
1604            let version_array = batch
1605                .column_by_name("_version")
1606                .unwrap()
1607                .as_primitive::<arrow_array::types::UInt64Type>();
1608
1609            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
1610
1611            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
1612
1613            for row_idx in 0..batch.num_rows() {
1614                let version = version_array.value(row_idx);
1615
1616                if max_version.is_none_or(|mv| version > mv) {
1617                    is_deleted = deleted_array.value(row_idx);
1618
1619                    let labels_list = labels_array.value(row_idx);
1620                    let string_array = labels_list.as_string::<i32>();
1621                    let vertex_labels: Vec<String> = (0..string_array.len())
1622                        .filter(|&i| !string_array.is_null(i))
1623                        .map(|i| string_array.value(i).to_string())
1624                        .collect();
1625
1626                    max_version = Some(version);
1627                    labels = Some(vertex_labels);
1628                }
1629            }
1630        }
1631
1632        // If the latest version is deleted, return None
1633        if is_deleted { Ok(None) } else { Ok(labels) }
1634    }
1635
1636    #[instrument(skip(self, properties), level = "trace")]
1637    pub async fn insert_edge(
1638        &mut self,
1639        src_vid: Vid,
1640        dst_vid: Vid,
1641        edge_type: u32,
1642        eid: Eid,
1643        mut properties: Properties,
1644        edge_type_name: Option<String>,
1645    ) -> Result<()> {
1646        let start = std::time::Instant::now();
1647        self.check_write_pressure().await?;
1648        self.check_transaction_memory()?;
1649        self.prepare_edge_upsert(eid, &mut properties).await?;
1650
1651        let l0 = self.active_l0();
1652        l0.write()
1653            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
1654
1655        // Dual-write to AdjacencyManager overlay (survives flush).
1656        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
1657        if self.transaction_l0.is_none() {
1658            let version = l0.read().current_version;
1659            self.adjacency_manager
1660                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
1661        }
1662
1663        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1664        self.update_metrics();
1665
1666        if self.transaction_l0.is_none() {
1667            self.check_flush().await?;
1668        }
1669        if start.elapsed().as_millis() > 100 {
1670            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
1671        }
1672        Ok(())
1673    }
1674
1675    #[instrument(skip(self), level = "trace")]
1676    pub async fn delete_edge(
1677        &mut self,
1678        eid: Eid,
1679        src_vid: Vid,
1680        dst_vid: Vid,
1681        edge_type: u32,
1682    ) -> Result<()> {
1683        let start = std::time::Instant::now();
1684        self.check_write_pressure().await?;
1685        self.check_transaction_memory()?;
1686        let l0 = self.active_l0();
1687
1688        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
1689
1690        // Dual-write tombstone to AdjacencyManager overlay.
1691        if self.transaction_l0.is_none() {
1692            let version = l0.read().current_version;
1693            self.adjacency_manager
1694                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
1695        }
1696        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1697        self.update_metrics();
1698
1699        if self.transaction_l0.is_none() {
1700            self.check_flush().await?;
1701        }
1702        if start.elapsed().as_millis() > 100 {
1703            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
1704        }
1705        Ok(())
1706    }
1707
1708    /// Check if flush should be triggered based on mutation count or time elapsed.
1709    /// This method is called after each write operation and can also be called
1710    /// by a background task for time-based flushing.
1711    pub async fn check_flush(&mut self) -> Result<()> {
1712        let count = self.l0_manager.get_current().read().mutation_count;
1713
1714        // Skip if no mutations
1715        if count == 0 {
1716            return Ok(());
1717        }
1718
1719        // Flush on mutation count threshold (10,000 default)
1720        if count >= self.config.auto_flush_threshold {
1721            self.flush_to_l1(None).await?;
1722            return Ok(());
1723        }
1724
1725        // Flush on time interval IF minimum mutations met
1726        if let Some(interval) = self.config.auto_flush_interval
1727            && self.last_flush_time.elapsed() >= interval
1728            && count >= self.config.auto_flush_min_mutations
1729        {
1730            self.flush_to_l1(None).await?;
1731        }
1732
1733        Ok(())
1734    }
1735
1736    /// Process embeddings for a vertex using labels passed directly.
1737    /// Use this when labels haven't been stored to L0 yet.
1738    async fn process_embeddings_for_labels(
1739        &self,
1740        labels: &[String],
1741        properties: &mut Properties,
1742    ) -> Result<()> {
1743        let label_name = labels.first().map(|s| s.as_str());
1744        self.process_embeddings_impl(label_name, properties).await
1745    }
1746
1747    /// Process embeddings for a batch of vertices efficiently.
1748    ///
1749    /// Groups vertices by embedding config and makes batched API calls to the
1750    /// embedding service instead of calling once per vertex.
1751    ///
1752    /// # Performance
1753    /// For N vertices with embedding config:
1754    /// - Old approach: N API calls to embedding service
1755    /// - New approach: 1 API call per embedding config (usually 1 total)
1756    async fn process_embeddings_for_batch(
1757        &self,
1758        labels: &[String],
1759        properties_batch: &mut [Properties],
1760    ) -> Result<()> {
1761        let label_name = labels.first().map(|s| s.as_str());
1762        let schema = self.schema_manager.schema();
1763
1764        if let Some(label) = label_name {
1765            // Find vector indexes with embedding config for this label
1766            let mut configs = Vec::new();
1767            for idx in &schema.indexes {
1768                if let IndexDefinition::Vector(v_config) = idx
1769                    && v_config.label == label
1770                    && let Some(emb_config) = &v_config.embedding_config
1771                {
1772                    configs.push((v_config.property.clone(), emb_config.clone()));
1773                }
1774            }
1775
1776            if configs.is_empty() {
1777                return Ok(());
1778            }
1779
1780            for (target_prop, emb_config) in configs {
1781                // Collect input texts from all vertices that need embeddings
1782                let mut input_texts: Vec<String> = Vec::new();
1783                let mut needs_embedding: Vec<usize> = Vec::new();
1784
1785                for (idx, properties) in properties_batch.iter().enumerate() {
1786                    // Skip if target property already exists
1787                    if properties.contains_key(&target_prop) {
1788                        continue;
1789                    }
1790
1791                    // Check if source properties exist
1792                    let mut inputs = Vec::new();
1793                    for src_prop in &emb_config.source_properties {
1794                        if let Some(val) = properties.get(src_prop)
1795                            && let Some(s) = val.as_str()
1796                        {
1797                            inputs.push(s.to_string());
1798                        }
1799                    }
1800
1801                    if !inputs.is_empty() {
1802                        let input_text = inputs.join(" ");
1803                        input_texts.push(input_text);
1804                        needs_embedding.push(idx);
1805                    }
1806                }
1807
1808                if input_texts.is_empty() {
1809                    continue;
1810                }
1811
1812                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1813                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1814                })?;
1815                let embedder = runtime.embedding(&emb_config.alias).await?;
1816
1817                // Batch generate embeddings (single API call)
1818                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
1819                let embeddings = embedder.embed(input_refs).await?;
1820
1821                // Distribute results back to properties
1822                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
1823                    if let Some(vec) = embeddings.get(embedding_idx) {
1824                        let vals: Vec<Value> =
1825                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
1826                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
1827                    }
1828                }
1829            }
1830        }
1831
1832        Ok(())
1833    }
1834
1835    async fn process_embeddings_impl(
1836        &self,
1837        label_name: Option<&str>,
1838        properties: &mut Properties,
1839    ) -> Result<()> {
1840        let schema = self.schema_manager.schema();
1841
1842        if let Some(label) = label_name {
1843            // Find vector indexes with embedding config for this label
1844            let mut configs = Vec::new();
1845            for idx in &schema.indexes {
1846                if let IndexDefinition::Vector(v_config) = idx
1847                    && v_config.label == label
1848                    && let Some(emb_config) = &v_config.embedding_config
1849                {
1850                    configs.push((v_config.property.clone(), emb_config.clone()));
1851                }
1852            }
1853
1854            if configs.is_empty() {
1855                log::info!("No embedding config found for label {}", label);
1856            }
1857
1858            for (target_prop, emb_config) in configs {
1859                // If target property already exists, skip (assume user provided it)
1860                if properties.contains_key(&target_prop) {
1861                    continue;
1862                }
1863
1864                // Check if source properties exist
1865                let mut inputs = Vec::new();
1866                for src_prop in &emb_config.source_properties {
1867                    if let Some(val) = properties.get(src_prop)
1868                        && let Some(s) = val.as_str()
1869                    {
1870                        inputs.push(s.to_string());
1871                    }
1872                }
1873
1874                if inputs.is_empty() {
1875                    continue;
1876                }
1877
1878                let input_text = inputs.join(" "); // Simple concatenation
1879
1880                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1881                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1882                })?;
1883                let embedder = runtime.embedding(&emb_config.alias).await?;
1884
1885                // Generate
1886                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
1887                if let Some(vec) = embeddings.first() {
1888                    // Store as array of floats
1889                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
1890                    properties.insert(target_prop.clone(), Value::List(vals));
1891                }
1892            }
1893        }
1894        Ok(())
1895    }
1896
1897    /// Flushes the current in-memory L0 buffer to L1 storage.
1898    ///
1899    /// # Lock Ordering
1900    ///
1901    /// To prevent deadlocks, locks must be acquired in the following order:
1902    /// 1. `Writer` lock (held by caller)
1903    /// 2. `L0Manager` lock (via `begin_flush` / `get_current`)
1904    /// 3. `L0Buffer` lock (individual buffer RWLocks)
1905    /// 4. `Index` / `Storage` locks (during actual flush)
1906    #[instrument(
1907        skip(self),
1908        fields(snapshot_id, mutations_count, size_bytes),
1909        level = "info"
1910    )]
1911    pub async fn flush_to_l1(&mut self, name: Option<String>) -> Result<String> {
1912        let start = std::time::Instant::now();
1913        let schema = self.schema_manager.schema();
1914
1915        let (initial_size, initial_count) = {
1916            let l0_arc = self.l0_manager.get_current();
1917            let l0 = l0_arc.read();
1918            (l0.estimated_size, l0.mutation_count)
1919        };
1920        tracing::Span::current().record("size_bytes", initial_size);
1921        tracing::Span::current().record("mutations_count", initial_count);
1922
1923        debug!("Starting L0 flush to L1");
1924
1925        // 1. Flush WAL BEFORE rotating L0
1926        // This ensures that if WAL flush fails, the current L0 is still active
1927        // and mutations are retained in memory until restart/retry.
1928        // Capture the LSN of the flushed segment for the snapshot's wal_high_water_mark.
1929        let wal_for_truncate = {
1930            let current_l0 = self.l0_manager.get_current();
1931            let l0_guard = current_l0.read();
1932            l0_guard.wal.clone()
1933        };
1934
1935        let wal_lsn = if let Some(ref w) = wal_for_truncate {
1936            w.flush().await?
1937        } else {
1938            0
1939        };
1940
1941        // 2. Begin flush: rotate L0 and keep old L0 visible to reads
1942        // The old L0 stays in pending_flush list until complete_flush is called,
1943        // ensuring data remains visible even if L1 writes fail.
1944        let old_l0_arc = self.l0_manager.begin_flush(0, None);
1945        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
1946
1947        let current_version;
1948        {
1949            // Acquire Write lock to take WAL and version
1950            let mut old_l0_guard = old_l0_arc.write();
1951            current_version = old_l0_guard.current_version;
1952
1953            // Record the WAL LSN for this L0 so we don't truncate past it
1954            // if this flush fails and a subsequent flush succeeds.
1955            old_l0_guard.wal_lsn_at_flush = wal_lsn;
1956
1957            let wal = old_l0_guard.wal.take();
1958
1959            // Give WAL to new L0
1960            let new_l0_arc = self.l0_manager.get_current();
1961            let mut new_l0_guard = new_l0_arc.write();
1962            new_l0_guard.wal = wal;
1963            new_l0_guard.current_version = current_version;
1964        } // Drop locks
1965
1966        // 2. Acquire Read lock on Old L0 for flushing
1967        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
1968        // (Vid, labels, properties, deleted, version)
1969        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
1970        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
1971        // Collect vertex timestamps from L0 for flushing to storage
1972        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
1973        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
1974        // Track tombstones missing labels for storage query fallback
1975        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
1976
1977        {
1978            let old_l0 = old_l0_arc.read();
1979
1980            // 1. Collect all edges and tombstones from L0
1981            for edge in old_l0.graph.edges() {
1982                let properties = old_l0
1983                    .edge_properties
1984                    .get(&edge.eid)
1985                    .cloned()
1986                    .unwrap_or_default();
1987                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
1988
1989                // Get timestamps from L0 buffer (populated during insert)
1990                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
1991                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
1992
1993                entries_by_type
1994                    .entry(edge.edge_type)
1995                    .or_default()
1996                    .push(L1Entry {
1997                        src_vid: edge.src_vid,
1998                        dst_vid: edge.dst_vid,
1999                        eid: edge.eid,
2000                        op: Op::Insert,
2001                        version,
2002                        properties,
2003                        created_at,
2004                        updated_at,
2005                    });
2006            }
2007
2008            // From tombstones
2009            for tombstone in old_l0.tombstones.values() {
2010                let version = old_l0
2011                    .edge_versions
2012                    .get(&tombstone.eid)
2013                    .copied()
2014                    .unwrap_or(0);
2015                // Get timestamps - for deletes, updated_at reflects deletion time
2016                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
2017                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
2018
2019                entries_by_type
2020                    .entry(tombstone.edge_type)
2021                    .or_default()
2022                    .push(L1Entry {
2023                        src_vid: tombstone.src_vid,
2024                        dst_vid: tombstone.dst_vid,
2025                        eid: tombstone.eid,
2026                        op: Op::Delete,
2027                        version,
2028                        properties: HashMap::new(),
2029                        created_at,
2030                        updated_at,
2031                    });
2032            }
2033
2034            // 2.5 Flush Vertices - Collect by label (using vertex_labels from L0)
2035            //
2036            // Helper: fan-out a single vertex entry into per-label buckets.
2037            // Each per-label table row carries the full label set so multi-label
2038            // info is preserved after flush.
2039            let push_vertex_to_labels =
2040                |vid: Vid,
2041                 all_labels: &[String],
2042                 props: Properties,
2043                 deleted: bool,
2044                 version: u64,
2045                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
2046                    for label in all_labels {
2047                        if let Some(label_id) = schema.label_id_by_name(label) {
2048                            out.entry(label_id).or_default().push((
2049                                vid,
2050                                all_labels.to_vec(),
2051                                props.clone(),
2052                                deleted,
2053                                version,
2054                            ));
2055                        }
2056                    }
2057                };
2058
2059            for (vid, props) in &old_l0.vertex_properties {
2060                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
2061                // Collect timestamps for this vertex
2062                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
2063                    vertex_created_at.insert(*vid, ts);
2064                }
2065                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
2066                    vertex_updated_at.insert(*vid, ts);
2067                }
2068                if let Some(labels) = old_l0.vertex_labels.get(vid) {
2069                    push_vertex_to_labels(
2070                        *vid,
2071                        labels,
2072                        props.clone(),
2073                        false,
2074                        version,
2075                        &mut vertices_by_label,
2076                    );
2077                }
2078            }
2079            for &vid in &old_l0.vertex_tombstones {
2080                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
2081                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
2082                    push_vertex_to_labels(
2083                        vid,
2084                        labels,
2085                        HashMap::new(),
2086                        true,
2087                        version,
2088                        &mut vertices_by_label,
2089                    );
2090                } else {
2091                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
2092                    orphaned_tombstones.push((vid, version));
2093                }
2094            }
2095        } // Drop read lock
2096
2097        // Resolve orphaned tombstones (missing labels) from storage
2098        if !orphaned_tombstones.is_empty() {
2099            tracing::warn!(
2100                count = orphaned_tombstones.len(),
2101                "Tombstones missing labels in L0, querying storage as fallback"
2102            );
2103            for (vid, version) in orphaned_tombstones {
2104                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
2105                    && !labels.is_empty()
2106                {
2107                    for label in &labels {
2108                        if let Some(label_id) = schema.label_id_by_name(label) {
2109                            vertices_by_label.entry(label_id).or_default().push((
2110                                vid,
2111                                labels.clone(),
2112                                HashMap::new(),
2113                                true,
2114                                version,
2115                            ));
2116                        }
2117                    }
2118                }
2119            }
2120        }
2121
2122        // 0. Load previous snapshot or create new
2123        let mut manifest = self
2124            .storage
2125            .snapshot_manager()
2126            .load_latest_snapshot()
2127            .await?
2128            .unwrap_or_else(|| {
2129                SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
2130            });
2131
2132        // Update snapshot metadata
2133        // Save parent snapshot ID before generating new one (for lineage tracking)
2134        let parent_id = manifest.snapshot_id.clone();
2135        manifest.parent_snapshot = Some(parent_id);
2136        manifest.snapshot_id = Uuid::new_v4().to_string();
2137        manifest.name = name;
2138        manifest.created_at = Utc::now();
2139        manifest.version_high_water_mark = current_version;
2140        manifest.wal_high_water_mark = wal_lsn;
2141        let snapshot_id = manifest.snapshot_id.clone();
2142
2143        tracing::Span::current().record("snapshot_id", &snapshot_id);
2144
2145        // 2. For each edge type, write FWD and BWD runs
2146        let lancedb_store = self.storage.lancedb_store();
2147
2148        for (&edge_type_id, entries) in entries_by_type.iter() {
2149            // Get edge type name from unified lookup (handles both schema'd and schemaless)
2150            let edge_type_name = self
2151                .storage
2152                .schema_manager()
2153                .edge_type_name_by_id_unified(edge_type_id)
2154                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
2155
2156            // FWD Run (sorted by src_vid)
2157            let mut fwd_entries = entries.clone();
2158            fwd_entries.sort_by_key(|e| e.src_vid);
2159            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
2160            let fwd_batch = fwd_ds.build_record_batch(&fwd_entries, &schema)?;
2161
2162            // Write using LanceDB
2163            let table = fwd_ds.write_run_lancedb(lancedb_store, fwd_batch).await?;
2164            fwd_ds.ensure_eid_index_lancedb(&table).await?;
2165
2166            // BWD Run (sorted by dst_vid)
2167            let mut bwd_entries = entries.clone();
2168            bwd_entries.sort_by_key(|e| e.dst_vid);
2169            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
2170            let bwd_batch = bwd_ds.build_record_batch(&bwd_entries, &schema)?;
2171
2172            let bwd_table = bwd_ds.write_run_lancedb(lancedb_store, bwd_batch).await?;
2173            bwd_ds.ensure_eid_index_lancedb(&bwd_table).await?;
2174
2175            // Update Manifest
2176            let current_snap =
2177                manifest
2178                    .edges
2179                    .entry(edge_type_name.to_string())
2180                    .or_insert(EdgeSnapshot {
2181                        version: 0,
2182                        count: 0,
2183                        lance_version: 0,
2184                    });
2185            current_snap.version += 1;
2186            current_snap.count += entries.len() as u64;
2187            // LanceDB tables don't expose Lance version directly
2188            current_snap.lance_version = 0;
2189
2190            // Note: No CSR invalidation needed. AdjacencyManager's overlay
2191            // already has these edges via dual-write in insert_edge/delete_edge.
2192        }
2193
2194        // 2.5 Flush Vertices
2195        for (label_id, vertices) in vertices_by_label {
2196            let label_name = schema
2197                .label_name_by_id(label_id)
2198                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
2199
2200            let ds = self.storage.vertex_dataset(label_name)?;
2201
2202            // Collect inverted index updates before consuming vertices
2203            // Maps: cfg.property -> (added, removed)
2204            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
2205            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
2206
2207            for idx in &schema.indexes {
2208                if let IndexDefinition::Inverted(cfg) = idx
2209                    && cfg.label == label_name
2210                {
2211                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
2212                    let mut removed: HashSet<Vid> = HashSet::new();
2213
2214                    for (vid, _labels, props, deleted, _version) in &vertices {
2215                        if *deleted {
2216                            removed.insert(*vid);
2217                        } else if let Some(prop_value) = props.get(&cfg.property) {
2218                            // Extract terms from the property value (List<String>)
2219                            if let Some(arr) = prop_value.as_array() {
2220                                let terms: Vec<String> = arr
2221                                    .iter()
2222                                    .filter_map(|v| v.as_str().map(ToString::to_string))
2223                                    .collect();
2224                                if !terms.is_empty() {
2225                                    added.insert(*vid, terms);
2226                                }
2227                            }
2228                        }
2229                    }
2230
2231                    if !added.is_empty() || !removed.is_empty() {
2232                        inverted_updates.insert(cfg.property.clone(), (added, removed));
2233                    }
2234                }
2235            }
2236
2237            let mut v_data = Vec::new();
2238            let mut d_data = Vec::new();
2239            let mut ver_data = Vec::new();
2240            for (vid, labels, props, deleted, version) in vertices {
2241                v_data.push((vid, labels, props));
2242                d_data.push(deleted);
2243                ver_data.push(version);
2244            }
2245
2246            let batch = ds.build_record_batch_with_timestamps(
2247                &v_data,
2248                &d_data,
2249                &ver_data,
2250                &schema,
2251                Some(&vertex_created_at),
2252                Some(&vertex_updated_at),
2253            )?;
2254
2255            // Write using LanceDB
2256            let table = ds
2257                .write_batch_lancedb(lancedb_store, batch, &schema)
2258                .await?;
2259            ds.ensure_default_indexes_lancedb(&table).await?;
2260
2261            // Update VidLabelsIndex (if enabled)
2262            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
2263                if deleted {
2264                    self.storage.remove_from_vid_labels_index(*vid);
2265                } else {
2266                    self.storage.update_vid_labels_index(*vid, labels.clone());
2267                }
2268            }
2269
2270            // Update Manifest
2271            let current_snap =
2272                manifest
2273                    .vertices
2274                    .entry(label_name.to_string())
2275                    .or_insert(LabelSnapshot {
2276                        version: 0,
2277                        count: 0,
2278                        lance_version: 0,
2279                    });
2280            current_snap.version += 1;
2281            current_snap.count += v_data.len() as u64;
2282            // LanceDB tables don't expose Lance version directly
2283            current_snap.lance_version = 0;
2284
2285            // Invalidate table cache to ensure next read picks up new version
2286            self.storage.invalidate_table_cache(label_name);
2287
2288            // Apply inverted index updates incrementally
2289            for idx in &schema.indexes {
2290                if let IndexDefinition::Inverted(cfg) = idx
2291                    && cfg.label == label_name
2292                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
2293                {
2294                    self.storage
2295                        .index_manager()
2296                        .update_inverted_index_incremental(cfg, added, removed)
2297                        .await?;
2298                }
2299            }
2300
2301            // Update UID index with new vertex mappings
2302            // Collect (UniId, Vid) mappings from non-deleted vertices
2303            let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
2304            for (vid, _labels, props) in &v_data {
2305                let ext_id = props.get("ext_id").and_then(|v| v.as_str());
2306                let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
2307                    label_name, ext_id, props,
2308                );
2309                uid_mappings.push((uid, *vid));
2310            }
2311
2312            if !uid_mappings.is_empty()
2313                && let Ok(uid_index) = self.storage.uid_index(label_name)
2314            {
2315                // Issue #107: Check for UID collisions and FAIL instead of warning.
2316                // SHA3-256 collisions are astronomically unlikely (~2^256), but if one
2317                // occurs, we must reject the flush to prevent silent data corruption.
2318                // Changed from tracing::warn!() to anyhow::bail!().
2319                for (uid, vid) in &uid_mappings {
2320                    if let Ok(Some(existing_vid)) = uid_index.get_vid(uid).await
2321                        && existing_vid != *vid
2322                    {
2323                        anyhow::bail!(
2324                            "UID collision detected: UID {:?} maps to both VID {} and VID {}. \
2325                            This indicates either a hash collision (astronomically unlikely with SHA3-256) \
2326                            or data corruption. Cannot proceed with flush.",
2327                            uid,
2328                            existing_vid.as_u64(),
2329                            vid.as_u64()
2330                        );
2331                    }
2332                }
2333
2334                uid_index.write_mapping(&uid_mappings).await?;
2335            }
2336        }
2337
2338        // 3. Write to main unified tables (dual-write for fast ID-based lookups)
2339        // 3.1 Write to main edges table
2340        // Collect data while holding the lock, then release before async operations
2341        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
2342            let _old_l0 = old_l0_arc.read();
2343            let mut main_edges: Vec<(
2344                uni_common::core::id::Eid,
2345                Vid,
2346                Vid,
2347                String,
2348                Properties,
2349                bool,
2350                u64,
2351            )> = Vec::new();
2352            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
2353            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
2354
2355            for (&edge_type_id, entries) in entries_by_type.iter() {
2356                for entry in entries {
2357                    // Get edge type name from unified lookup (handles both schema'd and schemaless)
2358                    let edge_type_name = self
2359                        .storage
2360                        .schema_manager()
2361                        .edge_type_name_by_id_unified(edge_type_id)
2362                        .unwrap_or_else(|| "unknown".to_string());
2363
2364                    let deleted = matches!(entry.op, Op::Delete);
2365                    main_edges.push((
2366                        entry.eid,
2367                        entry.src_vid,
2368                        entry.dst_vid,
2369                        edge_type_name,
2370                        entry.properties.clone(),
2371                        deleted,
2372                        entry.version,
2373                    ));
2374
2375                    if let Some(ts) = entry.created_at {
2376                        edge_created_at_map.insert(entry.eid, ts);
2377                    }
2378                    if let Some(ts) = entry.updated_at {
2379                        edge_updated_at_map.insert(entry.eid, ts);
2380                    }
2381                }
2382            }
2383
2384            (main_edges, edge_created_at_map, edge_updated_at_map)
2385        }; // Lock released here
2386
2387        if !main_edges.is_empty() {
2388            let main_edge_batch = MainEdgeDataset::build_record_batch(
2389                &main_edges,
2390                Some(&edge_created_at_map),
2391                Some(&edge_updated_at_map),
2392            )?;
2393            let main_edge_table =
2394                MainEdgeDataset::write_batch_lancedb(lancedb_store, main_edge_batch).await?;
2395            MainEdgeDataset::ensure_default_indexes_lancedb(&main_edge_table).await?;
2396        }
2397
2398        // 3.2 Write to main vertices table
2399        // Collect data while holding the lock, then release before async operations
2400        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
2401            let old_l0 = old_l0_arc.read();
2402            let mut vertices = Vec::new();
2403
2404            // Collect all vertices from vertex_properties
2405            for (vid, props) in &old_l0.vertex_properties {
2406                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
2407                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
2408                vertices.push((*vid, labels, props.clone(), false, version));
2409            }
2410
2411            // Collect tombstones
2412            for &vid in &old_l0.vertex_tombstones {
2413                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
2414                let labels = old_l0.vertex_labels.get(&vid).cloned().unwrap_or_default();
2415                vertices.push((vid, labels, HashMap::new(), true, version));
2416            }
2417
2418            vertices
2419        }; // Lock released here
2420
2421        if !main_vertices.is_empty() {
2422            let main_vertex_batch = MainVertexDataset::build_record_batch(
2423                &main_vertices,
2424                Some(&vertex_created_at),
2425                Some(&vertex_updated_at),
2426            )?;
2427            let main_vertex_table =
2428                MainVertexDataset::write_batch_lancedb(lancedb_store, main_vertex_batch).await?;
2429            MainVertexDataset::ensure_default_indexes_lancedb(&main_vertex_table).await?;
2430        }
2431
2432        // Save Snapshot
2433        self.storage
2434            .snapshot_manager()
2435            .save_snapshot(&manifest)
2436            .await?;
2437        self.storage
2438            .snapshot_manager()
2439            .set_latest_snapshot(&manifest.snapshot_id)
2440            .await?;
2441
2442        // Complete flush: remove old L0 from pending list now that L1 writes succeeded.
2443        // This must happen BEFORE WAL truncation so min_pending_wal_lsn is accurate.
2444        self.l0_manager.complete_flush(&old_l0_arc);
2445
2446        // Truncate WAL segments, but only up to the minimum LSN of any remaining pending L0s.
2447        // This prevents data loss if earlier flushes failed and left L0s in pending_flush.
2448        if let Some(w) = wal_for_truncate {
2449            // Determine safe truncation point: the minimum of our LSN and any pending L0s
2450            let safe_lsn = self
2451                .l0_manager
2452                .min_pending_wal_lsn()
2453                .map(|min_pending| min_pending.min(wal_lsn))
2454                .unwrap_or(wal_lsn);
2455            w.truncate_before(safe_lsn).await?;
2456        }
2457
2458        // Invalidate property cache after flush to prevent stale reads.
2459        // Once L0 data moves to storage, cached values from storage may be outdated.
2460        if let Some(ref pm) = self.property_manager {
2461            pm.clear_cache().await;
2462        }
2463
2464        // Reset last flush time for time-based auto-flush
2465        self.last_flush_time = std::time::Instant::now();
2466
2467        info!(
2468            snapshot_id,
2469            mutations_count = initial_count,
2470            size_bytes = initial_size,
2471            "L0 flush to L1 completed successfully"
2472        );
2473        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
2474        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
2475        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
2476
2477        // Trigger CSR compaction if enough frozen segments have accumulated.
2478        // After flush, the old L0 data is now in L1; the overlay segments can be merged
2479        // into the Main CSR to reduce lookup overhead.
2480        let am = self.adjacency_manager.clone();
2481        if am.should_compact(4) {
2482            let previous_still_running = {
2483                let guard = self.compaction_handle.read();
2484                guard.as_ref().is_some_and(|h| !h.is_finished())
2485            };
2486
2487            if previous_still_running {
2488                info!("Skipping compaction: previous compaction still in progress");
2489            } else {
2490                let handle = tokio::spawn(async move {
2491                    am.compact();
2492                });
2493                *self.compaction_handle.write() = Some(handle);
2494            }
2495        }
2496
2497        // Post-flush: check if any indexes need rebuilding based on thresholds
2498        if let Some(ref rebuild_mgr) = self.index_rebuild_manager
2499            && self.config.index_rebuild.auto_rebuild_enabled
2500        {
2501            self.schedule_index_rebuilds_if_needed(&manifest, rebuild_mgr.clone());
2502        }
2503
2504        Ok(snapshot_id)
2505    }
2506
2507    /// Check rebuild thresholds and schedule background index rebuilds for
2508    /// labels that exceed growth or age limits. Marks affected indexes as
2509    /// `Stale` and spawns an async task to schedule the rebuild.
2510    fn schedule_index_rebuilds_if_needed(
2511        &self,
2512        manifest: &SnapshotManifest,
2513        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
2514    ) {
2515        let checker = crate::storage::index_rebuild::RebuildTriggerChecker::new(
2516            self.config.index_rebuild.clone(),
2517        );
2518        let schema = self.schema_manager.schema();
2519        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
2520
2521        if labels.is_empty() {
2522            return;
2523        }
2524
2525        // Mark affected indexes as Stale
2526        for label in &labels {
2527            for idx in &schema.indexes {
2528                if idx.label() == label {
2529                    let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
2530                        m.status = uni_common::core::schema::IndexStatus::Stale;
2531                    });
2532                }
2533            }
2534        }
2535
2536        tokio::spawn(async move {
2537            if let Err(e) = rebuild_mgr.schedule(labels).await {
2538                tracing::warn!("Failed to schedule index rebuild: {e}");
2539            }
2540        });
2541    }
2542
2543    /// Set the property manager for cache invalidation.
2544    pub fn set_property_manager(&mut self, pm: Arc<PropertyManager>) {
2545        self.property_manager = Some(pm);
2546    }
2547}
2548
2549#[cfg(test)]
2550mod tests {
2551    use super::*;
2552    use tempfile::tempdir;
2553
    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    ///
    /// Strategy: replay the WAL before and after commit, then check that the
    /// newly appended mutations contain both vertices and the edge, with all
    /// vertices logged before any edge.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fresh temp store with a single "Test" label.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction
        writer.begin_transaction()?;

        // Insert data in transaction: two vertices and an edge between them.
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()])
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(vid_a, vid_b, 1, eid, std::collections::HashMap::new(), None)
            .await?;

        // Get WAL before commit: the mutation count taken here is the
        // baseline, so anything past it after commit is transaction output.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first
        writer.commit_transaction().await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        // Walk the new mutation stream in WAL order; the flags record what
        // has been seen so far so ordering can be asserted incrementally.
        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0 (commit merged the transaction
        // buffer after logging, so both stores must agree).
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }
2686
2687    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
2688    #[tokio::test]
2689    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
2690        use crate::runtime::wal::WriteAheadLog;
2691        use crate::storage::manager::StorageManager;
2692        use object_store::local::LocalFileSystem;
2693        use object_store::path::Path as ObjectStorePath;
2694        use uni_common::core::schema::SchemaManager;
2695
2696        let dir = tempdir()?;
2697        let path = dir.path().to_str().unwrap();
2698        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2699        let schema_path = ObjectStorePath::from("schema.json");
2700
2701        let schema_manager =
2702            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2703        let _label_id = schema_manager.add_label("Test")?;
2704        let _baseline_label_id = schema_manager.add_label("Baseline")?;
2705        let _txdata_label_id = schema_manager.add_label("TxData")?;
2706        schema_manager.save().await?;
2707
2708        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2709
2710        // Create WAL for main L0
2711        let wal_path = ObjectStorePath::from("wal");
2712        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
2713
2714        let mut writer = Writer::new_with_config(
2715            storage.clone(),
2716            schema_manager.clone(),
2717            1,
2718            UniConfig::default(),
2719            Some(wal),
2720            None,
2721        )
2722        .await?;
2723
2724        // Insert baseline data (outside transaction)
2725        let baseline_vid = writer.next_vid().await?;
2726        writer
2727            .insert_vertex_with_labels(
2728                baseline_vid,
2729                [("baseline".to_string(), Value::Bool(true))]
2730                    .into_iter()
2731                    .collect(),
2732                &["Baseline".to_string()],
2733            )
2734            .await?;
2735
2736        // Begin transaction
2737        writer.begin_transaction()?;
2738
2739        // Insert data in transaction
2740        let tx_vid = writer.next_vid().await?;
2741        writer
2742            .insert_vertex_with_labels(
2743                tx_vid,
2744                [("tx_data".to_string(), Value::Bool(true))]
2745                    .into_iter()
2746                    .collect(),
2747                &["TxData".to_string()],
2748            )
2749            .await?;
2750
2751        // Capture main L0 state before rollback
2752        let l0 = writer.l0_manager.get_current();
2753        let vertex_count_before = l0.read().vertex_properties.len();
2754
2755        // Rollback transaction (simulating what would happen after WAL flush failure)
2756        writer.rollback_transaction()?;
2757
2758        // Verify main L0 is unchanged
2759        let vertex_count_after = l0.read().vertex_properties.len();
2760        assert_eq!(
2761            vertex_count_before, vertex_count_after,
2762            "Main L0 should not change after rollback"
2763        );
2764
2765        // Baseline should still be present
2766        assert!(
2767            l0.read().vertex_properties.contains_key(&baseline_vid),
2768            "Baseline data should remain"
2769        );
2770
2771        // Transaction data should NOT be in main L0
2772        assert!(
2773            !l0.read().vertex_properties.contains_key(&tx_vid),
2774            "Transaction data should not be in main L0 after rollback"
2775        );
2776
2777        Ok(())
2778    }
2779
2780    /// Test that batch insert with shared labels does not clone labels per vertex.
2781    /// This verifies fix for issue #161 (redundant label cloning).
2782    #[tokio::test]
2783    async fn test_batch_insert_shared_labels() -> Result<()> {
2784        use crate::storage::manager::StorageManager;
2785        use object_store::local::LocalFileSystem;
2786        use object_store::path::Path as ObjectStorePath;
2787        use uni_common::core::schema::SchemaManager;
2788
2789        let dir = tempdir()?;
2790        let path = dir.path().to_str().unwrap();
2791        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2792        let schema_path = ObjectStorePath::from("schema.json");
2793
2794        let schema_manager =
2795            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2796        let _label_id = schema_manager.add_label("Person")?;
2797        schema_manager.save().await?;
2798
2799        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2800
2801        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
2802
2803        // Shared labels - should not be cloned per vertex
2804        let labels = &["Person".to_string()];
2805
2806        // Insert batch of vertices with same labels
2807        let mut vids = Vec::new();
2808        for i in 0..100 {
2809            let vid = writer.next_vid().await?;
2810            let mut props = std::collections::HashMap::new();
2811            props.insert("id".to_string(), Value::Int(i));
2812            writer.insert_vertex_with_labels(vid, props, labels).await?;
2813            vids.push(vid);
2814        }
2815
2816        // Verify all vertices have the correct labels
2817        let l0 = writer.l0_manager.get_current();
2818        for vid in vids {
2819            let l0_guard = l0.read();
2820            let vertex_labels = l0_guard.vertex_labels.get(&vid);
2821            assert!(vertex_labels.is_some(), "Vertex should have labels");
2822            assert_eq!(
2823                vertex_labels.unwrap(),
2824                &vec!["Person".to_string()],
2825                "Labels should match"
2826            );
2827        }
2828
2829        Ok(())
2830    }
2831
2832    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
2833    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
2834    #[tokio::test]
2835    async fn test_estimated_size_tracks_mutations() -> Result<()> {
2836        use crate::storage::manager::StorageManager;
2837        use object_store::local::LocalFileSystem;
2838        use object_store::path::Path as ObjectStorePath;
2839        use uni_common::core::schema::SchemaManager;
2840
2841        let dir = tempdir()?;
2842        let path = dir.path().to_str().unwrap();
2843        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2844        let schema_path = ObjectStorePath::from("schema.json");
2845
2846        let schema_manager =
2847            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2848        let _label_id = schema_manager.add_label("Test")?;
2849        schema_manager.save().await?;
2850
2851        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2852
2853        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
2854
2855        let l0 = writer.l0_manager.get_current();
2856
2857        // Initial state should be empty
2858        let initial_estimated = l0.read().estimated_size;
2859        let initial_actual = l0.read().size_bytes();
2860        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
2861        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
2862
2863        // Insert vertices with properties
2864        let mut vids = Vec::new();
2865        for i in 0..10 {
2866            let vid = writer.next_vid().await?;
2867            let mut props = std::collections::HashMap::new();
2868            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
2869            props.insert("index".to_string(), Value::Int(i));
2870            writer.insert_vertex_with_labels(vid, props, &[]).await?;
2871            vids.push(vid);
2872        }
2873
2874        // Verify estimated_size grew
2875        let after_vertices_estimated = l0.read().estimated_size;
2876        let after_vertices_actual = l0.read().size_bytes();
2877        assert!(
2878            after_vertices_estimated > 0,
2879            "estimated_size should grow after insertions"
2880        );
2881
2882        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
2883        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
2884        assert!(
2885            (0.5..=2.0).contains(&ratio),
2886            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
2887            after_vertices_estimated,
2888            after_vertices_actual,
2889            ratio
2890        );
2891
2892        // Insert edges with a simple edge type
2893        let edge_type = 1u32;
2894        for i in 0..9 {
2895            let eid = writer.next_eid(edge_type).await?;
2896            writer
2897                .insert_edge(
2898                    vids[i],
2899                    vids[i + 1],
2900                    edge_type,
2901                    eid,
2902                    std::collections::HashMap::new(),
2903                    Some("NEXT".to_string()),
2904                )
2905                .await?;
2906        }
2907
2908        // Verify estimated_size grew further
2909        let after_edges_estimated = l0.read().estimated_size;
2910        let after_edges_actual = l0.read().size_bytes();
2911        assert!(
2912            after_edges_estimated > after_vertices_estimated,
2913            "estimated_size should grow after edge insertions"
2914        );
2915
2916        // Verify still within reasonable bounds
2917        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
2918        assert!(
2919            (0.5..=2.0).contains(&ratio),
2920            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
2921            after_edges_estimated,
2922            after_edges_actual,
2923            ratio
2924        );
2925
2926        Ok(())
2927    }
2928}