Skip to main content

uni_db/api/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::time::Duration;
7
8pub mod builder;
9pub mod bulk;
10pub mod impl_locy;
11pub mod impl_query;
12pub mod query_builder;
13pub mod schema;
14pub mod session;
15pub mod sync;
16pub mod transaction;
17pub mod xervo;
18
19use object_store::ObjectStore;
20use object_store::local::LocalFileSystem;
21use tracing::info;
22use uni_common::core::snapshot::SnapshotManifest;
23use uni_common::{CloudStorageConfig, UniConfig};
24use uni_common::{Result, UniError};
25use uni_store::cloud::build_cloud_store;
26use uni_xervo::api::{ModelAliasSpec, ModelTask};
27use uni_xervo::runtime::ModelRuntime;
28
29use uni_common::core::schema::SchemaManager;
30use uni_store::runtime::id_allocator::IdAllocator;
31use uni_store::runtime::property_manager::PropertyManager;
32use uni_store::runtime::wal::WriteAheadLog;
33use uni_store::storage::manager::StorageManager;
34
35use tokio::sync::RwLock;
36use uni_store::runtime::writer::Writer;
37
38use crate::shutdown::ShutdownHandle;
39
40/// Main entry point for Uni embedded database.
41///
42/// Handles storage, schema, and query execution. It coordinates the various
43/// subsystems including the storage engine, query executor, and graph algorithms.
44///
45/// # Examples
46///
47/// ## Local Usage
48/// ```no_run
49/// use uni_db::Uni;
50///
51/// #[tokio::main]
52/// async fn main() -> Result<(), uni_db::UniError> {
53///     let db = Uni::open("./my_db")
54///         .build()
55///         .await?;
56///
57///     // Run a query
58///     let results = db.query("MATCH (n) RETURN count(n)").await?;
59///     println!("Count: {:?}", results);
60///     Ok(())
61/// }
62/// ```
63///
64/// ## Hybrid Storage (S3 + Local)
65/// Store bulk data and catalog metadata in S3 (or GCS/Azure) while keeping WAL/ID allocation local.
66///
67/// ```no_run
68/// use uni_db::Uni;
69///
70/// #[tokio::main]
71/// async fn main() -> Result<(), uni_db::UniError> {
72///     // Requires `object_store` features enabled (aws, gcp, azure)
73///     let db = Uni::open("./local_meta")
74///         .hybrid("./local_meta", "s3://my-bucket/graph-data")
75///         .build()
76///         .await?;
77///     Ok(())
78/// }
79/// ```
pub struct Uni {
    // Storage engine: datasets, snapshots, and compaction entry points.
    pub(crate) storage: Arc<StorageManager>,
    // Schema manager; `schema.schema()` yields a read-only schema snapshot.
    pub(crate) schema: Arc<SchemaManager>,
    // Property access layer built over storage + schema.
    pub(crate) properties: Arc<PropertyManager>,
    // Single writer behind an async RwLock. `None` marks a read-only instance
    // (e.g. a pinned snapshot view); write APIs then return `UniError::ReadOnly`.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    // Optional Uni-Xervo model runtime.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    // Configuration captured at open time.
    pub(crate) config: UniConfig,
    // Registry of callable procedures available to the query engine.
    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
    // Coordinates background-task shutdown; also signaled from `Drop`.
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
}
90
91impl Uni {
92    /// Open or create a database at the given path.
93    ///
94    /// If the database does not exist, it will be created.
95    ///
96    /// # Arguments
97    ///
98    /// * `uri` - Local path or object store URI.
99    ///
100    /// # Returns
101    ///
102    /// A [`UniBuilder`] to configure and build the database instance.
103    pub fn open(uri: impl Into<String>) -> UniBuilder {
104        UniBuilder::new(uri.into())
105    }
106
107    /// Open an existing database at the given path. Fails if it does not exist.
108    pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
109        let mut builder = UniBuilder::new(uri.into());
110        builder.create_if_missing = false;
111        builder
112    }
113
114    /// Create a new database at the given path. Fails if it already exists.
115    pub fn create(uri: impl Into<String>) -> UniBuilder {
116        let mut builder = UniBuilder::new(uri.into());
117        builder.fail_if_exists = true;
118        builder
119    }
120
121    /// Create a temporary database that is deleted when dropped.
122    ///
123    /// Useful for tests and short-lived processing.
124    /// Note: Currently uses a temporary directory on the filesystem.
125    pub fn temporary() -> UniBuilder {
126        let temp_dir = std::env::temp_dir().join(format!("uni_mem_{}", uuid::Uuid::new_v4()));
127        UniBuilder::new(temp_dir.to_string_lossy().to_string())
128    }
129
130    /// Open an in-memory database (alias for temporary).
131    pub fn in_memory() -> UniBuilder {
132        Self::temporary()
133    }
134
    /// Open a point-in-time view of the database at the given snapshot.
    ///
    /// This returns a new `Uni` instance that is pinned to the specified snapshot state.
    /// The returned instance is read-only (`writer` is `None`). Used internally by
    /// Cypher `VERSION AS OF` / `TIMESTAMP AS OF`.
    pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<Uni> {
        // Load the manifest describing the snapshot's state.
        let manifest = self
            .storage
            .snapshot_manager()
            .load_snapshot(snapshot_id)
            .await
            .map_err(UniError::Internal)?;

        // Wrap current storage in a view pinned to that manifest.
        let pinned_storage = Arc::new(self.storage.pinned(manifest));

        // Property reads must go through the pinned storage; reuse the live
        // schema and the current property-cache sizing.
        let prop_manager = Arc::new(PropertyManager::new(
            pinned_storage.clone(),
            self.schema.clone(),
            self.properties.cache_size(),
        ));

        // Create a shutdown handle for this read-only snapshot view
        // (no background tasks, but needed for consistency)
        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));

        Ok(Uni {
            storage: pinned_storage,
            schema: self.schema.clone(),
            properties: prop_manager,
            // No writer: mutation APIs on the view return ReadOnly errors.
            writer: None,
            xervo_runtime: self.xervo_runtime.clone(),
            config: self.config.clone(),
            procedure_registry: self.procedure_registry.clone(),
            shutdown_handle,
        })
    }
170
171    /// Get configuration
172    pub fn config(&self) -> &UniConfig {
173        &self.config
174    }
175
176    /// Returns the procedure registry for registering test procedures.
177    pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
178        &self.procedure_registry
179    }
180
181    /// Get current schema (read-only snapshot)
182    pub fn get_schema(&self) -> Arc<uni_common::core::schema::Schema> {
183        self.schema.schema()
184    }
185
186    /// Create a bulk writer for efficient data loading.
187    pub fn bulk_writer(&self) -> bulk::BulkWriterBuilder<'_> {
188        bulk::BulkWriterBuilder::new(self)
189    }
190
191    /// Create a session builder for scoped query context.
192    pub fn session(&self) -> session::SessionBuilder<'_> {
193        session::SessionBuilder::new(self)
194    }
195
196    /// Get schema manager
197    #[doc(hidden)]
198    pub fn schema_manager(&self) -> Arc<SchemaManager> {
199        self.schema.clone()
200    }
201
202    #[doc(hidden)]
203    pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
204        self.writer.clone()
205    }
206
207    #[doc(hidden)]
208    pub fn storage(&self) -> Arc<StorageManager> {
209        self.storage.clone()
210    }
211
212    /// Flush all uncommitted changes to persistent storage (L1).
213    ///
214    /// This forces a write of the current in-memory buffer (L0) to columnar files.
215    /// It also creates a new snapshot.
216    pub async fn flush(&self) -> Result<()> {
217        if let Some(writer_lock) = &self.writer {
218            let mut writer = writer_lock.write().await;
219            writer
220                .flush_to_l1(None)
221                .await
222                .map(|_| ())
223                .map_err(UniError::Internal)
224        } else {
225            Err(UniError::ReadOnly {
226                operation: "flush".to_string(),
227            })
228        }
229    }
230
231    /// Create a named point-in-time snapshot of the database.
232    ///
233    /// This flushes current changes and records the state.
234    /// Returns the snapshot ID.
235    pub async fn create_snapshot(&self, name: Option<&str>) -> Result<String> {
236        if let Some(writer_lock) = &self.writer {
237            let mut writer = writer_lock.write().await;
238            writer
239                .flush_to_l1(name.map(|s| s.to_string()))
240                .await
241                .map_err(UniError::Internal)
242        } else {
243            Err(UniError::ReadOnly {
244                operation: "create_snapshot".to_string(),
245            })
246        }
247    }
248
249    /// Create a persisted named snapshot that can be retrieved later.
250    pub async fn create_named_snapshot(&self, name: &str) -> Result<String> {
251        if name.is_empty() {
252            return Err(UniError::Internal(anyhow::anyhow!(
253                "Snapshot name cannot be empty"
254            )));
255        }
256
257        let snapshot_id = self.create_snapshot(Some(name)).await?;
258
259        self.storage
260            .snapshot_manager()
261            .save_named_snapshot(name, &snapshot_id)
262            .await
263            .map_err(UniError::Internal)?;
264
265        Ok(snapshot_id)
266    }
267
268    /// List all available snapshots.
269    pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
270        let sm = self.storage.snapshot_manager();
271        let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
272        let mut manifests = Vec::new();
273        for id in ids {
274            if let Ok(m) = sm.load_snapshot(&id).await {
275                manifests.push(m);
276            }
277        }
278        Ok(manifests)
279    }
280
281    /// Restore the database to a specific snapshot.
282    ///
283    /// **Note**: This currently requires a restart or re-opening of Uni to fully take effect
284    /// as it only updates the latest pointer.
285    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
286        self.storage
287            .snapshot_manager()
288            .set_latest_snapshot(snapshot_id)
289            .await
290            .map_err(UniError::Internal)
291    }
292
293    /// Check if a label exists in the schema.
294    pub async fn label_exists(&self, name: &str) -> Result<bool> {
295        Ok(self.schema.schema().labels.get(name).is_some_and(|l| {
296            matches!(
297                l.state,
298                uni_common::core::schema::SchemaElementState::Active
299            )
300        }))
301    }
302
303    /// Check if an edge type exists in the schema.
304    pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
305        Ok(self.schema.schema().edge_types.get(name).is_some_and(|e| {
306            matches!(
307                e.state,
308                uni_common::core::schema::SchemaElementState::Active
309            )
310        }))
311    }
312
313    /// Get all label names.
314    /// Returns the union of schema-registered labels (Active state) and labels
315    /// discovered from data (for schemaless mode where labels may not be in the
316    /// schema). This is consistent with `list_edge_types()` for schema labels
317    /// while also supporting schemaless workflows.
318    pub async fn list_labels(&self) -> Result<Vec<String>> {
319        let mut all_labels = std::collections::HashSet::new();
320
321        // Schema labels (covers schema-defined labels that may not have data yet)
322        for (name, label) in self.schema.schema().labels.iter() {
323            if matches!(
324                label.state,
325                uni_common::core::schema::SchemaElementState::Active
326            ) {
327                all_labels.insert(name.clone());
328            }
329        }
330
331        // Data labels (covers schemaless labels that aren't in the schema)
332        let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
333        let result = self.query(query).await?;
334        for row in &result.rows {
335            if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
336                for label in labels_list {
337                    all_labels.insert(label);
338                }
339            }
340        }
341
342        Ok(all_labels.into_iter().collect())
343    }
344
345    /// Get all edge type names.
346    pub async fn list_edge_types(&self) -> Result<Vec<String>> {
347        Ok(self
348            .schema
349            .schema()
350            .edge_types
351            .iter()
352            .filter(|(_, e)| {
353                matches!(
354                    e.state,
355                    uni_common::core::schema::SchemaElementState::Active
356                )
357            })
358            .map(|(name, _)| name.clone())
359            .collect())
360    }
361
    /// Get detailed information about a label.
    ///
    /// Returns `Ok(None)` when the label is not registered in the schema.
    /// Otherwise gathers the row count, per-property metadata (with index
    /// flags), index definitions, and constraints targeting the label.
    pub async fn get_label_info(
        &self,
        name: &str,
    ) -> Result<Option<crate::api::schema::LabelInfo>> {
        let schema = self.schema.schema();
        if schema.labels.contains_key(name) {
            // Best-effort row count: a missing dataset or unopenable raw
            // dataset is reported as zero rather than an error.
            let count = if let Ok(ds) = self.storage.vertex_dataset(name) {
                if let Ok(raw) = ds.open_raw().await {
                    raw.count_rows(None)
                        .await
                        .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
                } else {
                    0
                }
            } else {
                0
            };

            // Property metadata: a property is flagged indexed when any index
            // definition on this label references it.
            let mut properties = Vec::new();
            if let Some(props) = schema.properties.get(name) {
                for (prop_name, prop_meta) in props {
                    let is_indexed = schema.indexes.iter().any(|idx| match idx {
                        uni_common::core::schema::IndexDefinition::Vector(v) => {
                            v.label == name && v.property == *prop_name
                        }
                        uni_common::core::schema::IndexDefinition::Scalar(s) => {
                            s.label == name && s.properties.contains(prop_name)
                        }
                        uni_common::core::schema::IndexDefinition::FullText(f) => {
                            f.label == name && f.properties.contains(prop_name)
                        }
                        uni_common::core::schema::IndexDefinition::Inverted(inv) => {
                            inv.label == name && inv.property == *prop_name
                        }
                        // JSON full-text indexes match on label alone: they
                        // cover the document rather than a single property.
                        uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
                            j.label == name
                        }
                        _ => false,
                    });

                    properties.push(crate::api::schema::PropertyInfo {
                        name: prop_name.clone(),
                        data_type: format!("{:?}", prop_meta.r#type),
                        nullable: prop_meta.nullable,
                        is_indexed,
                    });
                }
            }

            // Summarize index definitions attached to this label; unknown
            // variants are skipped.
            let mut indexes = Vec::new();
            for idx in schema.indexes.iter().filter(|i| i.label() == name) {
                use uni_common::core::schema::IndexDefinition;
                let (idx_type, idx_props) = match idx {
                    IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
                    IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
                    IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
                    IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
                    IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
                    _ => continue,
                };

                indexes.push(crate::api::schema::IndexInfo {
                    name: idx.name().to_string(),
                    index_type: idx_type.to_string(),
                    properties: idx_props,
                    status: "ONLINE".to_string(), // TODO: Check actual status
                });
            }

            // Constraints whose target is this label.
            let mut constraints = Vec::new();
            for c in &schema.constraints {
                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
                    && l == name
                {
                    let (ctype, cprops) = match &c.constraint_type {
                        uni_common::core::schema::ConstraintType::Unique { properties } => {
                            ("UNIQUE", properties.clone())
                        }
                        uni_common::core::schema::ConstraintType::Exists { property } => {
                            ("EXISTS", vec![property.clone()])
                        }
                        // CHECK constraints carry an expression, reported in
                        // the properties slot.
                        uni_common::core::schema::ConstraintType::Check { expression } => {
                            ("CHECK", vec![expression.clone()])
                        }
                        _ => ("UNKNOWN", vec![]),
                    };

                    constraints.push(crate::api::schema::ConstraintInfo {
                        name: c.name.clone(),
                        constraint_type: ctype.to_string(),
                        properties: cprops,
                        enabled: c.enabled,
                    });
                }
            }

            Ok(Some(crate::api::schema::LabelInfo {
                name: name.to_string(),
                count,
                properties,
                indexes,
                constraints,
            }))
        } else {
            Ok(None)
        }
    }
470
471    /// Manually trigger compaction for a specific label.
472    ///
473    /// Compaction merges multiple L1 files into larger files to improve read performance.
474    pub async fn compact_label(
475        &self,
476        label: &str,
477    ) -> Result<uni_store::compaction::CompactionStats> {
478        self.storage
479            .compact_label(label)
480            .await
481            .map_err(UniError::Internal)
482    }
483
484    /// Manually trigger compaction for a specific edge type.
485    pub async fn compact_edge_type(
486        &self,
487        edge_type: &str,
488    ) -> Result<uni_store::compaction::CompactionStats> {
489        self.storage
490            .compact_edge_type(edge_type)
491            .await
492            .map_err(UniError::Internal)
493    }
494
495    /// Wait for any ongoing compaction to complete.
496    ///
497    /// Useful for tests or ensuring consistent performance before benchmarks.
498    pub async fn wait_for_compaction(&self) -> Result<()> {
499        self.storage
500            .wait_for_compaction()
501            .await
502            .map_err(UniError::Internal)
503    }
504
505    /// Bulk insert vertices for a given label.
506    ///
507    /// This is a low-level API intended for bulk loading and benchmarking.
508    /// Each properties map should contain all property values as JSON.
509    ///
510    /// Returns the allocated VIDs in the same order as the input.
511    pub async fn bulk_insert_vertices(
512        &self,
513        label: &str,
514        properties_list: Vec<uni_common::Properties>,
515    ) -> Result<Vec<uni_common::core::id::Vid>> {
516        let schema = self.schema.schema();
517        // Validate label exists in schema
518        schema
519            .labels
520            .get(label)
521            .ok_or_else(|| UniError::LabelNotFound {
522                label: label.to_string(),
523            })?;
524        if let Some(writer_lock) = &self.writer {
525            let mut writer = writer_lock.write().await;
526
527            if properties_list.is_empty() {
528                return Ok(Vec::new());
529            }
530
531            // NEW: Batch allocate VIDs (single call)
532            let vids = writer
533                .allocate_vids(properties_list.len())
534                .await
535                .map_err(UniError::Internal)?;
536
537            // NEW: Use optimized batch insert
538            let _props = writer
539                .insert_vertices_batch(vids.clone(), properties_list, vec![label.to_string()])
540                .await
541                .map_err(UniError::Internal)?;
542
543            Ok(vids)
544        } else {
545            Err(UniError::ReadOnly {
546                operation: "bulk_insert_vertices".to_string(),
547            })
548        }
549    }
550
    /// Bulk insert edges for a given edge type using pre-allocated VIDs.
    ///
    /// This is a low-level API intended for bulk loading and benchmarking.
    /// Each tuple is (src_vid, dst_vid, properties).
    ///
    /// # Errors
    ///
    /// Returns [`UniError::EdgeTypeNotFound`] if the edge type is not in the
    /// schema, and [`UniError::ReadOnly`] if this instance has no writer.
    pub async fn bulk_insert_edges(
        &self,
        edge_type: &str,
        edges: Vec<(
            uni_common::core::id::Vid,
            uni_common::core::id::Vid,
            uni_common::Properties,
        )>,
    ) -> Result<()> {
        let schema = self.schema.schema();
        // Resolve the edge type to its numeric id; fail fast if unknown.
        let edge_meta =
            schema
                .edge_types
                .get(edge_type)
                .ok_or_else(|| UniError::EdgeTypeNotFound {
                    edge_type: edge_type.to_string(),
                })?;
        let type_id = edge_meta.id;

        if let Some(writer_lock) = &self.writer {
            let mut writer = writer_lock.write().await;

            // NOTE(review): edges are inserted one at a time (EID allocation
            // plus insert per edge); unlike vertices there is no batched path.
            for (src_vid, dst_vid, props) in edges {
                let eid = writer.next_eid(type_id).await.map_err(UniError::Internal)?;
                writer
                    .insert_edge(
                        src_vid,
                        dst_vid,
                        type_id,
                        eid,
                        props,
                        Some(edge_type.to_string()),
                    )
                    .await
                    .map_err(UniError::Internal)?;
            }

            Ok(())
        } else {
            Err(UniError::ReadOnly {
                operation: "bulk_insert_edges".to_string(),
            })
        }
    }
599
    /// Get the status of background index rebuild tasks.
    ///
    /// Returns all tracked index rebuild tasks, including pending, in-progress,
    /// completed, and failed tasks. Use this to monitor progress of async
    /// index rebuilds started via `BulkWriter::commit()` with `async_indexes(true)`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let status = db.index_rebuild_status().await?;
    /// for task in status {
    ///     println!("Label: {}, Status: {:?}", task.label, task.status);
    /// }
    /// ```
    pub async fn index_rebuild_status(&self) -> Result<Vec<uni_store::storage::IndexRebuildTask>> {
        // A fresh manager is constructed per call — task state presumably
        // lives behind the shared storage/schema handles rather than in this
        // instance (TODO confirm against IndexRebuildManager).
        let manager = uni_store::storage::IndexRebuildManager::new(
            self.storage.clone(),
            self.schema.clone(),
            self.config.index_rebuild.clone(),
        )
        .await
        .map_err(UniError::Internal)?;

        Ok(manager.status())
    }
625
    /// Retry failed index rebuild tasks.
    ///
    /// Resets failed tasks back to pending state and returns the task IDs
    /// that will be retried. Tasks that have exceeded their retry limit
    /// will not be retried.
    ///
    /// # Returns
    ///
    /// A vector of task IDs that were scheduled for retry.
    pub async fn retry_index_rebuilds(&self) -> Result<Vec<String>> {
        // Construct a manager over the shared storage/schema handles.
        let manager = uni_store::storage::IndexRebuildManager::new(
            self.storage.clone(),
            self.schema.clone(),
            self.config.index_rebuild.clone(),
        )
        .await
        .map_err(UniError::Internal)?;

        let retried = manager.retry_failed().await.map_err(UniError::Internal)?;

        // Start background worker to process the retried tasks; the worker's
        // join handle is tracked so shutdown() can wait for it.
        if !retried.is_empty() {
            let manager = std::sync::Arc::new(manager);
            let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
            self.shutdown_handle.track_task(handle);
        }

        Ok(retried)
    }
655
    /// Force rebuild indexes for a specific label.
    ///
    /// # Arguments
    ///
    /// * `label` - The vertex label to rebuild indexes for.
    /// * `async_` - If true, rebuild in background; if false, block until complete.
    ///
    /// # Returns
    ///
    /// When `async_` is true, returns the task ID for tracking progress.
    /// When `async_` is false, returns None after indexes are rebuilt.
    pub async fn rebuild_indexes(&self, label: &str, async_: bool) -> Result<Option<String>> {
        if async_ {
            // Background path: schedule a task and spin up a worker whose
            // handle is tracked for graceful shutdown.
            let manager = uni_store::storage::IndexRebuildManager::new(
                self.storage.clone(),
                self.schema.clone(),
                self.config.index_rebuild.clone(),
            )
            .await
            .map_err(UniError::Internal)?;

            let task_ids = manager
                .schedule(vec![label.to_string()])
                .await
                .map_err(UniError::Internal)?;

            let manager = std::sync::Arc::new(manager);
            let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
            self.shutdown_handle.track_task(handle);

            // A single label was scheduled, so the first task id (if any)
            // identifies it.
            Ok(task_ids.into_iter().next())
        } else {
            // Synchronous path: rebuild in-line and return once complete.
            let idx_mgr = uni_store::storage::IndexManager::new(
                self.storage.base_path(),
                self.schema.clone(),
                self.storage.lancedb_store_arc(),
            );
            idx_mgr
                .rebuild_indexes_for_label(label)
                .await
                .map_err(UniError::Internal)?;
            Ok(None)
        }
    }
700
    /// Check if an index is currently being rebuilt for a label.
    ///
    /// Returns true if there is a pending or in-progress index rebuild task
    /// for the specified label.
    pub async fn is_index_building(&self, label: &str) -> Result<bool> {
        // A fresh manager is built per call over the shared storage/schema
        // handles, mirroring index_rebuild_status().
        let manager = uni_store::storage::IndexRebuildManager::new(
            self.storage.clone(),
            self.schema.clone(),
            self.config.index_rebuild.clone(),
        )
        .await
        .map_err(UniError::Internal)?;

        Ok(manager.is_index_building(label))
    }
716
717    /// Shutdown the database gracefully, flushing pending data and stopping background tasks.
718    ///
719    /// This method flushes any pending data and waits for all background tasks to complete
720    /// (with a timeout). After calling this method, the database instance should not be used.
721    pub async fn shutdown(self) -> Result<()> {
722        // Flush pending data
723        if let Some(ref writer) = self.writer {
724            let mut w = writer.write().await;
725            if let Err(e) = w.flush_to_l1(None).await {
726                tracing::error!("Error flushing during shutdown: {}", e);
727            }
728        }
729
730        self.shutdown_handle
731            .shutdown_async()
732            .await
733            .map_err(UniError::Internal)
734    }
735}
736
impl Drop for Uni {
    fn drop(&mut self) {
        // Best-effort synchronous shutdown signal for background tasks.
        // NOTE(review): this does NOT flush pending data — call `shutdown()`
        // for a clean stop that flushes first.
        self.shutdown_handle.shutdown_blocking();
        tracing::debug!("Uni dropped, shutdown signal sent");
    }
}
743
/// Builder for configuring and opening a `Uni` database instance.
#[must_use = "builders do nothing until .build() is called"]
pub struct UniBuilder {
    // Primary URI: a local path, or an object store URI when not in hybrid mode.
    uri: String,
    // Database options applied at build time.
    config: UniConfig,
    // Optional JSON schema file loaded on initialization.
    schema_file: Option<PathBuf>,
    // Optional explicit Uni-Xervo model catalog.
    xervo_catalog: Option<Vec<ModelAliasSpec>>,
    // Remote object-store URL for hybrid mode (bulk data); WAL/IDs stay local.
    hybrid_remote_url: Option<String>,
    // Explicit cloud credentials; when absent, the remote URL is resolved
    // via object_store's URL parsing (environment-based credentials).
    cloud_config: Option<CloudStorageConfig>,
    // When true (the default), create the database if it does not exist.
    create_if_missing: bool,
    // When true, building fails if the database already exists.
    fail_if_exists: bool,
}
756
757impl UniBuilder {
758    /// Creates a new builder for the given URI.
759    pub fn new(uri: String) -> Self {
760        Self {
761            uri,
762            config: UniConfig::default(),
763            schema_file: None,
764            xervo_catalog: None,
765            hybrid_remote_url: None,
766            cloud_config: None,
767            create_if_missing: true,
768            fail_if_exists: false,
769        }
770    }
771
772    /// Load schema from JSON file on initialization.
773    pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
774        self.schema_file = Some(path.as_ref().to_path_buf());
775        self
776    }
777
778    /// Set Uni-Xervo catalog explicitly.
779    pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
780        self.xervo_catalog = Some(catalog);
781        self
782    }
783
784    /// Parse Uni-Xervo catalog from JSON string.
785    pub fn xervo_catalog_from_str(mut self, json: &str) -> Result<Self> {
786        let catalog = uni_xervo::api::catalog_from_str(json)
787            .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
788        self.xervo_catalog = Some(catalog);
789        Ok(self)
790    }
791
792    /// Parse Uni-Xervo catalog from a JSON file.
793    pub fn xervo_catalog_from_file(mut self, path: impl AsRef<Path>) -> Result<Self> {
794        let catalog = uni_xervo::api::catalog_from_file(path)
795            .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
796        self.xervo_catalog = Some(catalog);
797        Ok(self)
798    }
799
800    /// Configure hybrid storage with a local path for WAL/IDs and a remote URL for data.
801    ///
802    /// This allows fast local writes and metadata operations while storing bulk data
803    /// in object storage (e.g., S3, GCS, Azure Blob Storage).
804    ///
805    /// # Examples
806    ///
807    /// ```ignore
808    /// let db = Uni::open("./local_meta")
809    ///     .hybrid("./local_meta", "s3://my-bucket/graph-data")
810    ///     .build()
811    ///     .await?;
812    /// ```
813    pub fn hybrid(mut self, local_path: impl AsRef<Path>, remote_url: &str) -> Self {
814        self.uri = local_path.as_ref().to_string_lossy().to_string();
815        self.hybrid_remote_url = Some(remote_url.to_string());
816        self
817    }
818
819    /// Configure cloud storage with explicit credentials.
820    ///
821    /// Use this method when you need fine-grained control over cloud storage
822    /// credentials instead of relying on environment variables.
823    ///
824    /// # Examples
825    ///
826    /// ```ignore
827    /// use uni_common::CloudStorageConfig;
828    ///
829    /// let config = CloudStorageConfig::S3 {
830    ///     bucket: "my-bucket".to_string(),
831    ///     region: Some("us-east-1".to_string()),
832    ///     endpoint: Some("http://localhost:4566".to_string()), // LocalStack
833    ///     access_key_id: Some("test".to_string()),
834    ///     secret_access_key: Some("test".to_string()),
835    ///     session_token: None,
836    ///     virtual_hosted_style: false,
837    /// };
838    ///
839    /// let db = Uni::open("./local_meta")
840    ///     .hybrid("./local_meta", "s3://my-bucket/data")
841    ///     .cloud_config(config)
842    ///     .build()
843    ///     .await?;
844    /// ```
845    pub fn cloud_config(mut self, config: CloudStorageConfig) -> Self {
846        self.cloud_config = Some(config);
847        self
848    }
849
850    /// Configure database options using `UniConfig`.
851    pub fn config(mut self, config: UniConfig) -> Self {
852        self.config = config;
853        self
854    }
855
856    /// Set maximum adjacency cache size in bytes.
857    pub fn cache_size(mut self, bytes: usize) -> Self {
858        self.config.cache_size = bytes;
859        self
860    }
861
862    /// Set query parallelism (number of worker threads).
863    pub fn parallelism(mut self, n: usize) -> Self {
864        self.config.parallelism = n;
865        self
866    }
867
    /// Open the database (async).
    ///
    /// Orchestrates the full startup sequence:
    /// 1. Resolve the storage topology (local-only, remote-only, or hybrid)
    ///    and build the corresponding object stores.
    /// 2. Migrate the legacy root `schema.json` to `catalog/schema.json` if
    ///    needed, then load the schema.
    /// 3. Construct the storage manager, background compaction, property
    ///    manager, ID allocator, WAL, and writer.
    /// 4. Recover the version counter and WAL high-water mark from the latest
    ///    snapshot (with fallback recovery from raw manifests).
    /// 5. Validate and build the optional Uni-Xervo model runtime, replay the
    ///    WAL, and spawn the auto-flush task.
    ///
    /// # Errors
    ///
    /// Fails when the URI/topology combination is invalid, the database
    /// directory conflicts with `fail_if_exists`/`create_if_missing`, a cloud
    /// store or URL cannot be built/parsed, the snapshot state is inconsistent
    /// (WAL segments without any manifest, or a manifests directory with no
    /// valid manifest), or a vector index references a missing or non-embed
    /// Uni-Xervo alias.
    pub async fn build(self) -> Result<Uni> {
        // Topology flags: a scheme ("://") marks a remote URI; hybrid mode is
        // signalled by a separately stored remote URL.
        let uri = self.uri.clone();
        let is_remote_uri = uri.contains("://");
        let is_hybrid = self.hybrid_remote_url.is_some();

        // Hybrid requires the primary URI to be a plain local path (the remote
        // half lives in `hybrid_remote_url`), so both at once is a user error.
        if is_hybrid && is_remote_uri {
            return Err(UniError::Internal(anyhow::anyhow!(
                "Hybrid mode requires a local path as primary URI, found: {}",
                uri
            )));
        }

        // Resolve (storage_uri, data_store, optional local_store) per topology.
        // `data_store` holds bulk data + catalog; `local_store_opt` (when set)
        // holds latency-sensitive metadata (WAL, ID allocator).
        let (storage_uri, data_store, local_store_opt) = if is_hybrid {
            let remote_url = self.hybrid_remote_url.as_ref().unwrap();

            // Remote Store (Data) - use explicit cloud_config if provided
            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
            } else {
                // No explicit credentials: infer the store from the URL scheme
                // (environment-based credentials apply).
                let url = url::Url::parse(remote_url).map_err(|e| {
                    UniError::Io(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        e.to_string(),
                    ))
                })?;
                let (os, _path) =
                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
                Arc::from(os)
            };

            // Local Store (WAL, IDs)
            let path = PathBuf::from(&uri);
            if path.exists() {
                if self.fail_if_exists {
                    return Err(UniError::Internal(anyhow::anyhow!(
                        "Database already exists at {}",
                        uri
                    )));
                }
            } else {
                if !self.create_if_missing {
                    return Err(UniError::NotFound { path: path.clone() });
                }
                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
            }

            let local_store = Arc::new(
                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
            );

            // For hybrid, storage_uri is the remote URL (since StorageManager loads datasets from there)
            // But we must provide the correct store to other components manually.
            (
                remote_url.clone(),
                remote_store,
                Some(local_store as Arc<dyn ObjectStore>),
            )
        } else if is_remote_uri {
            // Remote Only - use explicit cloud_config if provided
            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
            } else {
                let url = url::Url::parse(&uri).map_err(|e| {
                    UniError::Io(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        e.to_string(),
                    ))
                })?;
                let (os, _path) =
                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
                Arc::from(os)
            };

            // No separate local store: WAL and IDs live in the remote store too.
            (uri.clone(), remote_store, None)
        } else {
            // Local Only
            let path = PathBuf::from(&uri);
            let storage_path = path.join("storage");

            if path.exists() {
                if self.fail_if_exists {
                    return Err(UniError::Internal(anyhow::anyhow!(
                        "Database already exists at {}",
                        uri
                    )));
                }
            } else {
                if !self.create_if_missing {
                    return Err(UniError::NotFound { path: path.clone() });
                }
                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
            }

            // Ensure storage directory exists
            if !storage_path.exists() {
                std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
            }

            // One filesystem store rooted at the DB directory serves as both
            // data store and local store.
            let store = Arc::new(
                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
            );
            (
                storage_path.to_string_lossy().to_string(),
                store.clone() as Arc<dyn ObjectStore>,
                Some(store as Arc<dyn ObjectStore>),
            )
        };

        // Canonical schema location in metadata catalog.
        let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
        // Legacy schema location used by older builds.
        let legacy_schema_obj_path = object_store::path::Path::from("schema.json");

        // Backward-compatible schema path migration:
        // if catalog/schema.json is missing but root schema.json exists,
        // copy root schema.json to catalog/schema.json.
        let has_catalog_schema = match data_store.get(&schema_obj_path).await {
            Ok(_) => true,
            Err(object_store::Error::NotFound { .. }) => false,
            Err(e) => return Err(UniError::Internal(e.into())),
        };
        if !has_catalog_schema {
            match data_store.get(&legacy_schema_obj_path).await {
                Ok(result) => {
                    let bytes = result
                        .bytes()
                        .await
                        .map_err(|e| UniError::Internal(e.into()))?;
                    data_store
                        .put(&schema_obj_path, bytes.into())
                        .await
                        .map_err(|e| UniError::Internal(e.into()))?;
                    info!(
                        legacy = %legacy_schema_obj_path,
                        target = %schema_obj_path,
                        "Migrated legacy schema path to catalog path"
                    );
                }
                // Neither location has a schema: fresh DB, nothing to migrate.
                Err(object_store::Error::NotFound { .. }) => {}
                Err(e) => return Err(UniError::Internal(e.into())),
            }
        }

        // Load schema (SchemaManager::load creates a default if missing)
        // Schema is always in data_store (Remote or Local)
        let schema_manager = Arc::new(
            SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
                .await
                .map_err(UniError::Internal)?,
        );

        // Translate explicit cloud credentials into LanceDB storage options so
        // the dataset layer talks to the same endpoint with the same auth.
        let lancedb_storage_options = self
            .cloud_config
            .as_ref()
            .map(Self::cloud_config_to_lancedb_storage_options);

        let storage = if is_hybrid || is_remote_uri {
            // Preserve explicit cloud settings (endpoint, credentials, path style)
            // by reusing the constructed remote store.
            StorageManager::new_with_store_and_storage_options(
                &storage_uri,
                data_store.clone(),
                schema_manager.clone(),
                self.config.clone(),
                lancedb_storage_options.clone(),
            )
            .await
            .map_err(UniError::Internal)?
        } else {
            // Local mode keeps using a storage-path-scoped local store.
            StorageManager::new_with_config(
                &storage_uri,
                schema_manager.clone(),
                self.config.clone(),
            )
            .await
            .map_err(UniError::Internal)?
        };

        let storage = Arc::new(storage);

        // Create shutdown handle
        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));

        // Start background compaction with shutdown signal
        let compaction_handle = storage
            .clone()
            .start_background_compaction(shutdown_handle.subscribe());
        shutdown_handle.track_task(compaction_handle);

        // Initialize property manager
        // Capacity is derived from the byte budget by dividing by 1024
        // (assumes roughly 1 KiB per cached entry — TODO confirm).
        let prop_cache_capacity = self.config.cache_size / 1024;

        let prop_manager = Arc::new(PropertyManager::new(
            storage.clone(),
            schema_manager.clone(),
            prop_cache_capacity,
        ));

        // Setup stores for WAL and IdAllocator (needed for version recovery check)
        // In hybrid/local mode these resolve to the local store; remote-only
        // mode falls back to the data store.
        let id_store = local_store_opt
            .clone()
            .unwrap_or_else(|| data_store.clone());
        let wal_store = local_store_opt
            .clone()
            .unwrap_or_else(|| data_store.clone());

        // Determine start version and WAL high water mark from latest snapshot.
        // Detects and recovers from a lost manifest pointer.
        let latest_snapshot = storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await
            .map_err(UniError::Internal)?;

        let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
            // Normal path: resume just past the snapshot's version watermark.
            (
                snapshot.version_high_water_mark + 1,
                snapshot.wal_high_water_mark,
            )
        } else {
            // No latest snapshot — fresh DB or lost manifest?
            let has_manifests = storage
                .snapshot_manager()
                .has_any_manifests()
                .await
                .unwrap_or(false);

            let wal_check =
                WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
            let has_wal = wal_check.has_segments().await.unwrap_or(false);

            if has_manifests {
                // Manifests exist but latest pointer is missing — try to recover from manifests
                let snapshot_ids = storage
                    .snapshot_manager()
                    .list_snapshots()
                    .await
                    .map_err(UniError::Internal)?;
                if let Some(last_id) = snapshot_ids.last() {
                    let manifest = storage
                        .snapshot_manager()
                        .load_snapshot(last_id)
                        .await
                        .map_err(UniError::Internal)?;
                    tracing::warn!(
                        "Latest snapshot pointer missing but found manifest '{}'. \
                         Recovering version {}.",
                        last_id,
                        manifest.version_high_water_mark
                    );
                    (
                        manifest.version_high_water_mark + 1,
                        manifest.wal_high_water_mark,
                    )
                } else {
                    return Err(UniError::Internal(anyhow::anyhow!(
                        "Snapshot manifests directory exists but contains no valid manifests. \
                         Possible data corruption."
                    )));
                }
            } else if has_wal {
                // WAL exists but no manifests at all — data exists but unrecoverable version
                return Err(UniError::Internal(anyhow::anyhow!(
                    "Database has WAL segments but no snapshot manifest. \
                     Cannot safely determine version counter -- starting at 0 would cause \
                     version conflicts and data corruption. \
                     Restore the snapshot manifest or delete WAL to start fresh."
                )));
            } else {
                // Truly fresh database
                (0, 0)
            }
        };

        // ID allocator persists its state in the (local-preferred) store;
        // 1000 is the allocation batch size reserved per refill.
        let allocator = Arc::new(
            IdAllocator::new(
                id_store,
                object_store::path::Path::from("id_allocator.json"),
                1000,
            )
            .await
            .map_err(UniError::Internal)?,
        );

        let wal = if !self.config.wal_enabled {
            // WAL disabled by config
            None
        } else if is_remote_uri && !is_hybrid {
            // Remote-only WAL (ObjectStoreWal)
            Some(Arc::new(WriteAheadLog::new(
                wal_store,
                object_store::path::Path::from("wal"),
            )))
        } else if is_hybrid || !is_remote_uri {
            // Local WAL (using local_store)
            // Even if local_store uses ObjectStore trait, it maps to FS.
            // NOTE(review): this arm is byte-identical to the remote-only arm
            // above (`wal_store` already points at the correct store in both
            // cases), and the trailing `else` can never be reached — the two
            // guards together cover every boolean combination. Candidate for
            // simplification to a single `Some(...)`.
            Some(Arc::new(WriteAheadLog::new(
                wal_store,
                object_store::path::Path::from("wal"),
            )))
        } else {
            None
        };

        // Writer starts at the recovered version so new commits never collide
        // with versions already captured in a snapshot.
        let writer = Arc::new(RwLock::new(
            Writer::new_with_config(
                storage.clone(),
                schema_manager.clone(),
                start_version,
                self.config.clone(),
                wal,
                Some(allocator),
            )
            .await
            .map_err(UniError::Internal)?,
        ));

        // Collect every embedding alias referenced by vector indexes in the
        // schema; each must be resolvable in the Uni-Xervo catalog.
        let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
            .schema()
            .indexes
            .iter()
            .filter_map(|idx| {
                if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
                    cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
                } else {
                    None
                }
            })
            .collect();

        if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
            return Err(UniError::Internal(anyhow::anyhow!(
                "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
            )));
        }

        let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
            // Validate that every required alias exists and is an embed task
            // before building the runtime.
            for alias in &required_embed_aliases {
                let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
                    UniError::Internal(anyhow::anyhow!(
                        "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
                        alias
                    ))
                })?;
                if spec.task != ModelTask::Embed {
                    return Err(UniError::Internal(anyhow::anyhow!(
                        "Uni-Xervo alias '{}' must be an embed task",
                        alias
                    )));
                }
            }

            // Register every provider compiled into this build; unavailable
            // providers are simply absent (feature-gated).
            let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
            #[cfg(feature = "provider-candle")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::LocalCandleProvider::new());
            }
            #[cfg(feature = "provider-fastembed")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
            }
            #[cfg(feature = "provider-openai")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
            }
            #[cfg(feature = "provider-gemini")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
            }
            #[cfg(feature = "provider-vertexai")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
            }
            #[cfg(feature = "provider-mistral")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
            }
            #[cfg(feature = "provider-anthropic")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
            }
            #[cfg(feature = "provider-voyageai")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
            }
            #[cfg(feature = "provider-cohere")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
            }
            #[cfg(feature = "provider-azure-openai")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
            }
            #[cfg(feature = "provider-mistralrs")]
            {
                runtime_builder = runtime_builder
                    .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
            }

            Some(
                runtime_builder
                    .build()
                    .await
                    .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
            )
        } else {
            None
        };

        // Inject the runtime before WAL replay so replayed mutations that need
        // embeddings can use it.
        if let Some(ref runtime) = xervo_runtime {
            let mut writer_guard = writer.write().await;
            writer_guard.set_xervo_runtime(runtime.clone());
        }

        // Replay WAL to restore any uncommitted mutations from previous session
        // Only replay mutations with LSN > wal_high_water_mark to avoid double-applying
        {
            let w = writer.read().await;
            let replayed = w
                .replay_wal(wal_high_water_mark)
                .await
                .map_err(UniError::Internal)?;
            if replayed > 0 {
                info!("WAL recovery: replayed {} mutations", replayed);
            }
        }

        // Start background flush checker for time-based auto-flush
        if let Some(interval) = self.config.auto_flush_interval {
            let writer_clone = writer.clone();
            let mut shutdown_rx = shutdown_handle.subscribe();

            let handle = tokio::spawn(async move {
                let mut ticker = tokio::time::interval(interval);
                loop {
                    tokio::select! {
                        _ = ticker.tick() => {
                            let mut w = writer_clone.write().await;
                            if let Err(e) = w.check_flush().await {
                                tracing::warn!("Background flush check failed: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            // Best-effort final flush on shutdown; errors are
                            // deliberately ignored since we are exiting anyway.
                            tracing::info!("Auto-flush shutting down, performing final flush");
                            let mut w = writer_clone.write().await;
                            let _ = w.flush_to_l1(None).await;
                            break;
                        }
                    }
                }
            });

            shutdown_handle.track_task(handle);
        }

        Ok(Uni {
            storage,
            schema: schema_manager,
            properties: prop_manager,
            writer: Some(writer),
            xervo_runtime,
            config: self.config,
            procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
            shutdown_handle,
        })
    }
1346
1347    /// Open the database (blocking)
1348    pub fn build_sync(self) -> Result<Uni> {
1349        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1350        rt.block_on(self.build())
1351    }
1352
1353    fn cloud_config_to_lancedb_storage_options(
1354        config: &CloudStorageConfig,
1355    ) -> std::collections::HashMap<String, String> {
1356        let mut opts = std::collections::HashMap::new();
1357
1358        match config {
1359            CloudStorageConfig::S3 {
1360                bucket,
1361                region,
1362                endpoint,
1363                access_key_id,
1364                secret_access_key,
1365                session_token,
1366                virtual_hosted_style,
1367            } => {
1368                opts.insert("bucket".to_string(), bucket.clone());
1369                opts.insert(
1370                    "virtual_hosted_style_request".to_string(),
1371                    virtual_hosted_style.to_string(),
1372                );
1373
1374                if let Some(r) = region {
1375                    opts.insert("region".to_string(), r.clone());
1376                }
1377                if let Some(ep) = endpoint {
1378                    opts.insert("endpoint".to_string(), ep.clone());
1379                    if ep.starts_with("http://") {
1380                        opts.insert("allow_http".to_string(), "true".to_string());
1381                    }
1382                }
1383                if let Some(v) = access_key_id {
1384                    opts.insert("access_key_id".to_string(), v.clone());
1385                }
1386                if let Some(v) = secret_access_key {
1387                    opts.insert("secret_access_key".to_string(), v.clone());
1388                }
1389                if let Some(v) = session_token {
1390                    opts.insert("session_token".to_string(), v.clone());
1391                }
1392            }
1393            CloudStorageConfig::Gcs {
1394                bucket,
1395                service_account_path,
1396                service_account_key,
1397            } => {
1398                opts.insert("bucket".to_string(), bucket.clone());
1399                if let Some(v) = service_account_path {
1400                    opts.insert("service_account".to_string(), v.clone());
1401                    opts.insert("application_credentials".to_string(), v.clone());
1402                }
1403                if let Some(v) = service_account_key {
1404                    opts.insert("service_account_key".to_string(), v.clone());
1405                }
1406            }
1407            CloudStorageConfig::Azure {
1408                container,
1409                account,
1410                access_key,
1411                sas_token,
1412            } => {
1413                opts.insert("account_name".to_string(), account.clone());
1414                opts.insert("container_name".to_string(), container.clone());
1415                if let Some(v) = access_key {
1416                    opts.insert("access_key".to_string(), v.clone());
1417                }
1418                if let Some(v) = sas_token {
1419                    opts.insert("sas_token".to_string(), v.clone());
1420                }
1421            }
1422        }
1423
1424        opts
1425    }
1426}