Skip to main content

uni_db/api/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::{Duration, Instant};
8use tempfile::TempDir;
9
10pub mod appender;
11pub mod builder;
12pub mod bulk;
13pub mod compaction;
14pub mod functions;
15pub mod hooks;
16pub mod impl_locy;
17pub mod impl_query;
18pub mod indexes;
19pub mod locy_builder;
20pub mod locy_result;
21pub mod multi_agent;
22pub mod notifications;
23pub mod prepared;
24pub mod query_builder;
25pub mod rule_registry;
26pub mod schema;
27pub mod session;
28pub mod sync;
29pub mod template;
30pub mod transaction;
31pub mod xervo;
32
33use object_store::ObjectStore;
34use object_store::local::LocalFileSystem;
35use tracing::info;
36use uni_common::core::snapshot::SnapshotManifest;
37use uni_common::{CloudStorageConfig, UniConfig};
38use uni_common::{Result, UniError};
39use uni_store::cloud::build_cloud_store;
40use uni_xervo::api::{ModelAliasSpec, ModelTask};
41use uni_xervo::runtime::ModelRuntime;
42
43use uni_common::core::schema::SchemaManager;
44use uni_store::runtime::id_allocator::IdAllocator;
45use uni_store::runtime::property_manager::PropertyManager;
46use uni_store::runtime::wal::WriteAheadLog;
47use uni_store::storage::manager::StorageManager;
48
49use tokio::sync::RwLock;
50use uni_store::runtime::writer::Writer;
51
52use crate::shutdown::ShutdownHandle;
53
54use std::collections::HashMap;
55
/// Shared inner state of a Uni database instance. Not intended for direct use.
///
/// Wrapped in `Arc` by [`Uni`] so that [`Session`](session::Session) and
/// [`Transaction`](transaction::Transaction) can hold cheap, owned references
/// without lifetime parameters.
///
/// A value with `writer: None` is a read-only handle; `at_snapshot` always
/// produces such a handle, pinned to a historical snapshot.
#[doc(hidden)]
pub struct UniInner {
    /// Storage manager (pinned to a snapshot manifest for views built by `at_snapshot`).
    pub(crate) storage: Arc<StorageManager>,
    /// Schema manager; shared (not copied) with snapshot views.
    pub(crate) schema: Arc<SchemaManager>,
    /// Property manager layered over `storage`.
    pub(crate) properties: Arc<PropertyManager>,
    /// Writer used for mutations; `None` means this handle cannot write.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    /// Uni-Xervo model runtime, if one is configured.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Configuration this instance was built with.
    pub(crate) config: UniConfig,
    /// Procedure registry (exposed through `Uni::procedure_registry`).
    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
    /// Handle used to signal background tasks to stop (see `Uni::shutdown` and `Drop for Uni`).
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
    /// Global registry of pre-compiled Locy rules.
    ///
    /// Cloned into every new Session. Use `db.register_rules()` to add rules
    /// globally, or `session.register_rules()` for session-scoped rules.
    pub(crate) locy_rule_registry: Arc<std::sync::RwLock<impl_locy::LocyRuleRegistry>>,
    /// Timestamp when this database instance was built.
    pub(crate) start_time: Instant,
    /// Broadcast channel for commit notifications.
    pub(crate) commit_tx: tokio::sync::broadcast::Sender<Arc<notifications::CommitNotification>>,
    /// Write lease configuration for multi-agent access.
    pub(crate) write_lease: Option<multi_agent::WriteLease>,
    /// Number of currently active sessions.
    pub(crate) active_session_count: AtomicUsize,
    /// Total queries executed across all sessions.
    pub(crate) total_queries: AtomicU64,
    /// Total transactions committed across all sessions.
    pub(crate) total_commits: AtomicU64,
    /// Database-level registry of custom scalar functions.
    pub(crate) custom_functions: Arc<std::sync::RwLock<uni_query::CustomFunctionRegistry>>,

    // ── Cached metrics (updated on commit, read by sync `metrics()`) ─────
    /// Cached L0 mutation count (updated after every commit).
    pub(crate) cached_l0_mutation_count: AtomicUsize,
    /// Cached L0 estimated size in bytes (updated after every commit).
    pub(crate) cached_l0_estimated_size: AtomicUsize,
    /// Cached WAL log sequence number (updated after every commit).
    pub(crate) cached_wal_lsn: AtomicU64,
    /// Temp directory guard — auto-deletes on drop. Only set for `Uni::temporary()`.
    ///
    /// Keep this as the LAST field: Rust drops struct fields in declaration
    /// order, so the directory is removed only after storage and every other
    /// component that may still reference files inside it has been dropped.
    pub(crate) _temp_dir: Option<TempDir>,
}
102
/// Write throttle pressure as a value in 0.0–1.0.
///
/// Indicates how much back-pressure the storage layer is exerting.
/// 0.0 means no throttling; 1.0 means fully throttled. The default is 0.0.
///
/// Values built through [`ThrottlePressure::new`] are always finite, non-negative
/// (never `-0.0`) and at most 1.0, so comparisons and formatting are well-defined.
#[derive(Debug, Clone, Copy, Default, PartialEq, PartialOrd)]
pub struct ThrottlePressure(f64);

impl ThrottlePressure {
    /// Create a new throttle pressure value, clamped to 0.0–1.0.
    ///
    /// Values above 1.0 (including `+inf`) become 1.0. NaN and anything at or below
    /// zero (including `-inf` and `-0.0`) become 0.0. `f64::clamp` alone would let
    /// NaN and `-0.0` through unchanged, breaking the invariant and rendering as
    /// "NaN%" / "-0.0%".
    pub fn new(value: f64) -> Self {
        if value.is_nan() || value <= 0.0 {
            Self(0.0)
        } else {
            Self(value.min(1.0))
        }
    }

    /// The raw pressure value (0.0–1.0).
    pub fn value(&self) -> f64 {
        self.0
    }

    /// Returns `true` if any throttle pressure is active.
    pub fn is_throttled(&self) -> bool {
        self.0 > 0.0
    }
}

impl std::fmt::Display for ThrottlePressure {
    /// Formats the pressure as a percentage with one decimal place, e.g. `25.0%`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:.1}%", self.0 * 100.0)
    }
}
138
/// Snapshot of database-level metrics.
///
/// Produced by `Uni::metrics()`. The L0 and WAL figures are cached values
/// refreshed after each successful commit, so they may lag in-flight work.
#[derive(Debug, Clone)]
pub struct DatabaseMetrics {
    /// Current L0 mutation count (cumulative since last flush).
    pub l0_mutation_count: usize,
    /// Estimated L0 buffer size in bytes.
    pub l0_estimated_size_bytes: usize,
    /// Schema version number.
    pub schema_version: u64,
    /// Time since the database instance was created.
    pub uptime: Duration,
    /// Number of currently active sessions.
    pub active_sessions: usize,
    /// Number of L1 runs, taken from the `l1_runs` field of the storage layer's
    /// compaction status (0 if the status could not be read).
    pub l1_run_count: usize,
    /// Write throttle pressure (0.0–1.0, 0 until instrumentation).
    pub write_throttle_pressure: ThrottlePressure,
    /// Current compaction status (the default status if it could not be read).
    pub compaction_status: uni_store::CompactionStatus,
    /// WAL size in bytes (0 until storage instrumentation).
    pub wal_size_bytes: u64,
    /// Highest WAL log sequence number that has been flushed (0 when no WAL is configured).
    /// Reflects the value cached at the last successful commit.
    pub wal_lsn: u64,
    /// Total queries executed across all sessions.
    pub total_queries: u64,
    /// Total transactions committed across all sessions.
    pub total_commits: u64,
}
167
/// Main entry point for Uni embedded database.
///
/// `Uni` is the lifecycle and admin handle. All data access goes through
/// [`Session`](session::Session) (reads) and [`Transaction`](transaction::Transaction) (writes).
///
/// Dropping a `Uni` sends a shutdown signal to background tasks. Call
/// [`Uni::shutdown`] instead when pending data should be flushed first.
///
/// # Examples
///
/// ```no_run
/// use uni_db::Uni;
///
/// #[tokio::main]
/// async fn main() -> Result<(), uni_db::UniError> {
///     let db = Uni::open("./my_db").build().await?;
///
///     // All data access goes through sessions
///     let session = db.session();
///     let results = session.query("MATCH (n) RETURN count(n)").await?;
///     println!("Count: {:?}", results);
///     Ok(())
/// }
/// ```
pub struct Uni {
    /// Shared database state. Sessions and session templates created from this
    /// handle each hold their own clone of this `Arc`.
    pub(crate) inner: Arc<UniInner>,
}
192
193// No Deref<Target = UniInner> — Uni is an opaque handle.
194// All field access goes through `self.inner.field` explicitly.
195
196impl UniInner {
197    /// Open a point-in-time view of the database at the given snapshot.
198    ///
199    /// Returns a new `UniInner` that is pinned to the specified snapshot state.
200    /// The returned instance is read-only.
201    pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<UniInner> {
202        let manifest = self
203            .storage
204            .snapshot_manager()
205            .load_snapshot(snapshot_id)
206            .await
207            .map_err(UniError::Internal)?;
208
209        let pinned_storage = Arc::new(self.storage.pinned(manifest));
210
211        let prop_manager = Arc::new(PropertyManager::new(
212            pinned_storage.clone(),
213            self.schema.clone(),
214            self.properties.cache_size(),
215        ));
216
217        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
218
219        let (commit_tx, _) = tokio::sync::broadcast::channel(256);
220        Ok(UniInner {
221            storage: pinned_storage,
222            schema: self.schema.clone(),
223            properties: prop_manager,
224            writer: None,
225            xervo_runtime: self.xervo_runtime.clone(),
226            config: self.config.clone(),
227            procedure_registry: self.procedure_registry.clone(),
228            shutdown_handle,
229            locy_rule_registry: Arc::new(std::sync::RwLock::new(
230                impl_locy::LocyRuleRegistry::default(),
231            )),
232            start_time: Instant::now(),
233            commit_tx,
234            write_lease: None,
235            active_session_count: AtomicUsize::new(0),
236            total_queries: AtomicU64::new(0),
237            total_commits: AtomicU64::new(0),
238            custom_functions: self.custom_functions.clone(),
239            cached_l0_mutation_count: AtomicUsize::new(0),
240            cached_l0_estimated_size: AtomicUsize::new(0),
241            cached_wal_lsn: AtomicU64::new(0),
242            _temp_dir: None,
243        })
244    }
245}
246
247impl Uni {
248    /// Open or create a database at the given path.
249    ///
250    /// If the database does not exist, it will be created.
251    ///
252    /// # Arguments
253    ///
254    /// * `uri` - Local path or object store URI.
255    ///
256    /// # Returns
257    ///
258    /// A [`UniBuilder`] to configure and build the database instance.
259    pub fn open(uri: impl Into<String>) -> UniBuilder {
260        UniBuilder::new(uri.into())
261    }
262
263    /// Open an existing database at the given path. Fails if it does not exist.
264    pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
265        let mut builder = UniBuilder::new(uri.into());
266        builder.create_if_missing = false;
267        builder
268    }
269
270    /// Create a new database at the given path. Fails if it already exists.
271    pub fn create(uri: impl Into<String>) -> UniBuilder {
272        let mut builder = UniBuilder::new(uri.into());
273        builder.fail_if_exists = true;
274        builder
275    }
276
277    /// Create a temporary database that is deleted when dropped.
278    ///
279    /// Useful for tests and short-lived processing.
280    /// The underlying directory is automatically cleaned up when the `Uni` is dropped.
281    pub fn temporary() -> UniBuilder {
282        let temp_dir = tempfile::Builder::new()
283            .prefix("uni_mem_")
284            .tempdir()
285            .expect("failed to create temporary directory");
286        let uri = temp_dir.path().to_string_lossy().to_string();
287        let mut builder = UniBuilder::new(uri);
288        builder.temp_dir = Some(temp_dir);
289        builder
290    }
291
292    /// Open an in-memory database (alias for temporary).
293    pub fn in_memory() -> UniBuilder {
294        Self::temporary()
295    }
296
297    // ── Session Factory (primary entry point for data access) ────────
298
299    /// Create a new Session for data access.
300    ///
301    /// Sessions are cheap, synchronous, and infallible. All reads go through
302    /// sessions, and sessions are the factory for transactions (writes).
303    ///
304    /// # Examples
305    ///
306    /// ```no_run
307    /// # use uni_db::Uni;
308    /// # async fn example(db: &Uni) -> uni_db::Result<()> {
309    /// let session = db.session();
310    /// let rows = session.query("MATCH (n) RETURN n LIMIT 10").await?;
311    /// # Ok(())
312    /// # }
313    /// ```
314    pub fn session(&self) -> session::Session {
315        session::Session::new(self.inner.clone())
316    }
317
318    /// Create a session template builder for pre-configured session factories.
319    ///
320    /// Templates pre-compile Locy rules, bind parameters, and attach hooks
321    /// once, then cheaply stamp out sessions per-request.
322    pub fn session_template(&self) -> template::SessionTemplateBuilder {
323        template::SessionTemplateBuilder::new(self.inner.clone())
324    }
325
326    // ── Database Metrics ──────────────────────────────────────────────
327
328    /// Snapshot the database-level metrics.
329    ///
330    /// This is a cheap, synchronous read of cached atomic values.
331    /// L0 metrics (`l0_mutation_count`, `l0_estimated_size_bytes`, `wal_lsn`)
332    /// reflect the state as of the last successful commit.
333    pub fn metrics(&self) -> DatabaseMetrics {
334        let schema_version = self.inner.schema.schema().schema_version as u64;
335        let compaction_status = self.inner.storage.compaction_status().unwrap_or_default();
336        DatabaseMetrics {
337            l0_mutation_count: self.inner.cached_l0_mutation_count.load(Ordering::Relaxed),
338            l0_estimated_size_bytes: self.inner.cached_l0_estimated_size.load(Ordering::Relaxed),
339            schema_version,
340            uptime: self.inner.start_time.elapsed(),
341            active_sessions: self.inner.active_session_count.load(Ordering::Relaxed),
342            l1_run_count: compaction_status.l1_runs,
343            write_throttle_pressure: ThrottlePressure::default(),
344            compaction_status,
345            wal_size_bytes: 0u64,
346            wal_lsn: self.inner.cached_wal_lsn.load(Ordering::Relaxed),
347            total_queries: self.inner.total_queries.load(Ordering::Relaxed),
348            total_commits: self.inner.total_commits.load(Ordering::Relaxed),
349        }
350    }
351
352    /// Returns the write lease configuration, if any.
353    /// Write lease enforcement is Phase 2.
354    pub fn write_lease(&self) -> Option<&multi_agent::WriteLease> {
355        self.inner.write_lease.as_ref()
356    }
357
358    // ── Global Locy Rule Management ───────────────────────────────────
359
360    /// Access the global rule registry for managing pre-compiled Locy rules.
361    ///
362    /// Rules registered here are cloned into every new Session.
363    pub fn rules(&self) -> rule_registry::RuleRegistry<'_> {
364        rule_registry::RuleRegistry::new(&self.inner.locy_rule_registry)
365    }
366
367    // ── Configuration & Introspection ─────────────────────────────────
368
369    /// Get configuration.
370    pub fn config(&self) -> &UniConfig {
371        &self.inner.config
372    }
373
374    /// Returns the procedure registry for registering test procedures.
375    #[doc(hidden)]
376    pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
377        &self.inner.procedure_registry
378    }
379
380    /// Get schema manager.
381    #[doc(hidden)]
382    pub fn schema_manager(&self) -> Arc<SchemaManager> {
383        self.inner.schema.clone()
384    }
385
386    #[doc(hidden)]
387    pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
388        self.inner.writer.clone()
389    }
390
391    #[doc(hidden)]
392    pub fn storage(&self) -> Arc<StorageManager> {
393        self.inner.storage.clone()
394    }
395
396    /// Flush all uncommitted changes to persistent storage (L1).
397    ///
398    /// This forces a write of the current in-memory buffer (L0) to columnar files.
399    /// It also creates a new snapshot.
400    pub async fn flush(&self) -> Result<()> {
401        if let Some(writer_lock) = &self.inner.writer {
402            let mut writer = writer_lock.write().await;
403            writer
404                .flush_to_l1(None)
405                .await
406                .map(|_| ())
407                .map_err(UniError::Internal)
408        } else {
409            Err(UniError::ReadOnly {
410                operation: "flush".to_string(),
411            })
412        }
413    }
414
415    /// Create a named point-in-time snapshot of the database.
416    ///
417    /// Flushes current changes, records the state, and persists the snapshot
418    /// under the given name so it can be retrieved later.
419    /// Returns the snapshot ID.
420    pub async fn create_snapshot(&self, name: &str) -> Result<String> {
421        if name.is_empty() {
422            return Err(UniError::Internal(anyhow::anyhow!(
423                "Snapshot name cannot be empty"
424            )));
425        }
426
427        let snapshot_id = if let Some(writer_lock) = &self.inner.writer {
428            let mut writer = writer_lock.write().await;
429            writer
430                .flush_to_l1(Some(name.to_string()))
431                .await
432                .map_err(UniError::Internal)?
433        } else {
434            return Err(UniError::ReadOnly {
435                operation: "create_snapshot".to_string(),
436            });
437        };
438
439        self.inner
440            .storage
441            .snapshot_manager()
442            .save_named_snapshot(name, &snapshot_id)
443            .await
444            .map_err(UniError::Internal)?;
445
446        Ok(snapshot_id)
447    }
448
449    /// List all available snapshots.
450    pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
451        let sm = self.inner.storage.snapshot_manager();
452        let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
453        let mut manifests = Vec::new();
454        for id in ids {
455            if let Ok(m) = sm.load_snapshot(&id).await {
456                manifests.push(m);
457            }
458        }
459        Ok(manifests)
460    }
461
462    /// Restore the database to a specific snapshot.
463    ///
464    /// **Note**: This currently requires a restart or re-opening of Uni to fully take effect
465    /// as it only updates the latest pointer.
466    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
467        self.inner
468            .storage
469            .snapshot_manager()
470            .set_latest_snapshot(snapshot_id)
471            .await
472            .map_err(UniError::Internal)
473    }
474
475    /// Check if a label exists in the schema.
476    pub async fn label_exists(&self, name: &str) -> Result<bool> {
477        Ok(self
478            .inner
479            .schema
480            .schema()
481            .labels
482            .get(name)
483            .is_some_and(|l| {
484                matches!(
485                    l.state,
486                    uni_common::core::schema::SchemaElementState::Active
487                )
488            }))
489    }
490
491    /// Check if an edge type exists in the schema.
492    pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
493        Ok(self
494            .inner
495            .schema
496            .schema()
497            .edge_types
498            .get(name)
499            .is_some_and(|e| {
500                matches!(
501                    e.state,
502                    uni_common::core::schema::SchemaElementState::Active
503                )
504            }))
505    }
506
507    /// Get all label names.
508    /// Returns the union of schema-registered labels (Active state) and labels
509    /// discovered from data (for schemaless mode where labels may not be in the
510    /// schema). This is consistent with `list_edge_types()` for schema labels
511    /// while also supporting schemaless workflows.
512    pub async fn list_labels(&self) -> Result<Vec<String>> {
513        let mut all_labels = std::collections::HashSet::new();
514
515        // Schema labels (covers schema-defined labels that may not have data yet)
516        for (name, label) in self.inner.schema.schema().labels.iter() {
517            if matches!(
518                label.state,
519                uni_common::core::schema::SchemaElementState::Active
520            ) {
521                all_labels.insert(name.clone());
522            }
523        }
524
525        // Data labels (covers schemaless labels that aren't in the schema)
526        let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
527        let result = self.inner.execute_internal(query, HashMap::new()).await?;
528        for row in result.rows() {
529            if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
530                for label in labels_list {
531                    all_labels.insert(label);
532                }
533            }
534        }
535
536        Ok(all_labels.into_iter().collect())
537    }
538
539    /// Get all edge type names.
540    pub async fn list_edge_types(&self) -> Result<Vec<String>> {
541        Ok(self
542            .inner
543            .schema
544            .schema()
545            .edge_types
546            .iter()
547            .filter(|(_, e)| {
548                matches!(
549                    e.state,
550                    uni_common::core::schema::SchemaElementState::Active
551                )
552            })
553            .map(|(name, _)| name.clone())
554            .collect())
555    }
556
557    /// Get detailed information about a label.
558    pub async fn get_label_info(
559        &self,
560        name: &str,
561    ) -> Result<Option<crate::api::schema::LabelInfo>> {
562        let schema = self.inner.schema.schema();
563        if schema.labels.contains_key(name) {
564            let count = if let Ok(ds) = self.inner.storage.vertex_dataset(name) {
565                if let Ok(raw) = ds.open_raw().await {
566                    raw.count_rows(None)
567                        .await
568                        .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
569                } else {
570                    0
571                }
572            } else {
573                0
574            };
575
576            let mut properties = Vec::new();
577            if let Some(props) = schema.properties.get(name) {
578                for (prop_name, prop_meta) in props {
579                    let is_indexed = schema.indexes.iter().any(|idx| match idx {
580                        uni_common::core::schema::IndexDefinition::Vector(v) => {
581                            v.label == name && v.property == *prop_name
582                        }
583                        uni_common::core::schema::IndexDefinition::Scalar(s) => {
584                            s.label == name && s.properties.contains(prop_name)
585                        }
586                        uni_common::core::schema::IndexDefinition::FullText(f) => {
587                            f.label == name && f.properties.contains(prop_name)
588                        }
589                        uni_common::core::schema::IndexDefinition::Inverted(inv) => {
590                            inv.label == name && inv.property == *prop_name
591                        }
592                        uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
593                            j.label == name
594                        }
595                        _ => false,
596                    });
597
598                    properties.push(crate::api::schema::PropertyInfo {
599                        name: prop_name.clone(),
600                        data_type: format!("{:?}", prop_meta.r#type),
601                        nullable: prop_meta.nullable,
602                        is_indexed,
603                    });
604                }
605            }
606
607            let mut indexes = Vec::new();
608            for idx in schema.indexes.iter().filter(|i| i.label() == name) {
609                use uni_common::core::schema::IndexDefinition;
610                let (idx_type, idx_props) = match idx {
611                    IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
612                    IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
613                    IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
614                    IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
615                    IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
616                    _ => continue,
617                };
618
619                indexes.push(crate::api::schema::IndexInfo {
620                    name: idx.name().to_string(),
621                    index_type: idx_type.to_string(),
622                    properties: idx_props,
623                    status: "ONLINE".to_string(), // TODO: Check actual status
624                });
625            }
626
627            let mut constraints = Vec::new();
628            for c in &schema.constraints {
629                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
630                    && l == name
631                {
632                    let (ctype, cprops) = match &c.constraint_type {
633                        uni_common::core::schema::ConstraintType::Unique { properties } => {
634                            ("UNIQUE", properties.clone())
635                        }
636                        uni_common::core::schema::ConstraintType::Exists { property } => {
637                            ("EXISTS", vec![property.clone()])
638                        }
639                        uni_common::core::schema::ConstraintType::Check { expression } => {
640                            ("CHECK", vec![expression.clone()])
641                        }
642                        _ => ("UNKNOWN", vec![]),
643                    };
644
645                    constraints.push(crate::api::schema::ConstraintInfo {
646                        name: c.name.clone(),
647                        constraint_type: ctype.to_string(),
648                        properties: cprops,
649                        enabled: c.enabled,
650                    });
651                }
652            }
653
654            Ok(Some(crate::api::schema::LabelInfo {
655                name: name.to_string(),
656                count,
657                properties,
658                indexes,
659                constraints,
660            }))
661        } else {
662            Ok(None)
663        }
664    }
665
666    /// Get detailed information about an edge type.
667    pub async fn get_edge_type_info(
668        &self,
669        name: &str,
670    ) -> Result<Option<crate::api::schema::EdgeTypeInfo>> {
671        let schema = self.inner.schema.schema();
672        let edge_meta = match schema.edge_types.get(name) {
673            Some(meta) => meta,
674            None => return Ok(None),
675        };
676
677        // Count edges via internal query
678        let count = {
679            let query = format!("MATCH ()-[r:{}]->() RETURN count(r) AS cnt", name);
680            match self.inner.execute_internal(&query, HashMap::new()).await {
681                Ok(result) => result
682                    .rows()
683                    .first()
684                    .and_then(|r| r.get::<i64>("cnt").ok())
685                    .unwrap_or(0) as usize,
686                Err(_) => 0,
687            }
688        };
689
690        let source_labels = edge_meta.src_labels.clone();
691        let target_labels = edge_meta.dst_labels.clone();
692
693        let mut properties = Vec::new();
694        if let Some(props) = schema.properties.get(name) {
695            for (prop_name, prop_meta) in props {
696                let is_indexed = schema.indexes.iter().any(|idx| match idx {
697                    uni_common::core::schema::IndexDefinition::Scalar(s) => {
698                        s.label == name && s.properties.contains(prop_name)
699                    }
700                    uni_common::core::schema::IndexDefinition::FullText(f) => {
701                        f.label == name && f.properties.contains(prop_name)
702                    }
703                    uni_common::core::schema::IndexDefinition::Inverted(inv) => {
704                        inv.label == name && inv.property == *prop_name
705                    }
706                    _ => false,
707                });
708
709                properties.push(crate::api::schema::PropertyInfo {
710                    name: prop_name.clone(),
711                    data_type: format!("{:?}", prop_meta.r#type),
712                    nullable: prop_meta.nullable,
713                    is_indexed,
714                });
715            }
716        }
717
718        let mut indexes = Vec::new();
719        for idx in schema.indexes.iter().filter(|i| i.label() == name) {
720            use uni_common::core::schema::IndexDefinition;
721            let (idx_type, idx_props) = match idx {
722                IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
723                IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
724                IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
725                _ => continue,
726            };
727
728            indexes.push(crate::api::schema::IndexInfo {
729                name: idx.name().to_string(),
730                index_type: idx_type.to_string(),
731                properties: idx_props,
732                status: "ONLINE".to_string(),
733            });
734        }
735
736        let mut constraints = Vec::new();
737        for c in &schema.constraints {
738            if let uni_common::core::schema::ConstraintTarget::EdgeType(et) = &c.target
739                && et == name
740            {
741                let (ctype, cprops) = match &c.constraint_type {
742                    uni_common::core::schema::ConstraintType::Unique { properties } => {
743                        ("UNIQUE", properties.clone())
744                    }
745                    uni_common::core::schema::ConstraintType::Exists { property } => {
746                        ("EXISTS", vec![property.clone()])
747                    }
748                    uni_common::core::schema::ConstraintType::Check { expression } => {
749                        ("CHECK", vec![expression.clone()])
750                    }
751                    _ => ("UNKNOWN", vec![]),
752                };
753
754                constraints.push(crate::api::schema::ConstraintInfo {
755                    name: c.name.clone(),
756                    constraint_type: ctype.to_string(),
757                    properties: cprops,
758                    enabled: c.enabled,
759                });
760            }
761        }
762
763        Ok(Some(crate::api::schema::EdgeTypeInfo {
764            name: name.to_string(),
765            count,
766            source_labels,
767            target_labels,
768            properties,
769            indexes,
770            constraints,
771        }))
772    }
773
774    // ── Compaction ──────────────────────────────────────────────────────
775
776    /// Access compaction operations.
777    pub fn compaction(&self) -> compaction::Compaction<'_> {
778        compaction::Compaction { inner: &self.inner }
779    }
780
781    // ── Indexes ──────────────────────────────────────────────────────────
782
783    /// Access index management operations.
784    pub fn indexes(&self) -> indexes::Indexes<'_> {
785        indexes::Indexes { inner: &self.inner }
786    }
787
788    // ── Custom Functions ──────────────────────────────────────────────
789
790    /// Access custom Cypher function management.
791    pub fn functions(&self) -> functions::Functions<'_> {
792        functions::Functions { inner: &self.inner }
793    }
794
795    /// Shutdown the database gracefully, flushing pending data and stopping background tasks.
796    ///
797    /// This method flushes any pending data and waits for all background tasks to complete
798    /// (with a timeout). After calling this method, the database instance should not be used.
799    pub async fn shutdown(self) -> Result<()> {
800        // Flush pending data
801        if let Some(ref writer) = self.inner.writer {
802            let mut w = writer.write().await;
803            if let Err(e) = w.flush_to_l1(None).await {
804                tracing::error!("Error flushing during shutdown: {}", e);
805            }
806        }
807
808        self.inner
809            .shutdown_handle
810            .shutdown_async()
811            .await
812            .map_err(UniError::Internal)
813    }
814}
815
816impl Drop for Uni {
817    fn drop(&mut self) {
818        self.inner.shutdown_handle.shutdown_blocking();
819        tracing::debug!("Uni dropped, shutdown signal sent");
820    }
821}
822
/// Builder for configuring and opening a `Uni` database instance.
///
/// Create one with [`UniBuilder::new`], chain the configuration methods, then
/// call `build()` (async) or `build_sync()` to open the database.
#[must_use = "builders do nothing until .build() is called"]
pub struct UniBuilder {
    /// Primary URI: a local directory path, or a URL containing `://` for
    /// remote-only mode.
    uri: String,
    /// Database options; `new()` starts from `UniConfig::default()`.
    config: UniConfig,
    /// Schema JSON file set via `schema_file()`.
    /// NOTE(review): `build()` does not read this field; confirm where it is
    /// meant to be applied.
    schema_file: Option<PathBuf>,
    /// Uni-Xervo model catalog. `build()` rejects a missing catalog when the
    /// schema has vector indexes with embedding aliases.
    xervo_catalog: Option<Vec<ModelAliasSpec>>,
    /// Remote data URL for hybrid mode (set via `remote_storage()`). When set,
    /// `uri` must be a local path, which holds WAL and ID-allocator state.
    hybrid_remote_url: Option<String>,
    /// Explicit cloud settings. When present, these are used to build the
    /// remote store instead of parsing the URL.
    cloud_config: Option<CloudStorageConfig>,
    /// Create the local directory if it is missing (`true` from `new()`).
    create_if_missing: bool,
    /// Fail if the local directory already exists (`false` from `new()`).
    fail_if_exists: bool,
    /// Withhold the writer from the opened instance (`false` from `new()`).
    read_only: bool,
    /// Optional multi-agent write-lease strategy, stored on the opened instance.
    write_lease: Option<multi_agent::WriteLease>,
    /// Temporary directory moved into the opened instance (`_temp_dir`).
    /// Presumably this keeps a temp-backed database alive for the instance's
    /// lifetime; TODO confirm where it is set.
    temp_dir: Option<TempDir>,
}
838
839impl UniBuilder {
840    /// Creates a new builder for the given URI.
841    pub fn new(uri: String) -> Self {
842        Self {
843            uri,
844            config: UniConfig::default(),
845            schema_file: None,
846            xervo_catalog: None,
847            hybrid_remote_url: None,
848            cloud_config: None,
849            create_if_missing: true,
850            fail_if_exists: false,
851            read_only: false,
852            write_lease: None,
853            temp_dir: None,
854        }
855    }
856
857    /// Load schema from JSON file on initialization.
858    pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
859        self.schema_file = Some(path.as_ref().to_path_buf());
860        self
861    }
862
863    /// Set Uni-Xervo catalog explicitly.
864    pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
865        self.xervo_catalog = Some(catalog);
866        self
867    }
868
869    /// Configure remote storage for data, keeping local path for WAL/IDs.
870    ///
871    /// # Examples
872    ///
873    /// ```ignore
874    /// use uni_common::CloudStorageConfig;
875    ///
876    /// let config = CloudStorageConfig::S3 {
877    ///     bucket: "my-bucket".to_string(),
878    ///     region: Some("us-east-1".to_string()),
879    ///     endpoint: None,
880    ///     access_key_id: None,
881    ///     secret_access_key: None,
882    ///     session_token: None,
883    ///     virtual_hosted_style: false,
884    /// };
885    ///
886    /// let db = Uni::open("./local_meta")
887    ///     .remote_storage("s3://my-bucket/graph-data", config)
888    ///     .build()
889    ///     .await?;
890    /// ```
891    pub fn remote_storage(mut self, remote_url: &str, config: CloudStorageConfig) -> Self {
892        self.hybrid_remote_url = Some(remote_url.to_string());
893        self.cloud_config = Some(config);
894        self
895    }
896
897    /// Open the database in read-only mode.
898    ///
899    /// In read-only mode, no writer is created. All write operations
900    /// (`tx()`, `execute()`, `bulk_writer()`, `appender()`) will return
901    /// `ReadOnly` errors. Reads work normally.
902    pub fn read_only(mut self) -> Self {
903        self.read_only = true;
904        self
905    }
906
907    /// Set the write lease strategy for multi-agent access.
908    ///
909    /// This configures how write access is coordinated when multiple
910    /// processes share the same database.
911    pub fn write_lease(mut self, lease: multi_agent::WriteLease) -> Self {
912        self.write_lease = Some(lease);
913        self
914    }
915
916    /// Configure database options using `UniConfig`.
917    pub fn config(mut self, config: UniConfig) -> Self {
918        self.config = config;
919        self
920    }
921
922    /// Open the database (async).
923    pub async fn build(self) -> Result<Uni> {
924        let uri = self.uri.clone();
925        let is_remote_uri = uri.contains("://");
926        let is_hybrid = self.hybrid_remote_url.is_some();
927
928        if is_hybrid && is_remote_uri {
929            return Err(UniError::Internal(anyhow::anyhow!(
930                "Hybrid mode requires a local path as primary URI, found: {}",
931                uri
932            )));
933        }
934
935        let (storage_uri, data_store, local_store_opt) = if is_hybrid {
936            let remote_url = self.hybrid_remote_url.as_ref().unwrap();
937
938            // Remote Store (Data) - use explicit cloud_config if provided
939            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
940                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
941            } else {
942                let url = url::Url::parse(remote_url).map_err(|e| {
943                    UniError::Io(std::io::Error::new(
944                        std::io::ErrorKind::InvalidInput,
945                        e.to_string(),
946                    ))
947                })?;
948                let (os, _path) =
949                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
950                Arc::from(os)
951            };
952
953            // Local Store (WAL, IDs)
954            let path = PathBuf::from(&uri);
955            if path.exists() {
956                if self.fail_if_exists {
957                    return Err(UniError::Internal(anyhow::anyhow!(
958                        "Database already exists at {}",
959                        uri
960                    )));
961                }
962            } else {
963                if !self.create_if_missing {
964                    return Err(UniError::NotFound { path: path.clone() });
965                }
966                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
967            }
968
969            let local_store = Arc::new(
970                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
971            );
972
973            // For hybrid, storage_uri is the remote URL (since StorageManager loads datasets from there)
974            // But we must provide the correct store to other components manually.
975            (
976                remote_url.clone(),
977                remote_store,
978                Some(local_store as Arc<dyn ObjectStore>),
979            )
980        } else if is_remote_uri {
981            // Remote Only - use explicit cloud_config if provided
982            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
983                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
984            } else {
985                let url = url::Url::parse(&uri).map_err(|e| {
986                    UniError::Io(std::io::Error::new(
987                        std::io::ErrorKind::InvalidInput,
988                        e.to_string(),
989                    ))
990                })?;
991                let (os, _path) =
992                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
993                Arc::from(os)
994            };
995
996            (uri.clone(), remote_store, None)
997        } else {
998            // Local Only
999            let path = PathBuf::from(&uri);
1000            let storage_path = path.join("storage");
1001
1002            if path.exists() {
1003                if self.fail_if_exists {
1004                    return Err(UniError::Internal(anyhow::anyhow!(
1005                        "Database already exists at {}",
1006                        uri
1007                    )));
1008                }
1009            } else {
1010                if !self.create_if_missing {
1011                    return Err(UniError::NotFound { path: path.clone() });
1012                }
1013                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
1014            }
1015
1016            // Ensure storage directory exists
1017            if !storage_path.exists() {
1018                std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
1019            }
1020
1021            let store = Arc::new(
1022                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
1023            );
1024            (
1025                storage_path.to_string_lossy().to_string(),
1026                store.clone() as Arc<dyn ObjectStore>,
1027                Some(store as Arc<dyn ObjectStore>),
1028            )
1029        };
1030
1031        // Canonical schema location in metadata catalog.
1032        let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
1033        // Legacy schema location used by older builds.
1034        let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
1035
1036        // Backward-compatible schema path migration:
1037        // if catalog/schema.json is missing but root schema.json exists,
1038        // copy root schema.json to catalog/schema.json.
1039        let has_catalog_schema = match data_store.get(&schema_obj_path).await {
1040            Ok(_) => true,
1041            Err(object_store::Error::NotFound { .. }) => false,
1042            Err(e) => return Err(UniError::Internal(e.into())),
1043        };
1044        if !has_catalog_schema {
1045            match data_store.get(&legacy_schema_obj_path).await {
1046                Ok(result) => {
1047                    let bytes = result
1048                        .bytes()
1049                        .await
1050                        .map_err(|e| UniError::Internal(e.into()))?;
1051                    data_store
1052                        .put(&schema_obj_path, bytes.into())
1053                        .await
1054                        .map_err(|e| UniError::Internal(e.into()))?;
1055                    info!(
1056                        legacy = %legacy_schema_obj_path,
1057                        target = %schema_obj_path,
1058                        "Migrated legacy schema path to catalog path"
1059                    );
1060                }
1061                Err(object_store::Error::NotFound { .. }) => {}
1062                Err(e) => return Err(UniError::Internal(e.into())),
1063            }
1064        }
1065
1066        // Load schema (SchemaManager::load creates a default if missing)
1067        // Schema is always in data_store (Remote or Local)
1068        let schema_manager = Arc::new(
1069            SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1070                .await
1071                .map_err(UniError::Internal)?,
1072        );
1073
1074        let lancedb_storage_options = self
1075            .cloud_config
1076            .as_ref()
1077            .map(Self::cloud_config_to_lancedb_storage_options);
1078
1079        let storage = if is_hybrid || is_remote_uri {
1080            // Preserve explicit cloud settings (endpoint, credentials, path style)
1081            // by reusing the constructed remote store.
1082            StorageManager::new_with_store_and_storage_options(
1083                &storage_uri,
1084                data_store.clone(),
1085                schema_manager.clone(),
1086                self.config.clone(),
1087                lancedb_storage_options.clone(),
1088            )
1089            .await
1090            .map_err(UniError::Internal)?
1091        } else {
1092            // Local mode keeps using a storage-path-scoped local store.
1093            StorageManager::new_with_config(
1094                &storage_uri,
1095                schema_manager.clone(),
1096                self.config.clone(),
1097            )
1098            .await
1099            .map_err(UniError::Internal)?
1100        };
1101
1102        let storage = Arc::new(storage);
1103
1104        // Create shutdown handle
1105        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1106
1107        // Start background compaction with shutdown signal
1108        let compaction_handle = storage
1109            .clone()
1110            .start_background_compaction(shutdown_handle.subscribe());
1111        shutdown_handle.track_task(compaction_handle);
1112
1113        // Initialize property manager
1114        let prop_cache_capacity = self.config.cache_size / 1024;
1115
1116        let prop_manager = Arc::new(PropertyManager::new(
1117            storage.clone(),
1118            schema_manager.clone(),
1119            prop_cache_capacity,
1120        ));
1121
1122        // Setup stores for WAL and IdAllocator (needed for version recovery check)
1123        let id_store = local_store_opt
1124            .clone()
1125            .unwrap_or_else(|| data_store.clone());
1126        let wal_store = local_store_opt
1127            .clone()
1128            .unwrap_or_else(|| data_store.clone());
1129
1130        // Determine start version and WAL high water mark from latest snapshot.
1131        // Detects and recovers from a lost manifest pointer.
1132        let latest_snapshot = storage
1133            .snapshot_manager()
1134            .load_latest_snapshot()
1135            .await
1136            .map_err(UniError::Internal)?;
1137
1138        let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1139            (
1140                snapshot.version_high_water_mark + 1,
1141                snapshot.wal_high_water_mark,
1142            )
1143        } else {
1144            // No latest snapshot — fresh DB or lost manifest?
1145            let has_manifests = storage
1146                .snapshot_manager()
1147                .has_any_manifests()
1148                .await
1149                .unwrap_or(false);
1150
1151            let wal_check =
1152                WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1153            let has_wal = wal_check.has_segments().await.unwrap_or(false);
1154
1155            if has_manifests {
1156                // Manifests exist but latest pointer is missing — try to recover from manifests
1157                let snapshot_ids = storage
1158                    .snapshot_manager()
1159                    .list_snapshots()
1160                    .await
1161                    .map_err(UniError::Internal)?;
1162                if let Some(last_id) = snapshot_ids.last() {
1163                    let manifest = storage
1164                        .snapshot_manager()
1165                        .load_snapshot(last_id)
1166                        .await
1167                        .map_err(UniError::Internal)?;
1168                    tracing::warn!(
1169                        "Latest snapshot pointer missing but found manifest '{}'. \
1170                         Recovering version {}.",
1171                        last_id,
1172                        manifest.version_high_water_mark
1173                    );
1174                    (
1175                        manifest.version_high_water_mark + 1,
1176                        manifest.wal_high_water_mark,
1177                    )
1178                } else {
1179                    return Err(UniError::Internal(anyhow::anyhow!(
1180                        "Snapshot manifests directory exists but contains no valid manifests. \
1181                         Possible data corruption."
1182                    )));
1183                }
1184            } else if has_wal {
1185                // WAL exists but no manifests at all — data exists but unrecoverable version
1186                return Err(UniError::Internal(anyhow::anyhow!(
1187                    "Database has WAL segments but no snapshot manifest. \
1188                     Cannot safely determine version counter -- starting at 0 would cause \
1189                     version conflicts and data corruption. \
1190                     Restore the snapshot manifest or delete WAL to start fresh."
1191                )));
1192            } else {
1193                // Truly fresh database
1194                (0, 0)
1195            }
1196        };
1197
1198        let allocator = Arc::new(
1199            IdAllocator::new(
1200                id_store,
1201                object_store::path::Path::from("id_allocator.json"),
1202                1000,
1203            )
1204            .await
1205            .map_err(UniError::Internal)?,
1206        );
1207
1208        let wal = if !self.config.wal_enabled {
1209            // WAL disabled by config
1210            None
1211        } else if is_remote_uri && !is_hybrid {
1212            // Remote-only WAL (ObjectStoreWal)
1213            Some(Arc::new(WriteAheadLog::new(
1214                wal_store,
1215                object_store::path::Path::from("wal"),
1216            )))
1217        } else if is_hybrid || !is_remote_uri {
1218            // Local WAL (using local_store)
1219            // Even if local_store uses ObjectStore trait, it maps to FS.
1220            Some(Arc::new(WriteAheadLog::new(
1221                wal_store,
1222                object_store::path::Path::from("wal"),
1223            )))
1224        } else {
1225            None
1226        };
1227
1228        let writer = Arc::new(RwLock::new(
1229            Writer::new_with_config(
1230                storage.clone(),
1231                schema_manager.clone(),
1232                start_version,
1233                self.config.clone(),
1234                wal,
1235                Some(allocator),
1236            )
1237            .await
1238            .map_err(UniError::Internal)?,
1239        ));
1240
1241        let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1242            .schema()
1243            .indexes
1244            .iter()
1245            .filter_map(|idx| {
1246                if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1247                    cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1248                } else {
1249                    None
1250                }
1251            })
1252            .collect();
1253
1254        if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1255            return Err(UniError::Internal(anyhow::anyhow!(
1256                "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1257            )));
1258        }
1259
1260        let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1261            for alias in &required_embed_aliases {
1262                let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1263                    UniError::Internal(anyhow::anyhow!(
1264                        "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1265                        alias
1266                    ))
1267                })?;
1268                if spec.task != ModelTask::Embed {
1269                    return Err(UniError::Internal(anyhow::anyhow!(
1270                        "Uni-Xervo alias '{}' must be an embed task",
1271                        alias
1272                    )));
1273                }
1274            }
1275
1276            let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1277            #[cfg(feature = "provider-candle")]
1278            {
1279                runtime_builder = runtime_builder
1280                    .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1281            }
1282            #[cfg(feature = "provider-fastembed")]
1283            {
1284                runtime_builder = runtime_builder
1285                    .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1286            }
1287            #[cfg(feature = "provider-openai")]
1288            {
1289                runtime_builder = runtime_builder
1290                    .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1291            }
1292            #[cfg(feature = "provider-gemini")]
1293            {
1294                runtime_builder = runtime_builder
1295                    .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1296            }
1297            #[cfg(feature = "provider-vertexai")]
1298            {
1299                runtime_builder = runtime_builder
1300                    .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1301            }
1302            #[cfg(feature = "provider-mistral")]
1303            {
1304                runtime_builder = runtime_builder
1305                    .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1306            }
1307            #[cfg(feature = "provider-anthropic")]
1308            {
1309                runtime_builder = runtime_builder
1310                    .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1311            }
1312            #[cfg(feature = "provider-voyageai")]
1313            {
1314                runtime_builder = runtime_builder
1315                    .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1316            }
1317            #[cfg(feature = "provider-cohere")]
1318            {
1319                runtime_builder = runtime_builder
1320                    .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1321            }
1322            #[cfg(feature = "provider-azure-openai")]
1323            {
1324                runtime_builder = runtime_builder
1325                    .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1326            }
1327            #[cfg(feature = "provider-mistralrs")]
1328            {
1329                runtime_builder = runtime_builder
1330                    .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1331            }
1332
1333            Some(
1334                runtime_builder
1335                    .build()
1336                    .await
1337                    .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1338            )
1339        } else {
1340            None
1341        };
1342
1343        if let Some(ref runtime) = xervo_runtime {
1344            let mut writer_guard = writer.write().await;
1345            writer_guard.set_xervo_runtime(runtime.clone());
1346        }
1347
1348        // Replay WAL to restore any uncommitted mutations from previous session
1349        // Only replay mutations with LSN > wal_high_water_mark to avoid double-applying
1350        {
1351            let w = writer.read().await;
1352            let replayed = w
1353                .replay_wal(wal_high_water_mark)
1354                .await
1355                .map_err(UniError::Internal)?;
1356            if replayed > 0 {
1357                info!("WAL recovery: replayed {} mutations", replayed);
1358            }
1359        }
1360
1361        // Wire up IndexRebuildManager for post-flush automatic rebuild scheduling
1362        if self.config.index_rebuild.auto_rebuild_enabled {
1363            let rebuild_manager = Arc::new(
1364                uni_store::storage::IndexRebuildManager::new(
1365                    storage.clone(),
1366                    schema_manager.clone(),
1367                    self.config.index_rebuild.clone(),
1368                )
1369                .await
1370                .map_err(UniError::Internal)?,
1371            );
1372
1373            let handle = rebuild_manager
1374                .clone()
1375                .start_background_worker(shutdown_handle.subscribe());
1376            shutdown_handle.track_task(handle);
1377
1378            {
1379                let mut writer_guard = writer.write().await;
1380                writer_guard.set_index_rebuild_manager(rebuild_manager);
1381            }
1382        }
1383
1384        // Start background flush checker for time-based auto-flush
1385        if let Some(interval) = self.config.auto_flush_interval {
1386            let writer_clone = writer.clone();
1387            let mut shutdown_rx = shutdown_handle.subscribe();
1388
1389            let handle = tokio::spawn(async move {
1390                let mut ticker = tokio::time::interval(interval);
1391                loop {
1392                    tokio::select! {
1393                        _ = ticker.tick() => {
1394                            let mut w = writer_clone.write().await;
1395                            if let Err(e) = w.check_flush().await {
1396                                tracing::warn!("Background flush check failed: {}", e);
1397                            }
1398                        }
1399                        _ = shutdown_rx.recv() => {
1400                            tracing::info!("Auto-flush shutting down, performing final flush");
1401                            let mut w = writer_clone.write().await;
1402                            let _ = w.flush_to_l1(None).await;
1403                            break;
1404                        }
1405                    }
1406                }
1407            });
1408
1409            shutdown_handle.track_task(handle);
1410        }
1411
1412        let (commit_tx, _) = tokio::sync::broadcast::channel(256);
1413        let writer_field = if self.read_only { None } else { Some(writer) };
1414
1415        Ok(Uni {
1416            inner: Arc::new(UniInner {
1417                storage,
1418                schema: schema_manager,
1419                properties: prop_manager,
1420                writer: writer_field,
1421                xervo_runtime,
1422                config: self.config,
1423                procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1424                shutdown_handle,
1425                locy_rule_registry: Arc::new(std::sync::RwLock::new(
1426                    impl_locy::LocyRuleRegistry::default(),
1427                )),
1428                start_time: Instant::now(),
1429                commit_tx,
1430                write_lease: self.write_lease,
1431                active_session_count: AtomicUsize::new(0),
1432                total_queries: AtomicU64::new(0),
1433                total_commits: AtomicU64::new(0),
1434                custom_functions: Arc::new(std::sync::RwLock::new(
1435                    uni_query::CustomFunctionRegistry::new(),
1436                )),
1437                cached_l0_mutation_count: AtomicUsize::new(0),
1438                cached_l0_estimated_size: AtomicUsize::new(0),
1439                cached_wal_lsn: AtomicU64::new(0),
1440                _temp_dir: self.temp_dir,
1441            }),
1442        })
1443    }
1444
1445    /// Open the database (blocking)
1446    pub fn build_sync(self) -> Result<Uni> {
1447        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1448        rt.block_on(self.build())
1449    }
1450
1451    fn cloud_config_to_lancedb_storage_options(
1452        config: &CloudStorageConfig,
1453    ) -> std::collections::HashMap<String, String> {
1454        let mut opts = std::collections::HashMap::new();
1455
1456        match config {
1457            CloudStorageConfig::S3 {
1458                bucket,
1459                region,
1460                endpoint,
1461                access_key_id,
1462                secret_access_key,
1463                session_token,
1464                virtual_hosted_style,
1465            } => {
1466                opts.insert("bucket".to_string(), bucket.clone());
1467                opts.insert(
1468                    "virtual_hosted_style_request".to_string(),
1469                    virtual_hosted_style.to_string(),
1470                );
1471
1472                if let Some(r) = region {
1473                    opts.insert("region".to_string(), r.clone());
1474                }
1475                if let Some(ep) = endpoint {
1476                    opts.insert("endpoint".to_string(), ep.clone());
1477                    if ep.starts_with("http://") {
1478                        opts.insert("allow_http".to_string(), "true".to_string());
1479                    }
1480                }
1481                if let Some(v) = access_key_id {
1482                    opts.insert("access_key_id".to_string(), v.clone());
1483                }
1484                if let Some(v) = secret_access_key {
1485                    opts.insert("secret_access_key".to_string(), v.clone());
1486                }
1487                if let Some(v) = session_token {
1488                    opts.insert("session_token".to_string(), v.clone());
1489                }
1490            }
1491            CloudStorageConfig::Gcs {
1492                bucket,
1493                service_account_path,
1494                service_account_key,
1495            } => {
1496                opts.insert("bucket".to_string(), bucket.clone());
1497                if let Some(v) = service_account_path {
1498                    opts.insert("service_account".to_string(), v.clone());
1499                    opts.insert("application_credentials".to_string(), v.clone());
1500                }
1501                if let Some(v) = service_account_key {
1502                    opts.insert("service_account_key".to_string(), v.clone());
1503                }
1504            }
1505            CloudStorageConfig::Azure {
1506                container,
1507                account,
1508                access_key,
1509                sas_token,
1510            } => {
1511                opts.insert("account_name".to_string(), account.clone());
1512                opts.insert("container_name".to_string(), container.clone());
1513                if let Some(v) = access_key {
1514                    opts.insert("access_key".to_string(), v.clone());
1515                }
1516                if let Some(v) = sas_token {
1517                    opts.insert("sas_token".to_string(), v.clone());
1518                }
1519            }
1520        }
1521
1522        opts
1523    }
1524}