// uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::id_allocator::IdAllocator;
6use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
7use crate::runtime::l0_manager::L0Manager;
8use crate::runtime::property_manager::PropertyManager;
9use crate::runtime::wal::WriteAheadLog;
10use crate::storage::adjacency_manager::AdjacencyManager;
11use crate::storage::delta::{L1Entry, Op};
12use crate::storage::main_edge::MainEdgeDataset;
13use crate::storage::main_vertex::MainVertexDataset;
14use crate::storage::manager::StorageManager;
15use anyhow::{Result, anyhow};
16use chrono::Utc;
17use metrics;
18use parking_lot::RwLock;
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use tracing::{debug, info, instrument};
22use uni_common::Properties;
23use uni_common::Value;
24use uni_common::config::UniConfig;
25use uni_common::core::id::{Eid, Vid};
26use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
27use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
28use uni_xervo::runtime::ModelRuntime;
29use uuid::Uuid;
30
/// Tuning options for [`Writer`] behaviour.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Maximum number of mutations to accumulate before action is taken
    /// (e.g. a flush) — NOTE(review): exact consumer not visible here; confirm.
    pub max_mutations: usize,
}

impl Default for WriterConfig {
    /// Defaults to a 10 000-mutation ceiling.
    fn default() -> Self {
        WriterConfig {
            max_mutations: 10_000,
        }
    }
}
43
/// Coordinates graph mutations: buffers them in L0, logs them to the WAL,
/// and replays edge changes into the adjacency overlay on commit.
pub struct Writer {
    /// Manages the current in-memory L0 mutation buffer (and its WAL handle).
    pub l0_manager: Arc<L0Manager>,
    /// Underlying dataset/object storage.
    pub storage: Arc<StorageManager>,
    /// Schema used for constraint validation (NOT NULL, Unique, Exists, Check).
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocates monotonically increasing VIDs/EIDs, persisted via the object store.
    pub allocator: Arc<IdAllocator>,
    /// Store-wide configuration captured at construction.
    pub config: UniConfig,
    /// Optional model runtime — set via [`Writer::set_xervo_runtime`].
    pub xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation
    last_flush_time: std::time::Instant,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling
    index_rebuild_manager: Option<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
}
62
63impl Writer {
64    pub async fn new(
65        storage: Arc<StorageManager>,
66        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
67        start_version: u64,
68    ) -> Result<Self> {
69        Self::new_with_config(
70            storage,
71            schema_manager,
72            start_version,
73            UniConfig::default(),
74            None,
75            None,
76        )
77        .await
78    }
79
80    pub async fn new_with_config(
81        storage: Arc<StorageManager>,
82        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
83        start_version: u64,
84        config: UniConfig,
85        wal: Option<Arc<WriteAheadLog>>,
86        allocator: Option<Arc<IdAllocator>>,
87    ) -> Result<Self> {
88        let allocator = if let Some(a) = allocator {
89            a
90        } else {
91            let store = storage.store();
92            let path = object_store::path::Path::from("id_allocator.json");
93            Arc::new(IdAllocator::new(store, path, 1000).await?)
94        };
95
96        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
97
98        let property_manager = Some(Arc::new(PropertyManager::new(
99            storage.clone(),
100            schema_manager.clone(),
101            1000,
102        )));
103
104        let adjacency_manager = storage.adjacency_manager();
105
106        Ok(Self {
107            l0_manager,
108            storage,
109            schema_manager,
110            allocator,
111            config,
112            xervo_runtime: None,
113            property_manager,
114            adjacency_manager,
115            last_flush_time: std::time::Instant::now(),
116            compaction_handle: Arc::new(RwLock::new(None)),
117            index_rebuild_manager: None,
118        })
119    }
120
121    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
122    pub fn set_index_rebuild_manager(
123        &mut self,
124        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
125    ) {
126        self.index_rebuild_manager = Some(manager);
127    }
128
129    /// Replay WAL mutations into the current L0 buffer.
130    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
131        let l0 = self.l0_manager.get_current();
132        let wal = l0.read().wal.clone();
133
134        if let Some(wal) = wal {
135            wal.initialize().await?;
136            let mutations = wal.replay_since(wal_high_water_mark).await?;
137            let count = mutations.len();
138
139            if count > 0 {
140                log::info!(
141                    "Replaying {} mutations from WAL (LSN > {})",
142                    count,
143                    wal_high_water_mark
144                );
145                let mut l0_guard = l0.write();
146                l0_guard.replay_mutations(mutations)?;
147            }
148
149            Ok(count)
150        } else {
151            Ok(0)
152        }
153    }
154
155    /// Allocates the next VID (pure auto-increment).
156    pub async fn next_vid(&self) -> Result<Vid> {
157        self.allocator.allocate_vid().await
158    }
159
160    /// Allocates multiple VIDs at once for bulk operations.
161    /// This is more efficient than calling next_vid() in a loop.
162    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
163        self.allocator.allocate_vids(count).await
164    }
165
166    /// Allocates the next EID (pure auto-increment).
167    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
168        self.allocator.allocate_eid().await
169    }
170
171    pub fn set_xervo_runtime(&mut self, runtime: Arc<ModelRuntime>) {
172        self.xervo_runtime = Some(runtime);
173    }
174
175    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
176        self.xervo_runtime.clone()
177    }
178
179    /// Create a new empty L0 buffer for transaction-scoped mutations.
180    ///
181    /// Only reads the current version — no exclusive lock required on Writer.
182    /// The returned buffer has no WAL reference; mutations are logged at
183    /// commit time via [`Self::commit_transaction_l0`].
184    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
185        let current_version = self.l0_manager.get_current().read().current_version;
186        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
187        Arc::new(RwLock::new(L0Buffer::new(current_version, None)))
188    }
189
190    /// Resolve the target L0 buffer for a mutation.
191    ///
192    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
193    /// When `None`, it targets the global L0 from the manager.
194    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
195        tx_l0
196            .cloned()
197            .unwrap_or_else(|| self.l0_manager.get_current())
198    }
199
200    fn update_metrics(&self) {
201        let l0 = self.l0_manager.get_current();
202        let size = l0.read().estimated_size;
203        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
204    }
205
    /// Commit an externally-owned transaction L0 buffer.
    ///
    /// Commit protocol, in this exact order:
    /// 1. Append every transaction mutation to the WAL (vertices before edges).
    /// 2. Flush the WAL — this is the durability/commit point.
    /// 3. Merge the transaction buffer into the main L0 (visibility) and
    ///    replay edge changes into the [`AdjacencyManager`] overlay.
    /// 4. Best-effort post-commit flush check; its failure is logged, not fatal.
    ///
    /// Returns the WAL LSN of the commit (0 when no WAL is configured).
    pub async fn commit_transaction_l0(&mut self, tx_l0_arc: Arc<RwLock<L0Buffer>>) -> Result<u64> {
        // 1. Write transaction mutations to WAL BEFORE merging into main L0
        // This ensures durability before visibility.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            // The WAL handle lives on the main L0 buffer (tx buffers carry None).
            let main_l0 = main_l0_arc.read();

            // If WAL exists, write mutations to it for durability
            if let Some(wal) = main_l0.wal.as_ref() {
                // Order: vertices first, then edges (to ensure src/dst exist on replay)

                // Vertex insertions — vertices also tombstoned in this same
                // transaction are skipped here (deleted below instead).
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Edge insertions and deletions from edge_endpoints.
                // An edge present in both edge_endpoints and tombstones is
                // logged as a delete; otherwise as an insert.
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        // Missing edge version falls back to 0 in the WAL record.
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }

                // Tombstones for edges that only exist in the global L0 (not in
                // this transaction's edge_endpoints).  Without this, deletes of
                // pre-existing edges would be silently lost.
                for (eid, tombstone) in &tx_l0.tombstones {
                    if !tx_l0.edge_endpoints.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: tombstone.src_vid,
                            dst_vid: tombstone.dst_vid,
                            edge_type: tombstone.edge_type,
                            version,
                        })?;
                    }
                }
            }
        }

        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
        let wal_lsn = self.flush_wal().await?;

        // 3. Merge into main L0 and make visible
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            // Replay transaction edges into the AdjacencyManager overlay
            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                // Unlike the WAL records above (fallback 0), the overlay falls
                // back to the main L0's current version when the transaction
                // recorded no explicit edge version.
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }

            // Replay tombstones for edges that only exist in the global L0
            // (not in this transaction's edge_endpoints).
            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let edge_version = tx_l0
                        .edge_versions
                        .get(eid)
                        .copied()
                        .unwrap_or(main_l0.current_version);
                    self.adjacency_manager.add_tombstone(
                        *eid,
                        tombstone.src_vid,
                        tombstone.dst_vid,
                        tombstone.edge_type,
                        edge_version,
                    );
                }
            }
        }

        self.update_metrics();

        // 4. Best-effort compaction
        if let Err(e) = self.check_flush().await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }

        Ok(wal_lsn)
    }
342
343    /// Flush the WAL buffer to durable storage.
344    ///
345    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
346    pub async fn flush_wal(&self) -> Result<u64> {
347        let l0 = self.l0_manager.get_current();
348        let wal = l0.read().wal.clone();
349
350        match wal {
351            Some(wal) => Ok(wal.flush().await?),
352            None => Ok(0),
353        }
354    }
355
356    /// Record property removals in the active L0 mutation stats.
357    ///
358    /// Routes to the transaction L0 if provided, otherwise to the main L0.
359    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
360        if count == 0 {
361            return;
362        }
363        let l0 = self.resolve_l0(tx_l0);
364        l0.write().mutation_stats.properties_removed += count;
365    }
366
367    /// Validates vertex constraints for the given properties.
368    /// In the new design, label is passed as a parameter since VID no longer embeds label.
369    async fn validate_vertex_constraints_for_label(
370        &self,
371        vid: Vid,
372        properties: &Properties,
373        label: &str,
374        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
375    ) -> Result<()> {
376        let schema = self.schema_manager.schema();
377
378        {
379            // 1. Check NOT NULL constraints (from Property definitions)
380            if let Some(props_meta) = schema.properties.get(label) {
381                for (prop_name, meta) in props_meta {
382                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
383                        log::warn!(
384                            "Constraint violation: Property '{}' cannot be null for label '{}'",
385                            prop_name,
386                            label
387                        );
388                        return Err(anyhow!(
389                            "Constraint violation: Property '{}' cannot be null",
390                            prop_name
391                        ));
392                    }
393                }
394            }
395
396            // 2. Check Explicit Constraints (Unique, Check, etc.)
397            for constraint in &schema.constraints {
398                if !constraint.enabled {
399                    continue;
400                }
401                match &constraint.target {
402                    ConstraintTarget::Label(l) if l == label => {}
403                    _ => continue,
404                }
405
406                match &constraint.constraint_type {
407                    ConstraintType::Unique {
408                        properties: unique_props,
409                    } => {
410                        // Support single and multi-property unique constraints
411                        if !unique_props.is_empty() {
412                            let mut key_values = Vec::new();
413                            let mut missing = false;
414                            for prop in unique_props {
415                                if let Some(val) = properties.get(prop) {
416                                    key_values.push((prop.clone(), val.clone()));
417                                } else {
418                                    missing = true; // Can't enforce if property missing (partial update?)
419                                    // For INSERT, missing means null?
420                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
421                                    // For now, only check if ALL keys are present
422                                }
423                            }
424
425                            if !missing {
426                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
427                                    .await?;
428                            }
429                        }
430                    }
431                    ConstraintType::Exists { property } => {
432                        if properties.get(property).is_none_or(|v| v.is_null()) {
433                            log::warn!(
434                                "Constraint violation: Property '{}' must exist for label '{}'",
435                                property,
436                                label
437                            );
438                            return Err(anyhow!(
439                                "Constraint violation: Property '{}' must exist",
440                                property
441                            ));
442                        }
443                    }
444                    ConstraintType::Check { expression } => {
445                        if !self.evaluate_check_constraint(expression, properties)? {
446                            return Err(anyhow!(
447                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
448                                constraint.name,
449                                expression
450                            ));
451                        }
452                    }
453                    _ => {
454                        return Err(anyhow!("Unsupported constraint type"));
455                    }
456                }
457            }
458        }
459        Ok(())
460    }
461
462    /// Validates vertex constraints for a vertex with the given labels.
463    /// Labels must be passed explicitly since the vertex may not yet be in L0.
464    /// Unknown labels (not in schema) are skipped.
465    async fn validate_vertex_constraints(
466        &self,
467        vid: Vid,
468        properties: &Properties,
469        labels: &[String],
470        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
471    ) -> Result<()> {
472        let schema = self.schema_manager.schema();
473
474        // Validate constraints only for known labels
475        for label in labels {
476            // Skip unknown labels (schemaless support)
477            if schema.get_label_case_insensitive(label).is_none() {
478                continue;
479            }
480            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
481                .await?;
482        }
483
484        // Check global ext_id uniqueness if ext_id is provided
485        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
486            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
487        }
488
489        Ok(())
490    }
491
492    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
493    ///
494    /// Used to build a constraint key index from L0 buffers for batch validation.
495    fn collect_constraint_keys_from_properties<'a>(
496        properties_iter: impl Iterator<Item = &'a Properties>,
497        label: &str,
498        constraints: &[uni_common::core::schema::Constraint],
499        existing_keys: &mut HashMap<String, HashSet<String>>,
500        existing_extids: &mut HashSet<String>,
501    ) {
502        for props in properties_iter {
503            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
504                existing_extids.insert(ext_id.to_string());
505            }
506
507            for constraint in constraints {
508                if !constraint.enabled {
509                    continue;
510                }
511                if let ConstraintTarget::Label(l) = &constraint.target {
512                    if l != label {
513                        continue;
514                    }
515                } else {
516                    continue;
517                }
518
519                if let ConstraintType::Unique {
520                    properties: unique_props,
521                } = &constraint.constraint_type
522                {
523                    let mut key_parts = Vec::new();
524                    let mut all_present = true;
525                    for prop in unique_props {
526                        if let Some(val) = props.get(prop) {
527                            key_parts.push(format!("{}:{}", prop, val));
528                        } else {
529                            all_present = false;
530                            break;
531                        }
532                    }
533                    if all_present {
534                        let key = key_parts.join("|");
535                        existing_keys
536                            .entry(constraint.name.clone())
537                            .or_default()
538                            .insert(key);
539                    }
540                }
541            }
542        }
543    }
544
545    /// Validates constraints for a batch of vertices efficiently.
546    ///
547    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
548    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
549    ///
550    /// # Arguments
551    /// * `vids` - VIDs of vertices being inserted
552    /// * `properties_batch` - Properties for each vertex
553    /// * `label` - Label for all vertices (assumes single label for now)
554    ///
555    /// # Performance
556    /// For N vertices with unique constraints:
557    /// - Old approach: O(N²) - scan L0 buffer N times
558    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
559    async fn validate_vertex_batch_constraints(
560        &self,
561        vids: &[Vid],
562        properties_batch: &[Properties],
563        label: &str,
564        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
565    ) -> Result<()> {
566        if vids.len() != properties_batch.len() {
567            return Err(anyhow!("VID/properties length mismatch"));
568        }
569
570        let schema = self.schema_manager.schema();
571
572        // 1. Validate NOT NULL constraints for each vertex
573        if let Some(props_meta) = schema.properties.get(label) {
574            for (idx, properties) in properties_batch.iter().enumerate() {
575                for (prop_name, meta) in props_meta {
576                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
577                        return Err(anyhow!(
578                            "Constraint violation at index {}: Property '{}' cannot be null",
579                            idx,
580                            prop_name
581                        ));
582                    }
583                }
584            }
585        }
586
587        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
588        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
589        let mut existing_extids: HashSet<String> = HashSet::new();
590
591        // Scan current L0 buffer
592        {
593            let l0 = self.l0_manager.get_current();
594            let l0_guard = l0.read();
595            Self::collect_constraint_keys_from_properties(
596                l0_guard.vertex_properties.values(),
597                label,
598                &schema.constraints,
599                &mut existing_keys,
600                &mut existing_extids,
601            );
602        }
603
604        // Scan transaction L0 if present
605        if let Some(tx_l0) = tx_l0 {
606            let tx_l0_guard = tx_l0.read();
607            Self::collect_constraint_keys_from_properties(
608                tx_l0_guard.vertex_properties.values(),
609                label,
610                &schema.constraints,
611                &mut existing_keys,
612                &mut existing_extids,
613            );
614        }
615
616        // 3. Check batch vertices against index AND check for duplicates within batch
617        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
618        let mut batch_extids: HashMap<String, usize> = HashMap::new();
619
620        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
621            // Check ext_id uniqueness
622            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
623                if existing_extids.contains(ext_id) {
624                    return Err(anyhow!(
625                        "Constraint violation at index {}: ext_id '{}' already exists",
626                        idx,
627                        ext_id
628                    ));
629                }
630                if let Some(first_idx) = batch_extids.get(ext_id) {
631                    return Err(anyhow!(
632                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
633                        ext_id,
634                        first_idx,
635                        idx
636                    ));
637                }
638                batch_extids.insert(ext_id.to_string(), idx);
639            }
640
641            // Check unique constraints
642            for constraint in &schema.constraints {
643                if !constraint.enabled {
644                    continue;
645                }
646                if let ConstraintTarget::Label(l) = &constraint.target {
647                    if l != label {
648                        continue;
649                    }
650                } else {
651                    continue;
652                }
653
654                match &constraint.constraint_type {
655                    ConstraintType::Unique {
656                        properties: unique_props,
657                    } => {
658                        let mut key_parts = Vec::new();
659                        let mut all_present = true;
660                        for prop in unique_props {
661                            if let Some(val) = properties.get(prop) {
662                                key_parts.push(format!("{}:{}", prop, val));
663                            } else {
664                                all_present = false;
665                                break;
666                            }
667                        }
668
669                        if all_present {
670                            let key = key_parts.join("|");
671
672                            // Check against existing L0 keys
673                            if let Some(keys) = existing_keys.get(&constraint.name)
674                                && keys.contains(&key)
675                            {
676                                return Err(anyhow!(
677                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
678                                    idx,
679                                    label,
680                                    constraint.name
681                                ));
682                            }
683
684                            // Check for duplicates within batch
685                            let batch_constraint_keys =
686                                batch_keys.entry(constraint.name.clone()).or_default();
687                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
688                                return Err(anyhow!(
689                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
690                                    key,
691                                    first_idx,
692                                    idx
693                                ));
694                            }
695                            batch_constraint_keys.insert(key, idx);
696                        }
697                    }
698                    ConstraintType::Exists { property } => {
699                        if properties.get(property).is_none_or(|v| v.is_null()) {
700                            return Err(anyhow!(
701                                "Constraint violation at index {}: Property '{}' must exist",
702                                idx,
703                                property
704                            ));
705                        }
706                    }
707                    ConstraintType::Check { expression } => {
708                        if !self.evaluate_check_constraint(expression, properties)? {
709                            return Err(anyhow!(
710                                "Constraint violation at index {}: CHECK constraint '{}' violated",
711                                idx,
712                                constraint.name
713                            ));
714                        }
715                    }
716                    _ => {}
717                }
718            }
719        }
720
721        // 4. Check storage for unique constraints (can batch this into a single query)
722        for constraint in &schema.constraints {
723            if !constraint.enabled {
724                continue;
725            }
726            if let ConstraintTarget::Label(l) = &constraint.target {
727                if l != label {
728                    continue;
729                }
730            } else {
731                continue;
732            }
733
734            if let ConstraintType::Unique {
735                properties: unique_props,
736            } = &constraint.constraint_type
737            {
738                // Build compound OR filter for all batch vertices
739                let mut or_filters = Vec::new();
740                for properties in properties_batch.iter() {
741                    let mut and_parts = Vec::new();
742                    let mut all_present = true;
743                    for prop in unique_props {
744                        if let Some(val) = properties.get(prop) {
745                            let val_str = match val {
746                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
747                                Value::Int(n) => n.to_string(),
748                                Value::Float(f) => f.to_string(),
749                                Value::Bool(b) => b.to_string(),
750                                _ => {
751                                    all_present = false;
752                                    break;
753                                }
754                            };
755                            and_parts.push(format!("{} = {}", prop, val_str));
756                        } else {
757                            all_present = false;
758                            break;
759                        }
760                    }
761                    if all_present {
762                        or_filters.push(format!("({})", and_parts.join(" AND ")));
763                    }
764                }
765
766                #[cfg(feature = "lance-backend")]
767                if !or_filters.is_empty() {
768                    let vid_list: Vec<String> =
769                        vids.iter().map(|v| v.as_u64().to_string()).collect();
770                    let filter = format!(
771                        "({}) AND _deleted = false AND _vid NOT IN ({})",
772                        or_filters.join(" OR "),
773                        vid_list.join(", ")
774                    );
775
776                    if let Ok(ds) = self.storage.vertex_dataset(label)
777                        && let Ok(lance_ds) = ds.open_raw().await
778                    {
779                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
780                        if count > 0 {
781                            return Err(anyhow!(
782                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
783                                label,
784                                constraint.name
785                            ));
786                        }
787                    }
788                }
789            }
790        }
791
792        Ok(())
793    }
794
795    /// Checks that ext_id is globally unique across all vertices.
796    ///
797    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
798    /// to ensure no other vertex uses this ext_id.
799    ///
800    /// # Errors
801    ///
802    /// Returns error if another vertex with the same ext_id exists.
803    async fn check_extid_globally_unique(
804        &self,
805        ext_id: &str,
806        current_vid: Vid,
807        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
808    ) -> Result<()> {
809        // Check L0 buffers: current, transaction, and pending flush
810        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
811            let mut buffers = vec![self.l0_manager.get_current()];
812            if let Some(tx_l0) = tx_l0 {
813                buffers.push(tx_l0.clone());
814            }
815            buffers.extend(self.l0_manager.get_pending_flush());
816            buffers
817        };
818
819        for l0 in &l0_buffers_to_check {
820            if let Some(vid) =
821                Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
822            {
823                return Err(anyhow!(
824                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
825                    ext_id,
826                    vid
827                ));
828            }
829        }
830
831        // Check main vertices table (if it exists)
832        // Pass None for global uniqueness check (not snapshot-isolated)
833        let backend = self.storage.backend();
834        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
835            && found_vid != current_vid
836        {
837            return Err(anyhow!(
838                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
839                ext_id,
840                found_vid
841            ));
842        }
843
844        Ok(())
845    }
846
847    /// Search vertex properties for a duplicate ext_id, excluding `current_vid`.
848    fn find_extid_in_properties(
849        vertex_properties: &HashMap<Vid, Properties>,
850        ext_id: &str,
851        current_vid: Vid,
852    ) -> Option<Vid> {
853        vertex_properties.iter().find_map(|(&vid, props)| {
854            if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
855                Some(vid)
856            } else {
857                None
858            }
859        })
860    }
861
862    /// Helper to get vertex labels from L0 buffer.
863    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
864        let l0 = self.l0_manager.get_current();
865        let l0_guard = l0.read();
866        // Check if vertex is tombstoned (deleted) - if so, return None
867        if l0_guard.vertex_tombstones.contains(&vid) {
868            return None;
869        }
870        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
871    }
872
873    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
874    /// This is the proper way to read vertex labels after a flush, as it checks both
875    /// in-memory buffers and persisted storage.
876    pub async fn get_vertex_labels(
877        &self,
878        vid: Vid,
879        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
880    ) -> Option<Vec<String>> {
881        // 1. Check current L0
882        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
883            return Some(labels);
884        }
885
886        // 2. Check transaction L0 if present
887        if let Some(tx_l0) = tx_l0 {
888            let guard = tx_l0.read();
889            if guard.vertex_tombstones.contains(&vid) {
890                return None;
891            }
892            if let Some(labels) = guard.get_vertex_labels(vid) {
893                return Some(labels.to_vec());
894            }
895        }
896
897        // 3. Check pending flush L0s
898        for pending_l0 in self.l0_manager.get_pending_flush() {
899            let guard = pending_l0.read();
900            if guard.vertex_tombstones.contains(&vid) {
901                return None;
902            }
903            if let Some(labels) = guard.get_vertex_labels(vid) {
904                return Some(labels.to_vec());
905            }
906        }
907
908        // 4. Check storage
909        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
910    }
911
912    /// Helper to get edge type from L0 buffer.
913    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
914        let l0 = self.l0_manager.get_current();
915        let l0_guard = l0.read();
916        l0_guard.get_edge_type(eid).map(|s| s.to_string())
917    }
918
919    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
920    /// Falls back to the transaction L0 if available.
921    pub fn get_edge_type_id_from_l0(
922        &self,
923        eid: Eid,
924        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
925    ) -> Option<u32> {
926        // Check transaction L0 first
927        if let Some(tx_l0) = tx_l0 {
928            let guard = tx_l0.read();
929            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
930                return Some(etype);
931            }
932        }
933        // Fall back to main L0
934        let l0 = self.l0_manager.get_current();
935        let l0_guard = l0.read();
936        l0_guard
937            .get_edge_endpoint_full(eid)
938            .map(|(_, _, etype)| etype)
939    }
940
941    /// Set the type name for an edge (used for schemaless edge types).
942    /// This is called during CREATE for edge types not found in the schema.
943    pub fn set_edge_type(
944        &self,
945        eid: Eid,
946        type_name: String,
947        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
948    ) {
949        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
950    }
951
952    /// Evaluate a simple CHECK constraint expression.
953    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
954    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
955        let parts: Vec<&str> = expression.split_whitespace().collect();
956        if parts.len() != 3 {
957            // For now, only support "prop op val"
958            // Fallback to true if too complex to avoid breaking, but warn
959            log::warn!(
960                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
961                expression
962            );
963            return Ok(true);
964        }
965
966        let prop_part = parts[0].trim_start_matches('(');
967        // Handle "variable.property" format - take the part after the dot
968        let prop_name = if let Some(idx) = prop_part.find('.') {
969            &prop_part[idx + 1..]
970        } else {
971            prop_part
972        };
973
974        let op = parts[1];
975        let val_str = parts[2].trim_end_matches(')');
976
977        let prop_val = match properties.get(prop_name) {
978            Some(v) => v,
979            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
980        };
981
982        // Parse value string (handle quotes for strings)
983        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
984            || (val_str.starts_with('"') && val_str.ends_with('"'))
985        {
986            Value::String(val_str[1..val_str.len() - 1].to_string())
987        } else if let Ok(n) = val_str.parse::<i64>() {
988            Value::Int(n)
989        } else if let Ok(n) = val_str.parse::<f64>() {
990            Value::Float(n)
991        } else if let Ok(b) = val_str.parse::<bool>() {
992            Value::Bool(b)
993        } else {
994            // Check for internal format wrappers if they somehow leaked through
995            if val_str.starts_with("Number(") && val_str.ends_with(')') {
996                let n_str = &val_str[7..val_str.len() - 1];
997                if let Ok(n) = n_str.parse::<i64>() {
998                    Value::Int(n)
999                } else if let Ok(n) = n_str.parse::<f64>() {
1000                    Value::Float(n)
1001                } else {
1002                    Value::String(val_str.to_string())
1003                }
1004            } else {
1005                Value::String(val_str.to_string())
1006            }
1007        };
1008
1009        match op {
1010            "=" | "==" => Ok(prop_val == &target_val),
1011            "!=" | "<>" => Ok(prop_val != &target_val),
1012            ">" => self
1013                .compare_values(prop_val, &target_val)
1014                .map(|o| o.is_gt()),
1015            "<" => self
1016                .compare_values(prop_val, &target_val)
1017                .map(|o| o.is_lt()),
1018            ">=" => self
1019                .compare_values(prop_val, &target_val)
1020                .map(|o| o.is_ge()),
1021            "<=" => self
1022                .compare_values(prop_val, &target_val)
1023                .map(|o| o.is_le()),
1024            _ => {
1025                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1026                Ok(true)
1027            }
1028        }
1029    }
1030
1031    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1032        use std::cmp::Ordering;
1033
1034        fn cmp_f64(x: f64, y: f64) -> Ordering {
1035            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1036        }
1037
1038        match (a, b) {
1039            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1040            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1041            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1042            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1043            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1044            _ => Err(anyhow!(
1045                "Cannot compare incompatible types: {:?} vs {:?}",
1046                a,
1047                b
1048            )),
1049        }
1050    }
1051
    /// Check a composite unique constraint for `label` against every layer:
    /// the current L0, the transaction L0 (if any), and persisted storage.
    ///
    /// `key_values` holds the (property, value) pairs that make up the unique
    /// key; `current_vid` is excluded from the search so a vertex never
    /// conflicts with itself on upsert.
    ///
    /// # Errors
    ///
    /// Returns an error when a duplicate key is found at any layer, or when
    /// the storage row-count query fails.
    async fn check_unique_constraint_multi(
        &self,
        label: &str,
        key_values: &[(String, Value)],
        current_vid: Vid,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        // Serialize constraint key once for O(1) lookups
        let key = serialize_constraint_key(label, key_values);

        // 1. Check L0 (in-memory) using O(1) constraint index
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            if l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}'",
                    label
                ));
            }
        }

        // Check Transaction L0
        if let Some(tx_l0) = tx_l0 {
            let tx_l0_guard = tx_l0.read();
            if tx_l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
                    label
                ));
            }
        }

        // 2. Check Storage (L1/L2) via a SQL-style filter over the label dataset
        let filters: Vec<String> = key_values
            .iter()
            .map(|(prop, val)| {
                let val_str = match val {
                    // Embedded single quotes are doubled for the SQL literal
                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                    Value::Int(n) => n.to_string(),
                    Value::Float(f) => f.to_string(),
                    Value::Bool(b) => b.to_string(),
                    // NOTE(review): non-scalar values render as `prop = NULL`,
                    // which matches no rows under SQL semantics — confirm this
                    // is the intended behavior for such keys.
                    _ => "NULL".to_string(),
                };
                format!("{} = {}", prop, val_str)
            })
            .collect();

        // Exclude deleted rows and the vertex being written itself
        let mut filter = filters.join(" AND ");
        filter.push_str(&format!(
            " AND _deleted = false AND _vid != {}",
            current_vid.as_u64()
        ));

        // `filter` is only consumed when the lance-backend feature is enabled
        #[cfg(feature = "lance-backend")]
        if let Ok(ds) = self.storage.vertex_dataset(label)
            && let Ok(lance_ds) = ds.open_raw().await
        {
            let count = lance_ds.count_rows(Some(filter.clone())).await?;
            if count > 0 {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
                    label,
                    filter
                ));
            }
        }

        Ok(())
    }
1122
1123    async fn check_write_pressure(&self) -> Result<()> {
1124        let status = self
1125            .storage
1126            .compaction_status()
1127            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
1128        let l1_runs = status.l1_runs;
1129        let throttle = &self.config.throttle;
1130
1131        if l1_runs >= throttle.hard_limit {
1132            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
1133            // Simple polling for now
1134            while self
1135                .storage
1136                .compaction_status()
1137                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
1138                .l1_runs
1139                >= throttle.hard_limit
1140            {
1141                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1142            }
1143        } else if l1_runs >= throttle.soft_limit {
1144            let excess = l1_runs - throttle.soft_limit;
1145            // Cap multiplier to avoid overflow
1146            let excess = std::cmp::min(excess, 31);
1147            let multiplier = 2_u32.pow(excess as u32);
1148            let delay = throttle.base_delay * multiplier;
1149            tokio::time::sleep(delay).await;
1150        }
1151        Ok(())
1152    }
1153
1154    /// Check transaction memory limit to prevent OOM.
1155    /// No-op when no transaction is active.
1156    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
1157        if let Some(tx_l0) = tx_l0 {
1158            let size = tx_l0.read().estimated_size;
1159            if size > self.config.max_transaction_memory {
1160                return Err(anyhow!(
1161                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1162                     Roll back or commit the current transaction.",
1163                    size,
1164                    self.config.max_transaction_memory
1165                ));
1166            }
1167        }
1168        Ok(())
1169    }
1170
1171    async fn get_query_context(
1172        &self,
1173        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1174    ) -> Option<QueryContext> {
1175        Some(QueryContext::new_with_pending(
1176            self.l0_manager.get_current(),
1177            tx_l0.cloned(),
1178            self.l0_manager.get_pending_flush(),
1179        ))
1180    }
1181
    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
    ///
    /// When `label` is provided, uses it directly to look up property metadata.
    /// Otherwise falls back to discovering the label from L0 buffers and storage.
    ///
    /// No-op when there is no property manager, no resolvable label, no schema
    /// metadata for the label, or no CRDT-typed properties in the input.
    ///
    /// # Errors
    ///
    /// Returns an error if CRDT property merging fails.
    async fn prepare_vertex_upsert(
        &self,
        vid: Vid,
        properties: &mut Properties,
        label: Option<&str>,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        let Some(pm) = &self.property_manager else {
            return Ok(());
        };

        let schema = self.schema_manager.schema();

        // Resolve label: use provided label or discover from L0/storage.
        // `discovered_labels` is declared in the outer scope so the `&str`
        // borrowed from it inside the else-branch lives long enough.
        let discovered_labels;
        let label_name = if let Some(l) = label {
            Some(l)
        } else {
            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
            discovered_labels
                .as_ref()
                .and_then(|l| l.first().map(|s| s.as_str()))
        };

        let Some(label_str) = label_name else {
            return Ok(());
        };
        let Some(props_meta) = schema.properties.get(label_str) else {
            return Ok(());
        };

        // Identify CRDT properties in the insert data
        let crdt_keys: Vec<String> = properties
            .keys()
            .filter(|key| {
                props_meta.get(*key).is_some_and(|meta| {
                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                })
            })
            .cloned()
            .collect();

        if crdt_keys.is_empty() {
            return Ok(());
        }

        // Merge each incoming CRDT value with its stored counterpart in place
        let ctx = self.get_query_context(tx_l0).await;
        for key in crdt_keys {
            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
            if !existing.is_null()
                && let Some(val) = properties.get_mut(&key)
            {
                *val = pm.merge_crdt_values(&existing, val)?;
            }
        }

        Ok(())
    }
1248
1249    async fn prepare_edge_upsert(
1250        &self,
1251        eid: Eid,
1252        properties: &mut Properties,
1253        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1254    ) -> Result<()> {
1255        if let Some(pm) = &self.property_manager {
1256            let schema = self.schema_manager.schema();
1257            // Get edge type from L0 buffer instead of from EID
1258            let type_name = self.get_edge_type_from_l0(eid);
1259
1260            if let Some(ref t_name) = type_name
1261                && let Some(props_meta) = schema.properties.get(t_name)
1262            {
1263                let mut crdt_keys = Vec::new();
1264                for (key, _) in properties.iter() {
1265                    if let Some(meta) = props_meta.get(key)
1266                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1267                    {
1268                        crdt_keys.push(key.clone());
1269                    }
1270                }
1271
1272                if !crdt_keys.is_empty() {
1273                    let ctx = self.get_query_context(tx_l0).await;
1274                    for key in crdt_keys {
1275                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1276
1277                        if !existing.is_null()
1278                            && let Some(val) = properties.get_mut(&key)
1279                        {
1280                            *val = pm.merge_crdt_values(&existing, val)?;
1281                        }
1282                    }
1283                }
1284            }
1285        }
1286        Ok(())
1287    }
1288
1289    #[instrument(skip(self, properties), level = "trace")]
1290    pub async fn insert_vertex(
1291        &mut self,
1292        vid: Vid,
1293        properties: Properties,
1294        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1295    ) -> Result<()> {
1296        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
1297            .await?;
1298        Ok(())
1299    }
1300
1301    #[instrument(skip(self, properties, labels), level = "trace")]
1302    pub async fn insert_vertex_with_labels(
1303        &mut self,
1304        vid: Vid,
1305        mut properties: Properties,
1306        labels: &[String],
1307        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1308    ) -> Result<Properties> {
1309        let start = std::time::Instant::now();
1310        self.check_write_pressure().await?;
1311        self.check_transaction_memory(tx_l0)?;
1312        self.process_embeddings_for_labels(labels, &mut properties)
1313            .await?;
1314        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
1315            .await?;
1316        self.prepare_vertex_upsert(
1317            vid,
1318            &mut properties,
1319            labels.first().map(|s| s.as_str()),
1320            tx_l0,
1321        )
1322        .await?;
1323
1324        // Clone properties and labels before moving into L0 to return them and populate constraint index
1325        let properties_copy = properties.clone();
1326        let labels_copy = labels.to_vec();
1327
1328        {
1329            let l0 = self.resolve_l0(tx_l0);
1330            let mut l0_guard = l0.write();
1331            l0_guard.insert_vertex_with_labels(vid, properties, labels);
1332
1333            // Populate constraint index for O(1) duplicate detection
1334            let schema = self.schema_manager.schema();
1335            for label in &labels_copy {
1336                // Skip unknown labels (schemaless support)
1337                if schema.get_label_case_insensitive(label).is_none() {
1338                    continue;
1339                }
1340
1341                // For each unique constraint on this label, insert into constraint index
1342                for constraint in &schema.constraints {
1343                    if !constraint.enabled {
1344                        continue;
1345                    }
1346                    if let ConstraintTarget::Label(l) = &constraint.target {
1347                        if l != label {
1348                            continue;
1349                        }
1350                    } else {
1351                        continue;
1352                    }
1353
1354                    if let ConstraintType::Unique {
1355                        properties: unique_props,
1356                    } = &constraint.constraint_type
1357                    {
1358                        let mut key_values = Vec::new();
1359                        let mut all_present = true;
1360                        for prop in unique_props {
1361                            if let Some(val) = properties_copy.get(prop) {
1362                                key_values.push((prop.clone(), val.clone()));
1363                            } else {
1364                                all_present = false;
1365                                break;
1366                            }
1367                        }
1368
1369                        if all_present {
1370                            let key = serialize_constraint_key(label, &key_values);
1371                            l0_guard.insert_constraint_key(key, vid);
1372                        }
1373                    }
1374                }
1375            }
1376        }
1377
1378        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1379        self.update_metrics();
1380
1381        if tx_l0.is_none() {
1382            self.check_flush().await?;
1383        }
1384        if start.elapsed().as_millis() > 100 {
1385            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
1386        }
1387        Ok(properties_copy)
1388    }
1389
    /// Insert multiple vertices with batched operations.
    ///
    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
    /// for bulk inserts with unique constraints.
    ///
    /// # Performance Improvements
    /// - Batch VID allocation: 1 call instead of N calls
    /// - Batch constraint validation: O(N) instead of O(N²)
    /// - Batch embedding generation: 1 API call per config instead of N calls
    /// - Transaction wrapping: Automatic flush deferral, atomicity
    ///
    /// # Arguments
    /// * `vids` - Pre-allocated VIDs for the vertices
    /// * `properties_batch` - Properties for each vertex
    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
    ///
    /// # Errors
    /// Returns error if:
    /// - VID/properties length mismatch
    /// - Constraint violation detected
    /// - Embedding generation fails
    /// - Transaction commit fails
    ///
    /// # Atomicity
    /// If this method fails, all changes are rolled back (if transaction was started here).
    pub async fn insert_vertices_batch(
        &mut self,
        vids: Vec<Vid>,
        mut properties_batch: Vec<Properties>,
        labels: Vec<String>,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<Vec<Properties>> {
        let start = std::time::Instant::now();

        // Validate inputs
        if vids.len() != properties_batch.len() {
            return Err(anyhow!(
                "VID/properties size mismatch: {} vids, {} properties",
                vids.len(),
                properties_batch.len()
            ));
        }

        if vids.is_empty() {
            return Ok(Vec::new());
        }

        // Batch operations — writes go directly to the resolved L0.
        // Atomicity is guaranteed by the caller holding the writer lock.
        let result = async {
            self.check_write_pressure().await?;
            self.check_transaction_memory(tx_l0)?;

            // Batch embedding generation (1 API call per config)
            self.process_embeddings_for_batch(&labels, &mut properties_batch)
                .await?;

            // Batch constraint validation (O(N) instead of O(N²))
            // Only the first label is used for constraint validation.
            let label = labels
                .first()
                .ok_or_else(|| anyhow!("No labels provided"))?;
            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
                .await?;

            // Batch prepare (CRDT merging if needed)
            // Check schema once: skip entirely if no CRDT properties for this label.
            // For new vertices (freshly allocated VIDs), there are no existing CRDT
            // values to merge, so the per-vertex lookup is unnecessary in that case.
            let has_crdt_fields = {
                let schema = self.schema_manager.schema();
                schema
                    .properties
                    .get(label.as_str())
                    .is_some_and(|props_meta| {
                        props_meta.values().any(|meta| {
                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                        })
                    })
            };

            if has_crdt_fields {
                // Batch fetch existing CRDT values: collect VIDs that need merging,
                // then query once via PropertyManager instead of per-vertex lookups.
                let schema = self.schema_manager.schema();
                let crdt_keys: Vec<String> = schema
                    .properties
                    .get(label.as_str())
                    .map(|props_meta| {
                        props_meta
                            .iter()
                            .filter(|(_, meta)| {
                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                            })
                            .map(|(key, _)| key.clone())
                            .collect()
                    })
                    .unwrap_or_default();

                if let Some(pm) = &self.property_manager {
                    let ctx = self.get_query_context(tx_l0).await;
                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
                        for key in &crdt_keys {
                            if props.contains_key(key) {
                                let existing =
                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
                                if !existing.is_null()
                                    && let Some(val) = props.get_mut(key)
                                {
                                    *val = pm.merge_crdt_values(&existing, val)?;
                                }
                            }
                        }
                    }
                }
            }

            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
            // NOTE(review): unlike insert_vertex_with_labels, this path does not
            // populate the L0 unique-constraint key index, and it never calls
            // check_flush — confirm both are intentional for the batch path.
            let target_l0 = self.resolve_l0(tx_l0);

            let properties_result = properties_batch.clone();
            {
                let mut l0_guard = target_l0.write();
                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
                }
            }

            // Update metrics (batch increment)
            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
            self.update_metrics();

            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
        }
        .await;

        let props = result?;

        if start.elapsed().as_millis() > 100 {
            log::warn!(
                "Slow insert_vertices_batch ({} vertices): {}ms",
                vids.len(),
                start.elapsed().as_millis()
            );
        }

        Ok(props)
    }
1537
1538    /// Delete a vertex by VID.
1539    ///
1540    /// When `labels` is provided, uses them directly to populate L0 for
1541    /// correct tombstone flushing. Otherwise discovers labels from L0
1542    /// buffers and storage (which can be slow for many vertices).
1543    ///
1544    /// # Errors
1545    ///
1546    /// Returns an error if write pressure stalls, label lookup fails, or
1547    /// the L0 delete operation fails.
1548    #[instrument(skip(self, labels), level = "trace")]
1549    pub async fn delete_vertex(
1550        &mut self,
1551        vid: Vid,
1552        labels: Option<Vec<String>>,
1553        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1554    ) -> Result<()> {
1555        let start = std::time::Instant::now();
1556        self.check_write_pressure().await?;
1557        self.check_transaction_memory(tx_l0)?;
1558        let l0 = self.resolve_l0(tx_l0);
1559
1560        // Before deleting, ensure we have the vertex's labels stored in L0
1561        // so the tombstone can be properly flushed to the correct label datasets.
1562        let has_labels = {
1563            let l0_guard = l0.read();
1564            l0_guard.vertex_labels.contains_key(&vid)
1565        };
1566
1567        if !has_labels {
1568            let resolved_labels = if let Some(provided) = labels {
1569                // Caller provided labels — skip the lookup entirely
1570                Some(provided)
1571            } else {
1572                // Discover labels from pending flush L0s, then storage
1573                let mut found = None;
1574                for pending_l0 in self.l0_manager.get_pending_flush() {
1575                    let pending_guard = pending_l0.read();
1576                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
1577                        found = Some(l.to_vec());
1578                        break;
1579                    }
1580                }
1581                if found.is_none() {
1582                    found = self.find_vertex_labels_in_storage(vid).await?;
1583                }
1584                found
1585            };
1586
1587            if let Some(found_labels) = resolved_labels {
1588                let mut l0_guard = l0.write();
1589                l0_guard.vertex_labels.insert(vid, found_labels);
1590            }
1591        }
1592
1593        l0.write().delete_vertex(vid)?;
1594        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1595        self.update_metrics();
1596
1597        if tx_l0.is_none() {
1598            self.check_flush().await?;
1599        }
1600        if start.elapsed().as_millis() > 100 {
1601            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
1602        }
1603        Ok(())
1604    }
1605
    /// Find vertex labels from storage by querying the main vertices table.
    /// Returns the labels from the latest non-deleted version of the vertex.
    ///
    /// Fallback used when a vertex's labels are not present in any in-memory
    /// L0 buffer. Scans the main vertex table for every version of `vid`,
    /// keeps the row with the highest `_version`, and returns that row's
    /// labels — or `Ok(None)` if the table does not exist yet, no row
    /// matches, or the newest row is a delete marker.
    ///
    /// # Errors
    ///
    /// Only the `table_exists` check propagates backend errors; the scan
    /// itself maps failures to "no rows" (see note below).
    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
        use crate::backend::types::ScanRequest;
        use arrow_array::Array;
        use arrow_array::cast::AsArray;

        let backend = self.storage.backend();
        let table_name = MainVertexDataset::table_name();

        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
        if !backend.table_exists(table_name).await? {
            return Ok(None);
        }

        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
        let filter = format!("_vid = {}", vid.as_u64());
        let batches = backend
            .scan(
                ScanRequest::all(table_name)
                    .with_filter(filter)
                    .with_columns(vec![
                        "_vid".to_string(),
                        "labels".to_string(),
                        "_version".to_string(),
                        "_deleted".to_string(),
                    ]),
            )
            .await
            // NOTE(review): scan errors are silently treated as "no rows", so
            // a transient backend failure yields Ok(None) here — confirm that
            // best-effort semantics are intended for this fallback path.
            .unwrap_or_default();

        // Find the row with the highest version number
        let mut max_version: Option<u64> = None;
        let mut labels: Option<Vec<String>> = None;
        let mut is_deleted = false;

        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

            // These columns were explicitly requested in the scan above, so
            // the unwraps only panic if the backend violates that request.
            let version_array = batch
                .column_by_name("_version")
                .unwrap()
                .as_primitive::<arrow_array::types::UInt64Type>();

            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();

            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();

            for row_idx in 0..batch.num_rows() {
                let version = version_array.value(row_idx);

                // Track the deletion flag and labels of the newest row seen so far.
                if max_version.is_none_or(|mv| version > mv) {
                    is_deleted = deleted_array.value(row_idx);

                    // `labels` is a List<Utf8> column; null entries are skipped.
                    let labels_list = labels_array.value(row_idx);
                    let string_array = labels_list.as_string::<i32>();
                    let vertex_labels: Vec<String> = (0..string_array.len())
                        .filter(|&i| !string_array.is_null(i))
                        .map(|i| string_array.value(i).to_string())
                        .collect();

                    max_version = Some(version);
                    labels = Some(vertex_labels);
                }
            }
        }

        // If the latest version is deleted, return None
        if is_deleted { Ok(None) } else { Ok(labels) }
    }
1678
1679    #[expect(clippy::too_many_arguments)]
1680    #[instrument(skip(self, properties), level = "trace")]
1681    pub async fn insert_edge(
1682        &mut self,
1683        src_vid: Vid,
1684        dst_vid: Vid,
1685        edge_type: u32,
1686        eid: Eid,
1687        mut properties: Properties,
1688        edge_type_name: Option<String>,
1689        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1690    ) -> Result<()> {
1691        let start = std::time::Instant::now();
1692        self.check_write_pressure().await?;
1693        self.check_transaction_memory(tx_l0)?;
1694        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
1695            .await?;
1696
1697        let l0 = self.resolve_l0(tx_l0);
1698        l0.write()
1699            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
1700
1701        // Dual-write to AdjacencyManager overlay (survives flush).
1702        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
1703        if tx_l0.is_none() {
1704            let version = l0.read().current_version;
1705            self.adjacency_manager
1706                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
1707        }
1708
1709        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1710        self.update_metrics();
1711
1712        if tx_l0.is_none() {
1713            self.check_flush().await?;
1714        }
1715        if start.elapsed().as_millis() > 100 {
1716            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
1717        }
1718        Ok(())
1719    }
1720
1721    #[instrument(skip(self), level = "trace")]
1722    pub async fn delete_edge(
1723        &mut self,
1724        eid: Eid,
1725        src_vid: Vid,
1726        dst_vid: Vid,
1727        edge_type: u32,
1728        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1729    ) -> Result<()> {
1730        let start = std::time::Instant::now();
1731        self.check_write_pressure().await?;
1732        self.check_transaction_memory(tx_l0)?;
1733        let l0 = self.resolve_l0(tx_l0);
1734
1735        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
1736
1737        // Dual-write tombstone to AdjacencyManager overlay.
1738        if tx_l0.is_none() {
1739            let version = l0.read().current_version;
1740            self.adjacency_manager
1741                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
1742        }
1743        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1744        self.update_metrics();
1745
1746        if tx_l0.is_none() {
1747            self.check_flush().await?;
1748        }
1749        if start.elapsed().as_millis() > 100 {
1750            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
1751        }
1752        Ok(())
1753    }
1754
1755    /// Check if flush should be triggered based on mutation count or time elapsed.
1756    /// This method is called after each write operation and can also be called
1757    /// by a background task for time-based flushing.
1758    pub async fn check_flush(&mut self) -> Result<()> {
1759        let count = self.l0_manager.get_current().read().mutation_count;
1760
1761        // Skip if no mutations
1762        if count == 0 {
1763            return Ok(());
1764        }
1765
1766        // Flush on mutation count threshold (10,000 default)
1767        if count >= self.config.auto_flush_threshold {
1768            self.flush_to_l1(None).await?;
1769            return Ok(());
1770        }
1771
1772        // Flush on time interval IF minimum mutations met
1773        if let Some(interval) = self.config.auto_flush_interval
1774            && self.last_flush_time.elapsed() >= interval
1775            && count >= self.config.auto_flush_min_mutations
1776        {
1777            self.flush_to_l1(None).await?;
1778        }
1779
1780        Ok(())
1781    }
1782
1783    /// Process embeddings for a vertex using labels passed directly.
1784    /// Use this when labels haven't been stored to L0 yet.
1785    async fn process_embeddings_for_labels(
1786        &self,
1787        labels: &[String],
1788        properties: &mut Properties,
1789    ) -> Result<()> {
1790        let label_name = labels.first().map(|s| s.as_str());
1791        self.process_embeddings_impl(label_name, properties).await
1792    }
1793
1794    /// Process embeddings for a batch of vertices efficiently.
1795    ///
1796    /// Groups vertices by embedding config and makes batched API calls to the
1797    /// embedding service instead of calling once per vertex.
1798    ///
1799    /// # Performance
1800    /// For N vertices with embedding config:
1801    /// - Old approach: N API calls to embedding service
1802    /// - New approach: 1 API call per embedding config (usually 1 total)
1803    async fn process_embeddings_for_batch(
1804        &self,
1805        labels: &[String],
1806        properties_batch: &mut [Properties],
1807    ) -> Result<()> {
1808        let label_name = labels.first().map(|s| s.as_str());
1809        let schema = self.schema_manager.schema();
1810
1811        if let Some(label) = label_name {
1812            // Find vector indexes with embedding config for this label
1813            let mut configs = Vec::new();
1814            for idx in &schema.indexes {
1815                if let IndexDefinition::Vector(v_config) = idx
1816                    && v_config.label == label
1817                    && let Some(emb_config) = &v_config.embedding_config
1818                {
1819                    configs.push((v_config.property.clone(), emb_config.clone()));
1820                }
1821            }
1822
1823            if configs.is_empty() {
1824                return Ok(());
1825            }
1826
1827            for (target_prop, emb_config) in configs {
1828                // Collect input texts from all vertices that need embeddings
1829                let mut input_texts: Vec<String> = Vec::new();
1830                let mut needs_embedding: Vec<usize> = Vec::new();
1831
1832                for (idx, properties) in properties_batch.iter().enumerate() {
1833                    // Skip if target property already exists
1834                    if properties.contains_key(&target_prop) {
1835                        continue;
1836                    }
1837
1838                    // Check if source properties exist
1839                    let mut inputs = Vec::new();
1840                    for src_prop in &emb_config.source_properties {
1841                        if let Some(val) = properties.get(src_prop)
1842                            && let Some(s) = val.as_str()
1843                        {
1844                            inputs.push(s.to_string());
1845                        }
1846                    }
1847
1848                    if !inputs.is_empty() {
1849                        let input_text = inputs.join(" ");
1850                        input_texts.push(input_text);
1851                        needs_embedding.push(idx);
1852                    }
1853                }
1854
1855                if input_texts.is_empty() {
1856                    continue;
1857                }
1858
1859                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1860                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1861                })?;
1862                let embedder = runtime.embedding(&emb_config.alias).await?;
1863
1864                // Batch generate embeddings (single API call)
1865                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
1866                let embeddings = embedder.embed(input_refs).await?;
1867
1868                // Distribute results back to properties
1869                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
1870                    if let Some(vec) = embeddings.get(embedding_idx) {
1871                        let vals: Vec<Value> =
1872                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
1873                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
1874                    }
1875                }
1876            }
1877        }
1878
1879        Ok(())
1880    }
1881
1882    async fn process_embeddings_impl(
1883        &self,
1884        label_name: Option<&str>,
1885        properties: &mut Properties,
1886    ) -> Result<()> {
1887        let schema = self.schema_manager.schema();
1888
1889        if let Some(label) = label_name {
1890            // Find vector indexes with embedding config for this label
1891            let mut configs = Vec::new();
1892            for idx in &schema.indexes {
1893                if let IndexDefinition::Vector(v_config) = idx
1894                    && v_config.label == label
1895                    && let Some(emb_config) = &v_config.embedding_config
1896                {
1897                    configs.push((v_config.property.clone(), emb_config.clone()));
1898                }
1899            }
1900
1901            if configs.is_empty() {
1902                log::info!("No embedding config found for label {}", label);
1903            }
1904
1905            for (target_prop, emb_config) in configs {
1906                // If target property already exists, skip (assume user provided it)
1907                if properties.contains_key(&target_prop) {
1908                    continue;
1909                }
1910
1911                // Check if source properties exist
1912                let mut inputs = Vec::new();
1913                for src_prop in &emb_config.source_properties {
1914                    if let Some(val) = properties.get(src_prop)
1915                        && let Some(s) = val.as_str()
1916                    {
1917                        inputs.push(s.to_string());
1918                    }
1919                }
1920
1921                if inputs.is_empty() {
1922                    continue;
1923                }
1924
1925                let input_text = inputs.join(" "); // Simple concatenation
1926
1927                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1928                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1929                })?;
1930                let embedder = runtime.embedding(&emb_config.alias).await?;
1931
1932                // Generate
1933                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
1934                if let Some(vec) = embeddings.first() {
1935                    // Store as array of floats
1936                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
1937                    properties.insert(target_prop.clone(), Value::List(vals));
1938                }
1939            }
1940        }
1941        Ok(())
1942    }
1943
1944    /// Flushes the current in-memory L0 buffer to L1 storage.
1945    ///
1946    /// # Lock Ordering
1947    ///
1948    /// To prevent deadlocks, locks must be acquired in the following order:
1949    /// 1. `Writer` lock (held by caller)
1950    /// 2. `L0Manager` lock (via `begin_flush` / `get_current`)
1951    /// 3. `L0Buffer` lock (individual buffer RWLocks)
1952    /// 4. `Index` / `Storage` locks (during actual flush)
1953    #[instrument(
1954        skip(self),
1955        fields(snapshot_id, mutations_count, size_bytes),
1956        level = "info"
1957    )]
1958    pub async fn flush_to_l1(&mut self, name: Option<String>) -> Result<String> {
1959        let start = std::time::Instant::now();
1960        let schema = self.schema_manager.schema();
1961
1962        let (initial_size, initial_count) = {
1963            let l0_arc = self.l0_manager.get_current();
1964            let l0 = l0_arc.read();
1965            (l0.estimated_size, l0.mutation_count)
1966        };
1967        tracing::Span::current().record("size_bytes", initial_size);
1968        tracing::Span::current().record("mutations_count", initial_count);
1969
1970        debug!("Starting L0 flush to L1");
1971
1972        // 1. Flush WAL BEFORE rotating L0
1973        // This ensures that if WAL flush fails, the current L0 is still active
1974        // and mutations are retained in memory until restart/retry.
1975        // Capture the LSN of the flushed segment for the snapshot's wal_high_water_mark.
1976        let wal_for_truncate = {
1977            let current_l0 = self.l0_manager.get_current();
1978            let l0_guard = current_l0.read();
1979            l0_guard.wal.clone()
1980        };
1981
1982        let wal_lsn = if let Some(ref w) = wal_for_truncate {
1983            w.flush().await?
1984        } else {
1985            0
1986        };
1987
1988        // 2. Begin flush: rotate L0 and keep old L0 visible to reads
1989        // The old L0 stays in pending_flush list until complete_flush is called,
1990        // ensuring data remains visible even if L1 writes fail.
1991        let old_l0_arc = self.l0_manager.begin_flush(0, None);
1992        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
1993
1994        let current_version;
1995        {
1996            // Acquire Write lock to take WAL and version
1997            let mut old_l0_guard = old_l0_arc.write();
1998            current_version = old_l0_guard.current_version;
1999
2000            // Record the WAL LSN for this L0 so we don't truncate past it
2001            // if this flush fails and a subsequent flush succeeds.
2002            old_l0_guard.wal_lsn_at_flush = wal_lsn;
2003
2004            let wal = old_l0_guard.wal.take();
2005
2006            // Give WAL to new L0
2007            let new_l0_arc = self.l0_manager.get_current();
2008            let mut new_l0_guard = new_l0_arc.write();
2009            new_l0_guard.wal = wal;
2010            new_l0_guard.current_version = current_version;
2011        } // Drop locks
2012
        // 3. Acquire a read lock on the old L0 and collect its entries for flushing
2014        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
2015        // (Vid, labels, properties, deleted, version)
2016        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
2017        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
2018        // Collect vertex timestamps from L0 for flushing to storage
2019        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
2020        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
2021        // Track tombstones missing labels for storage query fallback
2022        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
2023
2024        {
2025            let old_l0 = old_l0_arc.read();
2026
2027            // 1. Collect all edges and tombstones from L0
2028            for edge in old_l0.graph.edges() {
2029                let properties = old_l0
2030                    .edge_properties
2031                    .get(&edge.eid)
2032                    .cloned()
2033                    .unwrap_or_default();
2034                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
2035
2036                // Get timestamps from L0 buffer (populated during insert)
2037                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
2038                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
2039
2040                entries_by_type
2041                    .entry(edge.edge_type)
2042                    .or_default()
2043                    .push(L1Entry {
2044                        src_vid: edge.src_vid,
2045                        dst_vid: edge.dst_vid,
2046                        eid: edge.eid,
2047                        op: Op::Insert,
2048                        version,
2049                        properties,
2050                        created_at,
2051                        updated_at,
2052                    });
2053            }
2054
2055            // From tombstones
2056            for tombstone in old_l0.tombstones.values() {
2057                let version = old_l0
2058                    .edge_versions
2059                    .get(&tombstone.eid)
2060                    .copied()
2061                    .unwrap_or(0);
2062                // Get timestamps - for deletes, updated_at reflects deletion time
2063                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
2064                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
2065
2066                entries_by_type
2067                    .entry(tombstone.edge_type)
2068                    .or_default()
2069                    .push(L1Entry {
2070                        src_vid: tombstone.src_vid,
2071                        dst_vid: tombstone.dst_vid,
2072                        eid: tombstone.eid,
2073                        op: Op::Delete,
2074                        version,
2075                        properties: HashMap::new(),
2076                        created_at,
2077                        updated_at,
2078                    });
2079            }
2080
2081            // 2.5 Flush Vertices - Collect by label (using vertex_labels from L0)
2082            //
2083            // Helper: fan-out a single vertex entry into per-label buckets.
2084            // Each per-label table row carries the full label set so multi-label
2085            // info is preserved after flush.
2086            let push_vertex_to_labels =
2087                |vid: Vid,
2088                 all_labels: &[String],
2089                 props: Properties,
2090                 deleted: bool,
2091                 version: u64,
2092                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
2093                    for label in all_labels {
2094                        if let Some(label_id) = schema.label_id_by_name(label) {
2095                            out.entry(label_id).or_default().push((
2096                                vid,
2097                                all_labels.to_vec(),
2098                                props.clone(),
2099                                deleted,
2100                                version,
2101                            ));
2102                        }
2103                    }
2104                };
2105
2106            for (vid, props) in &old_l0.vertex_properties {
2107                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
2108                // Collect timestamps for this vertex
2109                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
2110                    vertex_created_at.insert(*vid, ts);
2111                }
2112                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
2113                    vertex_updated_at.insert(*vid, ts);
2114                }
2115                if let Some(labels) = old_l0.vertex_labels.get(vid) {
2116                    push_vertex_to_labels(
2117                        *vid,
2118                        labels,
2119                        props.clone(),
2120                        false,
2121                        version,
2122                        &mut vertices_by_label,
2123                    );
2124                }
2125            }
2126            for &vid in &old_l0.vertex_tombstones {
2127                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
2128                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
2129                    push_vertex_to_labels(
2130                        vid,
2131                        labels,
2132                        HashMap::new(),
2133                        true,
2134                        version,
2135                        &mut vertices_by_label,
2136                    );
2137                } else {
2138                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
2139                    orphaned_tombstones.push((vid, version));
2140                }
2141            }
2142        } // Drop read lock
2143
2144        // Resolve orphaned tombstones (missing labels) from storage
2145        if !orphaned_tombstones.is_empty() {
2146            tracing::warn!(
2147                count = orphaned_tombstones.len(),
2148                "Tombstones missing labels in L0, querying storage as fallback"
2149            );
2150            for (vid, version) in orphaned_tombstones {
2151                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
2152                    && !labels.is_empty()
2153                {
2154                    for label in &labels {
2155                        if let Some(label_id) = schema.label_id_by_name(label) {
2156                            vertices_by_label.entry(label_id).or_default().push((
2157                                vid,
2158                                labels.clone(),
2159                                HashMap::new(),
2160                                true,
2161                                version,
2162                            ));
2163                        }
2164                    }
2165                }
2166            }
2167        }
2168
        // Load the previous snapshot manifest, or create a fresh one on first flush
2170        let mut manifest = self
2171            .storage
2172            .snapshot_manager()
2173            .load_latest_snapshot()
2174            .await?
2175            .unwrap_or_else(|| {
2176                SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
2177            });
2178
2179        // Update snapshot metadata
2180        // Save parent snapshot ID before generating new one (for lineage tracking)
2181        let parent_id = manifest.snapshot_id.clone();
2182        manifest.parent_snapshot = Some(parent_id);
2183        manifest.snapshot_id = Uuid::new_v4().to_string();
2184        manifest.name = name;
2185        manifest.created_at = Utc::now();
2186        manifest.version_high_water_mark = current_version;
2187        manifest.wal_high_water_mark = wal_lsn;
2188        let snapshot_id = manifest.snapshot_id.clone();
2189
2190        tracing::Span::current().record("snapshot_id", &snapshot_id);
2191
2192        // 2. For each edge type, write FWD and BWD runs
2193        for (&edge_type_id, entries) in entries_by_type.iter() {
2194            // Get edge type name from unified lookup (handles both schema'd and schemaless)
2195            let edge_type_name = self
2196                .storage
2197                .schema_manager()
2198                .edge_type_name_by_id_unified(edge_type_id)
2199                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
2200
2201            // FWD Run (sorted by src_vid)
2202            let mut fwd_entries = entries.clone();
2203            fwd_entries.sort_by_key(|e| e.src_vid);
2204            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
2205            let fwd_batch = fwd_ds.build_record_batch(&fwd_entries, &schema)?;
2206
2207            // Write using backend
2208            let backend = self.storage.backend();
2209            fwd_ds.write_run(backend, fwd_batch).await?;
2210            fwd_ds.ensure_eid_index(backend).await?;
2211
2212            // BWD Run (sorted by dst_vid)
2213            let mut bwd_entries = entries.clone();
2214            bwd_entries.sort_by_key(|e| e.dst_vid);
2215            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
2216            let bwd_batch = bwd_ds.build_record_batch(&bwd_entries, &schema)?;
2217
2218            let backend = self.storage.backend();
2219            bwd_ds.write_run(backend, bwd_batch).await?;
2220            bwd_ds.ensure_eid_index(backend).await?;
2221
2222            // Update Manifest
2223            let current_snap =
2224                manifest
2225                    .edges
2226                    .entry(edge_type_name.to_string())
2227                    .or_insert(EdgeSnapshot {
2228                        version: 0,
2229                        count: 0,
2230                        lance_version: 0,
2231                    });
2232            current_snap.version += 1;
2233            current_snap.count += entries.len() as u64;
2234            // LanceDB tables don't expose Lance version directly
2235            current_snap.lance_version = 0;
2236
2237            // Note: No CSR invalidation needed. AdjacencyManager's overlay
2238            // already has these edges via dual-write in insert_edge/delete_edge.
2239        }
2240
2241        // 2.5 Flush Vertices
2242        for (label_id, vertices) in vertices_by_label {
2243            let label_name = schema
2244                .label_name_by_id(label_id)
2245                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
2246
2247            let ds = self.storage.vertex_dataset(label_name)?;
2248
2249            // Collect inverted index updates before consuming vertices
2250            // Maps: cfg.property -> (added, removed)
2251            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
2252            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
2253
2254            for idx in &schema.indexes {
2255                if let IndexDefinition::Inverted(cfg) = idx
2256                    && cfg.label == label_name
2257                {
2258                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
2259                    let mut removed: HashSet<Vid> = HashSet::new();
2260
2261                    for (vid, _labels, props, deleted, _version) in &vertices {
2262                        if *deleted {
2263                            removed.insert(*vid);
2264                        } else if let Some(prop_value) = props.get(&cfg.property) {
2265                            // Extract terms from the property value (List<String>)
2266                            if let Some(arr) = prop_value.as_array() {
2267                                let terms: Vec<String> = arr
2268                                    .iter()
2269                                    .filter_map(|v| v.as_str().map(ToString::to_string))
2270                                    .collect();
2271                                if !terms.is_empty() {
2272                                    added.insert(*vid, terms);
2273                                }
2274                            }
2275                        }
2276                    }
2277
2278                    if !added.is_empty() || !removed.is_empty() {
2279                        inverted_updates.insert(cfg.property.clone(), (added, removed));
2280                    }
2281                }
2282            }
2283
2284            let mut v_data = Vec::new();
2285            let mut d_data = Vec::new();
2286            let mut ver_data = Vec::new();
2287            for (vid, labels, props, deleted, version) in vertices {
2288                v_data.push((vid, labels, props));
2289                d_data.push(deleted);
2290                ver_data.push(version);
2291            }
2292
2293            let batch = ds.build_record_batch_with_timestamps(
2294                &v_data,
2295                &d_data,
2296                &ver_data,
2297                &schema,
2298                Some(&vertex_created_at),
2299                Some(&vertex_updated_at),
2300            )?;
2301
2302            // Write using backend
2303            let backend = self.storage.backend();
2304            ds.write_batch(backend, batch, &schema).await?;
2305            ds.ensure_default_indexes(backend).await?;
2306
2307            // Update VidLabelsIndex (if enabled)
2308            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
2309                if deleted {
2310                    self.storage.remove_from_vid_labels_index(*vid);
2311                } else {
2312                    self.storage.update_vid_labels_index(*vid, labels.clone());
2313                }
2314            }
2315
2316            // Update Manifest
2317            let current_snap =
2318                manifest
2319                    .vertices
2320                    .entry(label_name.to_string())
2321                    .or_insert(LabelSnapshot {
2322                        version: 0,
2323                        count: 0,
2324                        lance_version: 0,
2325                    });
2326            current_snap.version += 1;
2327            current_snap.count += v_data.len() as u64;
2328            // LanceDB tables don't expose Lance version directly
2329            current_snap.lance_version = 0;
2330
2331            // Invalidate table cache to ensure next read picks up new version
2332            self.storage.invalidate_table_cache(label_name);
2333
2334            // Apply inverted index updates incrementally
2335            #[cfg(feature = "lance-backend")]
2336            for idx in &schema.indexes {
2337                if let IndexDefinition::Inverted(cfg) = idx
2338                    && cfg.label == label_name
2339                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
2340                {
2341                    self.storage
2342                        .index_manager()
2343                        .update_inverted_index_incremental(cfg, added, removed)
2344                        .await?;
2345                }
2346            }
2347
2348            // Update UID index with new vertex mappings
2349            // Collect (UniId, Vid) mappings from non-deleted vertices
2350            #[cfg(feature = "lance-backend")]
2351            {
2352                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
2353                for (vid, _labels, props) in &v_data {
2354                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
2355                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
2356                        label_name, ext_id, props,
2357                    );
2358                    uid_mappings.push((uid, *vid));
2359                }
2360
2361                if !uid_mappings.is_empty()
2362                    && let Ok(uid_index) = self.storage.uid_index(label_name)
2363                {
2364                    uid_index.write_mapping(&uid_mappings).await?;
2365                }
2366            }
2367        }
2368
2369        // 3. Write to main unified tables (dual-write for fast ID-based lookups)
2370        // 3.1 Write to main edges table
2371        // Collect data while holding the lock, then release before async operations
2372        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
2373            let _old_l0 = old_l0_arc.read();
2374            let mut main_edges: Vec<(
2375                uni_common::core::id::Eid,
2376                Vid,
2377                Vid,
2378                String,
2379                Properties,
2380                bool,
2381                u64,
2382            )> = Vec::new();
2383            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
2384            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
2385
2386            for (&edge_type_id, entries) in entries_by_type.iter() {
2387                for entry in entries {
2388                    // Get edge type name from unified lookup (handles both schema'd and schemaless)
2389                    let edge_type_name = self
2390                        .storage
2391                        .schema_manager()
2392                        .edge_type_name_by_id_unified(edge_type_id)
2393                        .unwrap_or_else(|| "unknown".to_string());
2394
2395                    let deleted = matches!(entry.op, Op::Delete);
2396                    main_edges.push((
2397                        entry.eid,
2398                        entry.src_vid,
2399                        entry.dst_vid,
2400                        edge_type_name,
2401                        entry.properties.clone(),
2402                        deleted,
2403                        entry.version,
2404                    ));
2405
2406                    if let Some(ts) = entry.created_at {
2407                        edge_created_at_map.insert(entry.eid, ts);
2408                    }
2409                    if let Some(ts) = entry.updated_at {
2410                        edge_updated_at_map.insert(entry.eid, ts);
2411                    }
2412                }
2413            }
2414
2415            (main_edges, edge_created_at_map, edge_updated_at_map)
2416        }; // Lock released here
2417
2418        if !main_edges.is_empty() {
2419            let main_edge_batch = MainEdgeDataset::build_record_batch(
2420                &main_edges,
2421                Some(&edge_created_at_map),
2422                Some(&edge_updated_at_map),
2423            )?;
2424            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
2425            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
2426        }
2427
2428        // 3.2 Write to main vertices table
2429        // Collect data while holding the lock, then release before async operations
2430        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
2431            let old_l0 = old_l0_arc.read();
2432            let mut vertices = Vec::new();
2433
2434            // Collect all vertices from vertex_properties
2435            for (vid, props) in &old_l0.vertex_properties {
2436                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
2437                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
2438                vertices.push((*vid, labels, props.clone(), false, version));
2439            }
2440
2441            // Collect tombstones
2442            for &vid in &old_l0.vertex_tombstones {
2443                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
2444                let labels = old_l0.vertex_labels.get(&vid).cloned().unwrap_or_default();
2445                vertices.push((vid, labels, HashMap::new(), true, version));
2446            }
2447
2448            vertices
2449        }; // Lock released here
2450
2451        if !main_vertices.is_empty() {
2452            let main_vertex_batch = MainVertexDataset::build_record_batch(
2453                &main_vertices,
2454                Some(&vertex_created_at),
2455                Some(&vertex_updated_at),
2456            )?;
2457            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
2458            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
2459        }
2460
2461        // Save Snapshot
2462        self.storage
2463            .snapshot_manager()
2464            .save_snapshot(&manifest)
2465            .await?;
2466        self.storage
2467            .snapshot_manager()
2468            .set_latest_snapshot(&manifest.snapshot_id)
2469            .await?;
2470
2471        // Complete flush: remove old L0 from pending list now that L1 writes succeeded.
2472        // This must happen BEFORE WAL truncation so min_pending_wal_lsn is accurate.
2473        self.l0_manager.complete_flush(&old_l0_arc);
2474
2475        // Truncate WAL segments, but only up to the minimum LSN of any remaining pending L0s.
2476        // This prevents data loss if earlier flushes failed and left L0s in pending_flush.
2477        if let Some(w) = wal_for_truncate {
2478            // Determine safe truncation point: the minimum of our LSN and any pending L0s
2479            let safe_lsn = self
2480                .l0_manager
2481                .min_pending_wal_lsn()
2482                .map(|min_pending| min_pending.min(wal_lsn))
2483                .unwrap_or(wal_lsn);
2484            w.truncate_before(safe_lsn).await?;
2485        }
2486
2487        // Invalidate property cache after flush to prevent stale reads.
2488        // Once L0 data moves to storage, cached values from storage may be outdated.
2489        if let Some(ref pm) = self.property_manager {
2490            pm.clear_cache().await;
2491        }
2492
2493        // Reset last flush time for time-based auto-flush
2494        self.last_flush_time = std::time::Instant::now();
2495
2496        info!(
2497            snapshot_id,
2498            mutations_count = initial_count,
2499            size_bytes = initial_size,
2500            "L0 flush to L1 completed successfully"
2501        );
2502        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
2503        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
2504        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
2505
2506        // Trigger CSR compaction if enough frozen segments have accumulated.
2507        // After flush, the old L0 data is now in L1; the overlay segments can be merged
2508        // into the Main CSR to reduce lookup overhead.
2509        let am = self.adjacency_manager.clone();
2510        if am.should_compact(4) {
2511            let previous_still_running = {
2512                let guard = self.compaction_handle.read();
2513                guard.as_ref().is_some_and(|h| !h.is_finished())
2514            };
2515
2516            if previous_still_running {
2517                info!("Skipping compaction: previous compaction still in progress");
2518            } else {
2519                let handle = tokio::spawn(async move {
2520                    am.compact();
2521                });
2522                *self.compaction_handle.write() = Some(handle);
2523            }
2524        }
2525
2526        // Post-flush: check if any indexes need rebuilding based on thresholds
2527        if let Some(ref rebuild_mgr) = self.index_rebuild_manager
2528            && self.config.index_rebuild.auto_rebuild_enabled
2529        {
2530            self.schedule_index_rebuilds_if_needed(&manifest, rebuild_mgr.clone());
2531        }
2532
2533        Ok(snapshot_id)
2534    }
2535
2536    /// Check rebuild thresholds and schedule background index rebuilds for
2537    /// labels that exceed growth or age limits. Marks affected indexes as
2538    /// `Stale` and spawns an async task to schedule the rebuild.
2539    fn schedule_index_rebuilds_if_needed(
2540        &self,
2541        manifest: &SnapshotManifest,
2542        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
2543    ) {
2544        let checker = crate::storage::index_rebuild::RebuildTriggerChecker::new(
2545            self.config.index_rebuild.clone(),
2546        );
2547        let schema = self.schema_manager.schema();
2548        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
2549
2550        if labels.is_empty() {
2551            return;
2552        }
2553
2554        // Mark affected indexes as Stale
2555        for label in &labels {
2556            for idx in &schema.indexes {
2557                if idx.label() == label {
2558                    let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
2559                        m.status = uni_common::core::schema::IndexStatus::Stale;
2560                    });
2561                }
2562            }
2563        }
2564
2565        tokio::spawn(async move {
2566            if let Err(e) = rebuild_mgr.schedule(labels).await {
2567                tracing::warn!("Failed to schedule index rebuild: {e}");
2568            }
2569        });
2570    }
2571
2572    /// Set the property manager for cache invalidation.
2573    pub fn set_property_manager(&mut self, pm: Arc<PropertyManager>) {
2574        self.property_manager = Some(pm);
2575    }
2576}
2577
#[cfg(test)]
mod tests {
    //! Integration-style tests for `Writer`. Each test builds a real
    //! `StorageManager`/`SchemaManager` pair on a temp directory, so these
    //! exercise the full write path (L0, WAL, transaction merge) end to end.
    use super::*;
    use tempfile::tempdir;

    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fresh temp-dir-backed object store; dropped (and cleaned up) at test end.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction: two vertices and one edge between them.
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
                Some(&tx_l0),
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(
                vid_a,
                vid_b,
                1,
                eid,
                std::collections::HashMap::new(),
                None,
                Some(&tx_l0),
            )
            .await?;

        // Get WAL before commit: record the mutation count so we can detect
        // exactly the entries added by the commit below.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first
        writer.commit_transaction_l0(tx_l0).await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        // Single pass over the newly appended entries, asserting the
        // vertices-before-edges invariant as we go.
        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }

    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Insert baseline data (outside transaction) — this must survive the
        // rollback below, proving rollback only discards transaction-local state.
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
                None,
            )
            .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
                Some(&tx_l0),
            )
            .await?;

        // Capture main L0 state before rollback
        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        // Rollback transaction (simulating what would happen after WAL flush failure)
        // NOTE(review): dropping the transaction L0 without committing is the
        // rollback mechanism here — nothing was merged into main L0 yet.
        drop(tx_l0);

        // Verify main L0 is unchanged
        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        // Baseline should still be present
        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        // Transaction data should NOT be in main L0
        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }

    /// Test that batch insert with shared labels does not clone labels per vertex.
    /// This verifies fix for issue #161 (redundant label cloning).
    #[tokio::test]
    async fn test_batch_insert_shared_labels() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Person")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        // Shared labels - should not be cloned per vertex
        // (passed by slice reference for all 100 insertions below).
        let labels = &["Person".to_string()];

        // Insert batch of vertices with same labels
        let mut vids = Vec::new();
        for i in 0..100 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("id".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, labels, None)
                .await?;
            vids.push(vid);
        }

        // Verify all vertices have the correct labels
        let l0 = writer.l0_manager.get_current();
        for vid in vids {
            let l0_guard = l0.read();
            let vertex_labels = l0_guard.vertex_labels.get(&vid);
            assert!(vertex_labels.is_some(), "Vertex should have labels");
            assert_eq!(
                vertex_labels.unwrap(),
                &vec!["Person".to_string()],
                "Labels should match"
            );
        }

        Ok(())
    }

    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
    #[tokio::test]
    async fn test_estimated_size_tracks_mutations() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        let l0 = writer.l0_manager.get_current();

        // Initial state should be empty
        let initial_estimated = l0.read().estimated_size;
        let initial_actual = l0.read().size_bytes();
        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");

        // Insert vertices with properties
        let mut vids = Vec::new();
        for i in 0..10 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
            props.insert("index".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, &[], None)
                .await?;
            vids.push(vid);
        }

        // Verify estimated_size grew
        let after_vertices_estimated = l0.read().estimated_size;
        let after_vertices_actual = l0.read().size_bytes();
        assert!(
            after_vertices_estimated > 0,
            "estimated_size should grow after insertions"
        );

        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
        // The estimate is a heuristic; 0.5x..2x is the accepted accuracy band.
        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_vertices_estimated,
            after_vertices_actual,
            ratio
        );

        // Insert edges with a simple edge type
        // (a chain: vids[0] -> vids[1] -> ... -> vids[9]).
        let edge_type = 1u32;
        for i in 0..9 {
            let eid = writer.next_eid(edge_type).await?;
            writer
                .insert_edge(
                    vids[i],
                    vids[i + 1],
                    edge_type,
                    eid,
                    std::collections::HashMap::new(),
                    Some("NEXT".to_string()),
                    None,
                )
                .await?;
        }

        // Verify estimated_size grew further
        let after_edges_estimated = l0.read().estimated_size;
        let after_edges_actual = l0.read().size_bytes();
        assert!(
            after_edges_estimated > after_vertices_estimated,
            "estimated_size should grow after edge insertions"
        );

        // Verify still within reasonable bounds
        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_edges_estimated,
            after_edges_actual,
            ratio
        );

        Ok(())
    }
}