Skip to main content

uni_db/api/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::{Duration, Instant};
8
9pub mod appender;
10pub mod builder;
11pub mod bulk;
12pub mod compaction;
13pub mod functions;
14pub mod hooks;
15pub mod impl_locy;
16pub mod impl_query;
17pub mod indexes;
18pub mod locy_builder;
19pub mod locy_result;
20pub mod multi_agent;
21pub mod notifications;
22pub mod prepared;
23pub mod query_builder;
24pub mod rule_registry;
25pub mod schema;
26pub mod session;
27pub mod sync;
28pub mod template;
29pub mod transaction;
30pub mod xervo;
31
32use object_store::ObjectStore;
33use object_store::local::LocalFileSystem;
34use tracing::info;
35use uni_common::core::snapshot::SnapshotManifest;
36use uni_common::{CloudStorageConfig, UniConfig};
37use uni_common::{Result, UniError};
38use uni_store::cloud::build_cloud_store;
39use uni_xervo::api::{ModelAliasSpec, ModelTask};
40use uni_xervo::runtime::ModelRuntime;
41
42use uni_common::core::schema::SchemaManager;
43use uni_store::runtime::id_allocator::IdAllocator;
44use uni_store::runtime::property_manager::PropertyManager;
45use uni_store::runtime::wal::WriteAheadLog;
46use uni_store::storage::manager::StorageManager;
47
48use tokio::sync::RwLock;
49use uni_store::runtime::writer::Writer;
50
51use crate::shutdown::ShutdownHandle;
52
53use std::collections::HashMap;
54
55/// Shared inner state of a Uni database instance.
56///
57/// Wrapped in `Arc` by [`Uni`] so that [`Session`](session::Session) and
58/// [`Transaction`](transaction::Transaction) can hold cheap, owned references
59/// without lifetime parameters.
60/// Shared inner state of a Uni database instance. Not intended for direct use.
61#[doc(hidden)]
62pub struct UniInner {
63    pub(crate) storage: Arc<StorageManager>,
64    pub(crate) schema: Arc<SchemaManager>,
65    pub(crate) properties: Arc<PropertyManager>,
66    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
67    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
68    pub(crate) config: UniConfig,
69    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
70    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
71    /// Global registry of pre-compiled Locy rules.
72    ///
73    /// Cloned into every new Session. Use `db.register_rules()` to add rules
74    /// globally, or `session.register_rules()` for session-scoped rules.
75    pub(crate) locy_rule_registry: Arc<std::sync::RwLock<impl_locy::LocyRuleRegistry>>,
76    /// Timestamp when this database instance was built.
77    pub(crate) start_time: Instant,
78    /// Broadcast channel for commit notifications.
79    pub(crate) commit_tx: tokio::sync::broadcast::Sender<Arc<notifications::CommitNotification>>,
80    /// Write lease configuration for multi-agent access.
81    pub(crate) write_lease: Option<multi_agent::WriteLease>,
82    /// Number of currently active sessions.
83    pub(crate) active_session_count: AtomicUsize,
84    /// Total queries executed across all sessions.
85    pub(crate) total_queries: AtomicU64,
86    /// Total transactions committed across all sessions.
87    pub(crate) total_commits: AtomicU64,
88    /// Database-level registry of custom scalar functions.
89    pub(crate) custom_functions: Arc<std::sync::RwLock<uni_query::CustomFunctionRegistry>>,
90
91    // ── Cached metrics (updated on commit, read by sync `metrics()`) ─────
92    /// Cached L0 mutation count (updated after every commit).
93    pub(crate) cached_l0_mutation_count: AtomicUsize,
94    /// Cached L0 estimated size in bytes (updated after every commit).
95    pub(crate) cached_l0_estimated_size: AtomicUsize,
96    /// Cached WAL log sequence number (updated after every commit).
97    pub(crate) cached_wal_lsn: AtomicU64,
98}
99
/// Write throttle pressure as a value in 0.0–1.0.
///
/// Indicates how much back-pressure the storage layer is exerting.
/// 0.0 means no throttling; 1.0 means fully throttled. The constructor
/// normalises every input (including NaN and infinities), so the stored
/// value is always a finite number in that range.
#[derive(Debug, Clone, Copy, Default, PartialEq, PartialOrd)]
pub struct ThrottlePressure(f64);

impl ThrottlePressure {
    /// Create a new throttle pressure value, clamped to 0.0–1.0.
    ///
    /// Values above 1.0 (including `+inf`) become 1.0. Values at or below 0.0
    /// (including `-0.0` and `-inf`) and NaN become 0.0. Plain `f64::clamp`
    /// is not enough here because it returns NaN unchanged.
    pub fn new(value: f64) -> Self {
        // `value > 0.0` is false for NaN, so NaN falls into the zero branch.
        if value > 0.0 {
            Self(value.min(1.0))
        } else {
            Self(0.0)
        }
    }

    /// The raw pressure value (0.0–1.0).
    pub fn value(&self) -> f64 {
        self.0
    }

    /// Returns `true` if any throttle pressure is active.
    pub fn is_throttled(&self) -> bool {
        self.0 > 0.0
    }
}

impl std::fmt::Display for ThrottlePressure {
    /// Formats the pressure as a percentage with one decimal place, e.g. `50.0%`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:.1}%", self.0 * 100.0)
    }
}
135
136/// Snapshot of database-level metrics.
137#[derive(Debug, Clone)]
138pub struct DatabaseMetrics {
139    /// Current L0 mutation count (cumulative since last flush).
140    pub l0_mutation_count: usize,
141    /// Estimated L0 buffer size in bytes.
142    pub l0_estimated_size_bytes: usize,
143    /// Schema version number.
144    pub schema_version: u64,
145    /// Time since the database instance was created.
146    pub uptime: Duration,
147    /// Number of currently active sessions.
148    pub active_sessions: usize,
149    /// Number of L1 compaction runs completed (0 until storage instrumentation).
150    pub l1_run_count: usize,
151    /// Write throttle pressure (0.0–1.0, 0 until instrumentation).
152    pub write_throttle_pressure: ThrottlePressure,
153    /// Current compaction status.
154    pub compaction_status: uni_store::CompactionStatus,
155    /// WAL size in bytes (0 until storage instrumentation).
156    pub wal_size_bytes: u64,
157    /// Highest WAL log sequence number that has been flushed (0 when no WAL is configured).
158    pub wal_lsn: u64,
159    /// Total queries executed across all sessions.
160    pub total_queries: u64,
161    /// Total transactions committed across all sessions.
162    pub total_commits: u64,
163}
164
165/// Main entry point for Uni embedded database.
166///
167/// `Uni` is the lifecycle and admin handle. All data access goes through
168/// [`Session`](session::Session) (reads) and [`Transaction`](transaction::Transaction) (writes).
169///
170/// # Examples
171///
172/// ```no_run
173/// use uni_db::Uni;
174///
175/// #[tokio::main]
176/// async fn main() -> Result<(), uni_db::UniError> {
177///     let db = Uni::open("./my_db").build().await?;
178///
179///     // All data access goes through sessions
180///     let session = db.session();
181///     let results = session.query("MATCH (n) RETURN count(n)").await?;
182///     println!("Count: {:?}", results);
183///     Ok(())
184/// }
185/// ```
186pub struct Uni {
187    pub(crate) inner: Arc<UniInner>,
188}
189
190// No Deref<Target = UniInner> — Uni is an opaque handle.
191// All field access goes through `self.inner.field` explicitly.
192
193impl UniInner {
194    /// Open a point-in-time view of the database at the given snapshot.
195    ///
196    /// Returns a new `UniInner` that is pinned to the specified snapshot state.
197    /// The returned instance is read-only.
198    pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<UniInner> {
199        let manifest = self
200            .storage
201            .snapshot_manager()
202            .load_snapshot(snapshot_id)
203            .await
204            .map_err(UniError::Internal)?;
205
206        let pinned_storage = Arc::new(self.storage.pinned(manifest));
207
208        let prop_manager = Arc::new(PropertyManager::new(
209            pinned_storage.clone(),
210            self.schema.clone(),
211            self.properties.cache_size(),
212        ));
213
214        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
215
216        let (commit_tx, _) = tokio::sync::broadcast::channel(256);
217        Ok(UniInner {
218            storage: pinned_storage,
219            schema: self.schema.clone(),
220            properties: prop_manager,
221            writer: None,
222            xervo_runtime: self.xervo_runtime.clone(),
223            config: self.config.clone(),
224            procedure_registry: self.procedure_registry.clone(),
225            shutdown_handle,
226            locy_rule_registry: Arc::new(std::sync::RwLock::new(
227                impl_locy::LocyRuleRegistry::default(),
228            )),
229            start_time: Instant::now(),
230            commit_tx,
231            write_lease: None,
232            active_session_count: AtomicUsize::new(0),
233            total_queries: AtomicU64::new(0),
234            total_commits: AtomicU64::new(0),
235            custom_functions: self.custom_functions.clone(),
236            cached_l0_mutation_count: AtomicUsize::new(0),
237            cached_l0_estimated_size: AtomicUsize::new(0),
238            cached_wal_lsn: AtomicU64::new(0),
239        })
240    }
241}
242
243impl Uni {
244    /// Open or create a database at the given path.
245    ///
246    /// If the database does not exist, it will be created.
247    ///
248    /// # Arguments
249    ///
250    /// * `uri` - Local path or object store URI.
251    ///
252    /// # Returns
253    ///
254    /// A [`UniBuilder`] to configure and build the database instance.
255    pub fn open(uri: impl Into<String>) -> UniBuilder {
256        UniBuilder::new(uri.into())
257    }
258
259    /// Open an existing database at the given path. Fails if it does not exist.
260    pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
261        let mut builder = UniBuilder::new(uri.into());
262        builder.create_if_missing = false;
263        builder
264    }
265
266    /// Create a new database at the given path. Fails if it already exists.
267    pub fn create(uri: impl Into<String>) -> UniBuilder {
268        let mut builder = UniBuilder::new(uri.into());
269        builder.fail_if_exists = true;
270        builder
271    }
272
273    /// Create a temporary database that is deleted when dropped.
274    ///
275    /// Useful for tests and short-lived processing.
276    /// Note: Currently uses a temporary directory on the filesystem.
277    pub fn temporary() -> UniBuilder {
278        let temp_dir = std::env::temp_dir().join(format!("uni_mem_{}", uuid::Uuid::new_v4()));
279        UniBuilder::new(temp_dir.to_string_lossy().to_string())
280    }
281
282    /// Open an in-memory database (alias for temporary).
283    pub fn in_memory() -> UniBuilder {
284        Self::temporary()
285    }
286
287    // ── Session Factory (primary entry point for data access) ────────
288
289    /// Create a new Session for data access.
290    ///
291    /// Sessions are cheap, synchronous, and infallible. All reads go through
292    /// sessions, and sessions are the factory for transactions (writes).
293    ///
294    /// # Examples
295    ///
296    /// ```no_run
297    /// # use uni_db::Uni;
298    /// # async fn example(db: &Uni) -> uni_db::Result<()> {
299    /// let session = db.session();
300    /// let rows = session.query("MATCH (n) RETURN n LIMIT 10").await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub fn session(&self) -> session::Session {
305        session::Session::new(self.inner.clone())
306    }
307
308    /// Create a session template builder for pre-configured session factories.
309    ///
310    /// Templates pre-compile Locy rules, bind parameters, and attach hooks
311    /// once, then cheaply stamp out sessions per-request.
312    pub fn session_template(&self) -> template::SessionTemplateBuilder {
313        template::SessionTemplateBuilder::new(self.inner.clone())
314    }
315
316    // ── Database Metrics ──────────────────────────────────────────────
317
318    /// Snapshot the database-level metrics.
319    ///
320    /// This is a cheap, synchronous read of cached atomic values.
321    /// L0 metrics (`l0_mutation_count`, `l0_estimated_size_bytes`, `wal_lsn`)
322    /// reflect the state as of the last successful commit.
323    pub fn metrics(&self) -> DatabaseMetrics {
324        let schema_version = self.inner.schema.schema().schema_version as u64;
325        let compaction_status = self.inner.storage.compaction_status().unwrap_or_default();
326        DatabaseMetrics {
327            l0_mutation_count: self.inner.cached_l0_mutation_count.load(Ordering::Relaxed),
328            l0_estimated_size_bytes: self.inner.cached_l0_estimated_size.load(Ordering::Relaxed),
329            schema_version,
330            uptime: self.inner.start_time.elapsed(),
331            active_sessions: self.inner.active_session_count.load(Ordering::Relaxed),
332            l1_run_count: compaction_status.l1_runs,
333            write_throttle_pressure: ThrottlePressure::default(),
334            compaction_status,
335            wal_size_bytes: 0u64,
336            wal_lsn: self.inner.cached_wal_lsn.load(Ordering::Relaxed),
337            total_queries: self.inner.total_queries.load(Ordering::Relaxed),
338            total_commits: self.inner.total_commits.load(Ordering::Relaxed),
339        }
340    }
341
342    /// Returns the write lease configuration, if any.
343    /// Write lease enforcement is Phase 2.
344    pub fn write_lease(&self) -> Option<&multi_agent::WriteLease> {
345        self.inner.write_lease.as_ref()
346    }
347
348    // ── Global Locy Rule Management ───────────────────────────────────
349
350    /// Access the global rule registry for managing pre-compiled Locy rules.
351    ///
352    /// Rules registered here are cloned into every new Session.
353    pub fn rules(&self) -> rule_registry::RuleRegistry<'_> {
354        rule_registry::RuleRegistry::new(&self.inner.locy_rule_registry)
355    }
356
357    // ── Configuration & Introspection ─────────────────────────────────
358
359    /// Get configuration.
360    pub fn config(&self) -> &UniConfig {
361        &self.inner.config
362    }
363
364    /// Returns the procedure registry for registering test procedures.
365    #[doc(hidden)]
366    pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
367        &self.inner.procedure_registry
368    }
369
370    /// Get schema manager.
371    #[doc(hidden)]
372    pub fn schema_manager(&self) -> Arc<SchemaManager> {
373        self.inner.schema.clone()
374    }
375
376    #[doc(hidden)]
377    pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
378        self.inner.writer.clone()
379    }
380
381    #[doc(hidden)]
382    pub fn storage(&self) -> Arc<StorageManager> {
383        self.inner.storage.clone()
384    }
385
386    /// Flush all uncommitted changes to persistent storage (L1).
387    ///
388    /// This forces a write of the current in-memory buffer (L0) to columnar files.
389    /// It also creates a new snapshot.
390    pub async fn flush(&self) -> Result<()> {
391        if let Some(writer_lock) = &self.inner.writer {
392            let mut writer = writer_lock.write().await;
393            writer
394                .flush_to_l1(None)
395                .await
396                .map(|_| ())
397                .map_err(UniError::Internal)
398        } else {
399            Err(UniError::ReadOnly {
400                operation: "flush".to_string(),
401            })
402        }
403    }
404
405    /// Create a named point-in-time snapshot of the database.
406    ///
407    /// Flushes current changes, records the state, and persists the snapshot
408    /// under the given name so it can be retrieved later.
409    /// Returns the snapshot ID.
410    pub async fn create_snapshot(&self, name: &str) -> Result<String> {
411        if name.is_empty() {
412            return Err(UniError::Internal(anyhow::anyhow!(
413                "Snapshot name cannot be empty"
414            )));
415        }
416
417        let snapshot_id = if let Some(writer_lock) = &self.inner.writer {
418            let mut writer = writer_lock.write().await;
419            writer
420                .flush_to_l1(Some(name.to_string()))
421                .await
422                .map_err(UniError::Internal)?
423        } else {
424            return Err(UniError::ReadOnly {
425                operation: "create_snapshot".to_string(),
426            });
427        };
428
429        self.inner
430            .storage
431            .snapshot_manager()
432            .save_named_snapshot(name, &snapshot_id)
433            .await
434            .map_err(UniError::Internal)?;
435
436        Ok(snapshot_id)
437    }
438
439    /// List all available snapshots.
440    pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
441        let sm = self.inner.storage.snapshot_manager();
442        let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
443        let mut manifests = Vec::new();
444        for id in ids {
445            if let Ok(m) = sm.load_snapshot(&id).await {
446                manifests.push(m);
447            }
448        }
449        Ok(manifests)
450    }
451
452    /// Restore the database to a specific snapshot.
453    ///
454    /// **Note**: This currently requires a restart or re-opening of Uni to fully take effect
455    /// as it only updates the latest pointer.
456    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
457        self.inner
458            .storage
459            .snapshot_manager()
460            .set_latest_snapshot(snapshot_id)
461            .await
462            .map_err(UniError::Internal)
463    }
464
465    /// Check if a label exists in the schema.
466    pub async fn label_exists(&self, name: &str) -> Result<bool> {
467        Ok(self
468            .inner
469            .schema
470            .schema()
471            .labels
472            .get(name)
473            .is_some_and(|l| {
474                matches!(
475                    l.state,
476                    uni_common::core::schema::SchemaElementState::Active
477                )
478            }))
479    }
480
481    /// Check if an edge type exists in the schema.
482    pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
483        Ok(self
484            .inner
485            .schema
486            .schema()
487            .edge_types
488            .get(name)
489            .is_some_and(|e| {
490                matches!(
491                    e.state,
492                    uni_common::core::schema::SchemaElementState::Active
493                )
494            }))
495    }
496
497    /// Get all label names.
498    /// Returns the union of schema-registered labels (Active state) and labels
499    /// discovered from data (for schemaless mode where labels may not be in the
500    /// schema). This is consistent with `list_edge_types()` for schema labels
501    /// while also supporting schemaless workflows.
502    pub async fn list_labels(&self) -> Result<Vec<String>> {
503        let mut all_labels = std::collections::HashSet::new();
504
505        // Schema labels (covers schema-defined labels that may not have data yet)
506        for (name, label) in self.inner.schema.schema().labels.iter() {
507            if matches!(
508                label.state,
509                uni_common::core::schema::SchemaElementState::Active
510            ) {
511                all_labels.insert(name.clone());
512            }
513        }
514
515        // Data labels (covers schemaless labels that aren't in the schema)
516        let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
517        let result = self.inner.execute_internal(query, HashMap::new()).await?;
518        for row in result.rows() {
519            if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
520                for label in labels_list {
521                    all_labels.insert(label);
522                }
523            }
524        }
525
526        Ok(all_labels.into_iter().collect())
527    }
528
529    /// Get all edge type names.
530    pub async fn list_edge_types(&self) -> Result<Vec<String>> {
531        Ok(self
532            .inner
533            .schema
534            .schema()
535            .edge_types
536            .iter()
537            .filter(|(_, e)| {
538                matches!(
539                    e.state,
540                    uni_common::core::schema::SchemaElementState::Active
541                )
542            })
543            .map(|(name, _)| name.clone())
544            .collect())
545    }
546
547    /// Get detailed information about a label.
548    pub async fn get_label_info(
549        &self,
550        name: &str,
551    ) -> Result<Option<crate::api::schema::LabelInfo>> {
552        let schema = self.inner.schema.schema();
553        if schema.labels.contains_key(name) {
554            let count = if let Ok(ds) = self.inner.storage.vertex_dataset(name) {
555                if let Ok(raw) = ds.open_raw().await {
556                    raw.count_rows(None)
557                        .await
558                        .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
559                } else {
560                    0
561                }
562            } else {
563                0
564            };
565
566            let mut properties = Vec::new();
567            if let Some(props) = schema.properties.get(name) {
568                for (prop_name, prop_meta) in props {
569                    let is_indexed = schema.indexes.iter().any(|idx| match idx {
570                        uni_common::core::schema::IndexDefinition::Vector(v) => {
571                            v.label == name && v.property == *prop_name
572                        }
573                        uni_common::core::schema::IndexDefinition::Scalar(s) => {
574                            s.label == name && s.properties.contains(prop_name)
575                        }
576                        uni_common::core::schema::IndexDefinition::FullText(f) => {
577                            f.label == name && f.properties.contains(prop_name)
578                        }
579                        uni_common::core::schema::IndexDefinition::Inverted(inv) => {
580                            inv.label == name && inv.property == *prop_name
581                        }
582                        uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
583                            j.label == name
584                        }
585                        _ => false,
586                    });
587
588                    properties.push(crate::api::schema::PropertyInfo {
589                        name: prop_name.clone(),
590                        data_type: format!("{:?}", prop_meta.r#type),
591                        nullable: prop_meta.nullable,
592                        is_indexed,
593                    });
594                }
595            }
596
597            let mut indexes = Vec::new();
598            for idx in schema.indexes.iter().filter(|i| i.label() == name) {
599                use uni_common::core::schema::IndexDefinition;
600                let (idx_type, idx_props) = match idx {
601                    IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
602                    IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
603                    IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
604                    IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
605                    IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
606                    _ => continue,
607                };
608
609                indexes.push(crate::api::schema::IndexInfo {
610                    name: idx.name().to_string(),
611                    index_type: idx_type.to_string(),
612                    properties: idx_props,
613                    status: "ONLINE".to_string(), // TODO: Check actual status
614                });
615            }
616
617            let mut constraints = Vec::new();
618            for c in &schema.constraints {
619                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
620                    && l == name
621                {
622                    let (ctype, cprops) = match &c.constraint_type {
623                        uni_common::core::schema::ConstraintType::Unique { properties } => {
624                            ("UNIQUE", properties.clone())
625                        }
626                        uni_common::core::schema::ConstraintType::Exists { property } => {
627                            ("EXISTS", vec![property.clone()])
628                        }
629                        uni_common::core::schema::ConstraintType::Check { expression } => {
630                            ("CHECK", vec![expression.clone()])
631                        }
632                        _ => ("UNKNOWN", vec![]),
633                    };
634
635                    constraints.push(crate::api::schema::ConstraintInfo {
636                        name: c.name.clone(),
637                        constraint_type: ctype.to_string(),
638                        properties: cprops,
639                        enabled: c.enabled,
640                    });
641                }
642            }
643
644            Ok(Some(crate::api::schema::LabelInfo {
645                name: name.to_string(),
646                count,
647                properties,
648                indexes,
649                constraints,
650            }))
651        } else {
652            Ok(None)
653        }
654    }
655
656    /// Get detailed information about an edge type.
657    pub async fn get_edge_type_info(
658        &self,
659        name: &str,
660    ) -> Result<Option<crate::api::schema::EdgeTypeInfo>> {
661        let schema = self.inner.schema.schema();
662        let edge_meta = match schema.edge_types.get(name) {
663            Some(meta) => meta,
664            None => return Ok(None),
665        };
666
667        // Count edges via internal query
668        let count = {
669            let query = format!("MATCH ()-[r:{}]->() RETURN count(r) AS cnt", name);
670            match self.inner.execute_internal(&query, HashMap::new()).await {
671                Ok(result) => result
672                    .rows()
673                    .first()
674                    .and_then(|r| r.get::<i64>("cnt").ok())
675                    .unwrap_or(0) as usize,
676                Err(_) => 0,
677            }
678        };
679
680        let source_labels = edge_meta.src_labels.clone();
681        let target_labels = edge_meta.dst_labels.clone();
682
683        let mut properties = Vec::new();
684        if let Some(props) = schema.properties.get(name) {
685            for (prop_name, prop_meta) in props {
686                let is_indexed = schema.indexes.iter().any(|idx| match idx {
687                    uni_common::core::schema::IndexDefinition::Scalar(s) => {
688                        s.label == name && s.properties.contains(prop_name)
689                    }
690                    uni_common::core::schema::IndexDefinition::FullText(f) => {
691                        f.label == name && f.properties.contains(prop_name)
692                    }
693                    uni_common::core::schema::IndexDefinition::Inverted(inv) => {
694                        inv.label == name && inv.property == *prop_name
695                    }
696                    _ => false,
697                });
698
699                properties.push(crate::api::schema::PropertyInfo {
700                    name: prop_name.clone(),
701                    data_type: format!("{:?}", prop_meta.r#type),
702                    nullable: prop_meta.nullable,
703                    is_indexed,
704                });
705            }
706        }
707
708        let mut indexes = Vec::new();
709        for idx in schema.indexes.iter().filter(|i| i.label() == name) {
710            use uni_common::core::schema::IndexDefinition;
711            let (idx_type, idx_props) = match idx {
712                IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
713                IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
714                IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
715                _ => continue,
716            };
717
718            indexes.push(crate::api::schema::IndexInfo {
719                name: idx.name().to_string(),
720                index_type: idx_type.to_string(),
721                properties: idx_props,
722                status: "ONLINE".to_string(),
723            });
724        }
725
726        let mut constraints = Vec::new();
727        for c in &schema.constraints {
728            if let uni_common::core::schema::ConstraintTarget::EdgeType(et) = &c.target
729                && et == name
730            {
731                let (ctype, cprops) = match &c.constraint_type {
732                    uni_common::core::schema::ConstraintType::Unique { properties } => {
733                        ("UNIQUE", properties.clone())
734                    }
735                    uni_common::core::schema::ConstraintType::Exists { property } => {
736                        ("EXISTS", vec![property.clone()])
737                    }
738                    uni_common::core::schema::ConstraintType::Check { expression } => {
739                        ("CHECK", vec![expression.clone()])
740                    }
741                    _ => ("UNKNOWN", vec![]),
742                };
743
744                constraints.push(crate::api::schema::ConstraintInfo {
745                    name: c.name.clone(),
746                    constraint_type: ctype.to_string(),
747                    properties: cprops,
748                    enabled: c.enabled,
749                });
750            }
751        }
752
753        Ok(Some(crate::api::schema::EdgeTypeInfo {
754            name: name.to_string(),
755            count,
756            source_labels,
757            target_labels,
758            properties,
759            indexes,
760            constraints,
761        }))
762    }
763
764    // ── Compaction ──────────────────────────────────────────────────────
765
766    /// Access compaction operations.
767    pub fn compaction(&self) -> compaction::Compaction<'_> {
768        compaction::Compaction { inner: &self.inner }
769    }
770
771    // ── Indexes ──────────────────────────────────────────────────────────
772
773    /// Access index management operations.
774    pub fn indexes(&self) -> indexes::Indexes<'_> {
775        indexes::Indexes { inner: &self.inner }
776    }
777
778    // ── Custom Functions ──────────────────────────────────────────────
779
780    /// Access custom Cypher function management.
781    pub fn functions(&self) -> functions::Functions<'_> {
782        functions::Functions { inner: &self.inner }
783    }
784
785    /// Shutdown the database gracefully, flushing pending data and stopping background tasks.
786    ///
787    /// This method flushes any pending data and waits for all background tasks to complete
788    /// (with a timeout). After calling this method, the database instance should not be used.
789    pub async fn shutdown(self) -> Result<()> {
790        // Flush pending data
791        if let Some(ref writer) = self.inner.writer {
792            let mut w = writer.write().await;
793            if let Err(e) = w.flush_to_l1(None).await {
794                tracing::error!("Error flushing during shutdown: {}", e);
795            }
796        }
797
798        self.inner
799            .shutdown_handle
800            .shutdown_async()
801            .await
802            .map_err(UniError::Internal)
803    }
804}
805
806impl Drop for Uni {
807    fn drop(&mut self) {
808        self.inner.shutdown_handle.shutdown_blocking();
809        tracing::debug!("Uni dropped, shutdown signal sent");
810    }
811}
812
813/// Builder for configuring and opening a `Uni` database instance.
814#[must_use = "builders do nothing until .build() is called"]
815pub struct UniBuilder {
816    uri: String,
817    config: UniConfig,
818    schema_file: Option<PathBuf>,
819    xervo_catalog: Option<Vec<ModelAliasSpec>>,
820    hybrid_remote_url: Option<String>,
821    cloud_config: Option<CloudStorageConfig>,
822    create_if_missing: bool,
823    fail_if_exists: bool,
824    read_only: bool,
825    write_lease: Option<multi_agent::WriteLease>,
826}
827
828impl UniBuilder {
829    /// Creates a new builder for the given URI.
830    pub fn new(uri: String) -> Self {
831        Self {
832            uri,
833            config: UniConfig::default(),
834            schema_file: None,
835            xervo_catalog: None,
836            hybrid_remote_url: None,
837            cloud_config: None,
838            create_if_missing: true,
839            fail_if_exists: false,
840            read_only: false,
841            write_lease: None,
842        }
843    }
844
845    /// Load schema from JSON file on initialization.
846    pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
847        self.schema_file = Some(path.as_ref().to_path_buf());
848        self
849    }
850
851    /// Set Uni-Xervo catalog explicitly.
852    pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
853        self.xervo_catalog = Some(catalog);
854        self
855    }
856
857    /// Configure remote storage for data, keeping local path for WAL/IDs.
858    ///
859    /// # Examples
860    ///
861    /// ```ignore
862    /// use uni_common::CloudStorageConfig;
863    ///
864    /// let config = CloudStorageConfig::S3 {
865    ///     bucket: "my-bucket".to_string(),
866    ///     region: Some("us-east-1".to_string()),
867    ///     endpoint: None,
868    ///     access_key_id: None,
869    ///     secret_access_key: None,
870    ///     session_token: None,
871    ///     virtual_hosted_style: false,
872    /// };
873    ///
874    /// let db = Uni::open("./local_meta")
875    ///     .remote_storage("s3://my-bucket/graph-data", config)
876    ///     .build()
877    ///     .await?;
878    /// ```
879    pub fn remote_storage(mut self, remote_url: &str, config: CloudStorageConfig) -> Self {
880        self.hybrid_remote_url = Some(remote_url.to_string());
881        self.cloud_config = Some(config);
882        self
883    }
884
885    /// Open the database in read-only mode.
886    ///
887    /// In read-only mode, no writer is created. All write operations
888    /// (`tx()`, `execute()`, `bulk_writer()`, `appender()`) will return
889    /// `ReadOnly` errors. Reads work normally.
890    pub fn read_only(mut self) -> Self {
891        self.read_only = true;
892        self
893    }
894
895    /// Set the write lease strategy for multi-agent access.
896    ///
897    /// This configures how write access is coordinated when multiple
898    /// processes share the same database.
899    pub fn write_lease(mut self, lease: multi_agent::WriteLease) -> Self {
900        self.write_lease = Some(lease);
901        self
902    }
903
904    /// Configure database options using `UniConfig`.
905    pub fn config(mut self, config: UniConfig) -> Self {
906        self.config = config;
907        self
908    }
909
910    /// Open the database (async).
911    pub async fn build(self) -> Result<Uni> {
912        let uri = self.uri.clone();
913        let is_remote_uri = uri.contains("://");
914        let is_hybrid = self.hybrid_remote_url.is_some();
915
916        if is_hybrid && is_remote_uri {
917            return Err(UniError::Internal(anyhow::anyhow!(
918                "Hybrid mode requires a local path as primary URI, found: {}",
919                uri
920            )));
921        }
922
923        let (storage_uri, data_store, local_store_opt) = if is_hybrid {
924            let remote_url = self.hybrid_remote_url.as_ref().unwrap();
925
926            // Remote Store (Data) - use explicit cloud_config if provided
927            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
928                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
929            } else {
930                let url = url::Url::parse(remote_url).map_err(|e| {
931                    UniError::Io(std::io::Error::new(
932                        std::io::ErrorKind::InvalidInput,
933                        e.to_string(),
934                    ))
935                })?;
936                let (os, _path) =
937                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
938                Arc::from(os)
939            };
940
941            // Local Store (WAL, IDs)
942            let path = PathBuf::from(&uri);
943            if path.exists() {
944                if self.fail_if_exists {
945                    return Err(UniError::Internal(anyhow::anyhow!(
946                        "Database already exists at {}",
947                        uri
948                    )));
949                }
950            } else {
951                if !self.create_if_missing {
952                    return Err(UniError::NotFound { path: path.clone() });
953                }
954                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
955            }
956
957            let local_store = Arc::new(
958                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
959            );
960
961            // For hybrid, storage_uri is the remote URL (since StorageManager loads datasets from there)
962            // But we must provide the correct store to other components manually.
963            (
964                remote_url.clone(),
965                remote_store,
966                Some(local_store as Arc<dyn ObjectStore>),
967            )
968        } else if is_remote_uri {
969            // Remote Only - use explicit cloud_config if provided
970            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
971                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
972            } else {
973                let url = url::Url::parse(&uri).map_err(|e| {
974                    UniError::Io(std::io::Error::new(
975                        std::io::ErrorKind::InvalidInput,
976                        e.to_string(),
977                    ))
978                })?;
979                let (os, _path) =
980                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
981                Arc::from(os)
982            };
983
984            (uri.clone(), remote_store, None)
985        } else {
986            // Local Only
987            let path = PathBuf::from(&uri);
988            let storage_path = path.join("storage");
989
990            if path.exists() {
991                if self.fail_if_exists {
992                    return Err(UniError::Internal(anyhow::anyhow!(
993                        "Database already exists at {}",
994                        uri
995                    )));
996                }
997            } else {
998                if !self.create_if_missing {
999                    return Err(UniError::NotFound { path: path.clone() });
1000                }
1001                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
1002            }
1003
1004            // Ensure storage directory exists
1005            if !storage_path.exists() {
1006                std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
1007            }
1008
1009            let store = Arc::new(
1010                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
1011            );
1012            (
1013                storage_path.to_string_lossy().to_string(),
1014                store.clone() as Arc<dyn ObjectStore>,
1015                Some(store as Arc<dyn ObjectStore>),
1016            )
1017        };
1018
1019        // Canonical schema location in metadata catalog.
1020        let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
1021        // Legacy schema location used by older builds.
1022        let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
1023
1024        // Backward-compatible schema path migration:
1025        // if catalog/schema.json is missing but root schema.json exists,
1026        // copy root schema.json to catalog/schema.json.
1027        let has_catalog_schema = match data_store.get(&schema_obj_path).await {
1028            Ok(_) => true,
1029            Err(object_store::Error::NotFound { .. }) => false,
1030            Err(e) => return Err(UniError::Internal(e.into())),
1031        };
1032        if !has_catalog_schema {
1033            match data_store.get(&legacy_schema_obj_path).await {
1034                Ok(result) => {
1035                    let bytes = result
1036                        .bytes()
1037                        .await
1038                        .map_err(|e| UniError::Internal(e.into()))?;
1039                    data_store
1040                        .put(&schema_obj_path, bytes.into())
1041                        .await
1042                        .map_err(|e| UniError::Internal(e.into()))?;
1043                    info!(
1044                        legacy = %legacy_schema_obj_path,
1045                        target = %schema_obj_path,
1046                        "Migrated legacy schema path to catalog path"
1047                    );
1048                }
1049                Err(object_store::Error::NotFound { .. }) => {}
1050                Err(e) => return Err(UniError::Internal(e.into())),
1051            }
1052        }
1053
1054        // Load schema (SchemaManager::load creates a default if missing)
1055        // Schema is always in data_store (Remote or Local)
1056        let schema_manager = Arc::new(
1057            SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1058                .await
1059                .map_err(UniError::Internal)?,
1060        );
1061
1062        let lancedb_storage_options = self
1063            .cloud_config
1064            .as_ref()
1065            .map(Self::cloud_config_to_lancedb_storage_options);
1066
1067        let storage = if is_hybrid || is_remote_uri {
1068            // Preserve explicit cloud settings (endpoint, credentials, path style)
1069            // by reusing the constructed remote store.
1070            StorageManager::new_with_store_and_storage_options(
1071                &storage_uri,
1072                data_store.clone(),
1073                schema_manager.clone(),
1074                self.config.clone(),
1075                lancedb_storage_options.clone(),
1076            )
1077            .await
1078            .map_err(UniError::Internal)?
1079        } else {
1080            // Local mode keeps using a storage-path-scoped local store.
1081            StorageManager::new_with_config(
1082                &storage_uri,
1083                schema_manager.clone(),
1084                self.config.clone(),
1085            )
1086            .await
1087            .map_err(UniError::Internal)?
1088        };
1089
1090        let storage = Arc::new(storage);
1091
1092        // Create shutdown handle
1093        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1094
1095        // Start background compaction with shutdown signal
1096        let compaction_handle = storage
1097            .clone()
1098            .start_background_compaction(shutdown_handle.subscribe());
1099        shutdown_handle.track_task(compaction_handle);
1100
1101        // Initialize property manager
1102        let prop_cache_capacity = self.config.cache_size / 1024;
1103
1104        let prop_manager = Arc::new(PropertyManager::new(
1105            storage.clone(),
1106            schema_manager.clone(),
1107            prop_cache_capacity,
1108        ));
1109
1110        // Setup stores for WAL and IdAllocator (needed for version recovery check)
1111        let id_store = local_store_opt
1112            .clone()
1113            .unwrap_or_else(|| data_store.clone());
1114        let wal_store = local_store_opt
1115            .clone()
1116            .unwrap_or_else(|| data_store.clone());
1117
1118        // Determine start version and WAL high water mark from latest snapshot.
1119        // Detects and recovers from a lost manifest pointer.
1120        let latest_snapshot = storage
1121            .snapshot_manager()
1122            .load_latest_snapshot()
1123            .await
1124            .map_err(UniError::Internal)?;
1125
1126        let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1127            (
1128                snapshot.version_high_water_mark + 1,
1129                snapshot.wal_high_water_mark,
1130            )
1131        } else {
1132            // No latest snapshot — fresh DB or lost manifest?
1133            let has_manifests = storage
1134                .snapshot_manager()
1135                .has_any_manifests()
1136                .await
1137                .unwrap_or(false);
1138
1139            let wal_check =
1140                WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1141            let has_wal = wal_check.has_segments().await.unwrap_or(false);
1142
1143            if has_manifests {
1144                // Manifests exist but latest pointer is missing — try to recover from manifests
1145                let snapshot_ids = storage
1146                    .snapshot_manager()
1147                    .list_snapshots()
1148                    .await
1149                    .map_err(UniError::Internal)?;
1150                if let Some(last_id) = snapshot_ids.last() {
1151                    let manifest = storage
1152                        .snapshot_manager()
1153                        .load_snapshot(last_id)
1154                        .await
1155                        .map_err(UniError::Internal)?;
1156                    tracing::warn!(
1157                        "Latest snapshot pointer missing but found manifest '{}'. \
1158                         Recovering version {}.",
1159                        last_id,
1160                        manifest.version_high_water_mark
1161                    );
1162                    (
1163                        manifest.version_high_water_mark + 1,
1164                        manifest.wal_high_water_mark,
1165                    )
1166                } else {
1167                    return Err(UniError::Internal(anyhow::anyhow!(
1168                        "Snapshot manifests directory exists but contains no valid manifests. \
1169                         Possible data corruption."
1170                    )));
1171                }
1172            } else if has_wal {
1173                // WAL exists but no manifests at all — data exists but unrecoverable version
1174                return Err(UniError::Internal(anyhow::anyhow!(
1175                    "Database has WAL segments but no snapshot manifest. \
1176                     Cannot safely determine version counter -- starting at 0 would cause \
1177                     version conflicts and data corruption. \
1178                     Restore the snapshot manifest or delete WAL to start fresh."
1179                )));
1180            } else {
1181                // Truly fresh database
1182                (0, 0)
1183            }
1184        };
1185
1186        let allocator = Arc::new(
1187            IdAllocator::new(
1188                id_store,
1189                object_store::path::Path::from("id_allocator.json"),
1190                1000,
1191            )
1192            .await
1193            .map_err(UniError::Internal)?,
1194        );
1195
1196        let wal = if !self.config.wal_enabled {
1197            // WAL disabled by config
1198            None
1199        } else if is_remote_uri && !is_hybrid {
1200            // Remote-only WAL (ObjectStoreWal)
1201            Some(Arc::new(WriteAheadLog::new(
1202                wal_store,
1203                object_store::path::Path::from("wal"),
1204            )))
1205        } else if is_hybrid || !is_remote_uri {
1206            // Local WAL (using local_store)
1207            // Even if local_store uses ObjectStore trait, it maps to FS.
1208            Some(Arc::new(WriteAheadLog::new(
1209                wal_store,
1210                object_store::path::Path::from("wal"),
1211            )))
1212        } else {
1213            None
1214        };
1215
1216        let writer = Arc::new(RwLock::new(
1217            Writer::new_with_config(
1218                storage.clone(),
1219                schema_manager.clone(),
1220                start_version,
1221                self.config.clone(),
1222                wal,
1223                Some(allocator),
1224            )
1225            .await
1226            .map_err(UniError::Internal)?,
1227        ));
1228
1229        let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1230            .schema()
1231            .indexes
1232            .iter()
1233            .filter_map(|idx| {
1234                if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1235                    cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1236                } else {
1237                    None
1238                }
1239            })
1240            .collect();
1241
1242        if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1243            return Err(UniError::Internal(anyhow::anyhow!(
1244                "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1245            )));
1246        }
1247
1248        let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1249            for alias in &required_embed_aliases {
1250                let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1251                    UniError::Internal(anyhow::anyhow!(
1252                        "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1253                        alias
1254                    ))
1255                })?;
1256                if spec.task != ModelTask::Embed {
1257                    return Err(UniError::Internal(anyhow::anyhow!(
1258                        "Uni-Xervo alias '{}' must be an embed task",
1259                        alias
1260                    )));
1261                }
1262            }
1263
1264            let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1265            #[cfg(feature = "provider-candle")]
1266            {
1267                runtime_builder = runtime_builder
1268                    .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1269            }
1270            #[cfg(feature = "provider-fastembed")]
1271            {
1272                runtime_builder = runtime_builder
1273                    .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1274            }
1275            #[cfg(feature = "provider-openai")]
1276            {
1277                runtime_builder = runtime_builder
1278                    .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1279            }
1280            #[cfg(feature = "provider-gemini")]
1281            {
1282                runtime_builder = runtime_builder
1283                    .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1284            }
1285            #[cfg(feature = "provider-vertexai")]
1286            {
1287                runtime_builder = runtime_builder
1288                    .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1289            }
1290            #[cfg(feature = "provider-mistral")]
1291            {
1292                runtime_builder = runtime_builder
1293                    .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1294            }
1295            #[cfg(feature = "provider-anthropic")]
1296            {
1297                runtime_builder = runtime_builder
1298                    .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1299            }
1300            #[cfg(feature = "provider-voyageai")]
1301            {
1302                runtime_builder = runtime_builder
1303                    .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1304            }
1305            #[cfg(feature = "provider-cohere")]
1306            {
1307                runtime_builder = runtime_builder
1308                    .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1309            }
1310            #[cfg(feature = "provider-azure-openai")]
1311            {
1312                runtime_builder = runtime_builder
1313                    .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1314            }
1315            #[cfg(feature = "provider-mistralrs")]
1316            {
1317                runtime_builder = runtime_builder
1318                    .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1319            }
1320
1321            Some(
1322                runtime_builder
1323                    .build()
1324                    .await
1325                    .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1326            )
1327        } else {
1328            None
1329        };
1330
1331        if let Some(ref runtime) = xervo_runtime {
1332            let mut writer_guard = writer.write().await;
1333            writer_guard.set_xervo_runtime(runtime.clone());
1334        }
1335
1336        // Replay WAL to restore any uncommitted mutations from previous session
1337        // Only replay mutations with LSN > wal_high_water_mark to avoid double-applying
1338        {
1339            let w = writer.read().await;
1340            let replayed = w
1341                .replay_wal(wal_high_water_mark)
1342                .await
1343                .map_err(UniError::Internal)?;
1344            if replayed > 0 {
1345                info!("WAL recovery: replayed {} mutations", replayed);
1346            }
1347        }
1348
1349        // Wire up IndexRebuildManager for post-flush automatic rebuild scheduling
1350        if self.config.index_rebuild.auto_rebuild_enabled {
1351            let rebuild_manager = Arc::new(
1352                uni_store::storage::IndexRebuildManager::new(
1353                    storage.clone(),
1354                    schema_manager.clone(),
1355                    self.config.index_rebuild.clone(),
1356                )
1357                .await
1358                .map_err(UniError::Internal)?,
1359            );
1360
1361            let handle = rebuild_manager
1362                .clone()
1363                .start_background_worker(shutdown_handle.subscribe());
1364            shutdown_handle.track_task(handle);
1365
1366            {
1367                let mut writer_guard = writer.write().await;
1368                writer_guard.set_index_rebuild_manager(rebuild_manager);
1369            }
1370        }
1371
1372        // Start background flush checker for time-based auto-flush
1373        if let Some(interval) = self.config.auto_flush_interval {
1374            let writer_clone = writer.clone();
1375            let mut shutdown_rx = shutdown_handle.subscribe();
1376
1377            let handle = tokio::spawn(async move {
1378                let mut ticker = tokio::time::interval(interval);
1379                loop {
1380                    tokio::select! {
1381                        _ = ticker.tick() => {
1382                            let mut w = writer_clone.write().await;
1383                            if let Err(e) = w.check_flush().await {
1384                                tracing::warn!("Background flush check failed: {}", e);
1385                            }
1386                        }
1387                        _ = shutdown_rx.recv() => {
1388                            tracing::info!("Auto-flush shutting down, performing final flush");
1389                            let mut w = writer_clone.write().await;
1390                            let _ = w.flush_to_l1(None).await;
1391                            break;
1392                        }
1393                    }
1394                }
1395            });
1396
1397            shutdown_handle.track_task(handle);
1398        }
1399
1400        let (commit_tx, _) = tokio::sync::broadcast::channel(256);
1401        let writer_field = if self.read_only { None } else { Some(writer) };
1402
1403        Ok(Uni {
1404            inner: Arc::new(UniInner {
1405                storage,
1406                schema: schema_manager,
1407                properties: prop_manager,
1408                writer: writer_field,
1409                xervo_runtime,
1410                config: self.config,
1411                procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1412                shutdown_handle,
1413                locy_rule_registry: Arc::new(std::sync::RwLock::new(
1414                    impl_locy::LocyRuleRegistry::default(),
1415                )),
1416                start_time: Instant::now(),
1417                commit_tx,
1418                write_lease: self.write_lease,
1419                active_session_count: AtomicUsize::new(0),
1420                total_queries: AtomicU64::new(0),
1421                total_commits: AtomicU64::new(0),
1422                custom_functions: Arc::new(std::sync::RwLock::new(
1423                    uni_query::CustomFunctionRegistry::new(),
1424                )),
1425                cached_l0_mutation_count: AtomicUsize::new(0),
1426                cached_l0_estimated_size: AtomicUsize::new(0),
1427                cached_wal_lsn: AtomicU64::new(0),
1428            }),
1429        })
1430    }
1431
1432    /// Open the database (blocking)
1433    pub fn build_sync(self) -> Result<Uni> {
1434        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1435        rt.block_on(self.build())
1436    }
1437
1438    fn cloud_config_to_lancedb_storage_options(
1439        config: &CloudStorageConfig,
1440    ) -> std::collections::HashMap<String, String> {
1441        let mut opts = std::collections::HashMap::new();
1442
1443        match config {
1444            CloudStorageConfig::S3 {
1445                bucket,
1446                region,
1447                endpoint,
1448                access_key_id,
1449                secret_access_key,
1450                session_token,
1451                virtual_hosted_style,
1452            } => {
1453                opts.insert("bucket".to_string(), bucket.clone());
1454                opts.insert(
1455                    "virtual_hosted_style_request".to_string(),
1456                    virtual_hosted_style.to_string(),
1457                );
1458
1459                if let Some(r) = region {
1460                    opts.insert("region".to_string(), r.clone());
1461                }
1462                if let Some(ep) = endpoint {
1463                    opts.insert("endpoint".to_string(), ep.clone());
1464                    if ep.starts_with("http://") {
1465                        opts.insert("allow_http".to_string(), "true".to_string());
1466                    }
1467                }
1468                if let Some(v) = access_key_id {
1469                    opts.insert("access_key_id".to_string(), v.clone());
1470                }
1471                if let Some(v) = secret_access_key {
1472                    opts.insert("secret_access_key".to_string(), v.clone());
1473                }
1474                if let Some(v) = session_token {
1475                    opts.insert("session_token".to_string(), v.clone());
1476                }
1477            }
1478            CloudStorageConfig::Gcs {
1479                bucket,
1480                service_account_path,
1481                service_account_key,
1482            } => {
1483                opts.insert("bucket".to_string(), bucket.clone());
1484                if let Some(v) = service_account_path {
1485                    opts.insert("service_account".to_string(), v.clone());
1486                    opts.insert("application_credentials".to_string(), v.clone());
1487                }
1488                if let Some(v) = service_account_key {
1489                    opts.insert("service_account_key".to_string(), v.clone());
1490                }
1491            }
1492            CloudStorageConfig::Azure {
1493                container,
1494                account,
1495                access_key,
1496                sas_token,
1497            } => {
1498                opts.insert("account_name".to_string(), account.clone());
1499                opts.insert("container_name".to_string(), container.clone());
1500                if let Some(v) = access_key {
1501                    opts.insert("access_key".to_string(), v.clone());
1502                }
1503                if let Some(v) = sas_token {
1504                    opts.insert("sas_token".to_string(), v.clone());
1505                }
1506            }
1507        }
1508
1509        opts
1510    }
1511}