// File: uni_db/api/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::time::Duration;
7
8pub mod builder;
9pub mod bulk;
10pub mod impl_locy;
11pub mod impl_query;
12pub mod locy_builder;
13pub mod query_builder;
14pub mod schema;
15pub mod session;
16pub mod sync;
17pub mod transaction;
18pub mod xervo;
19
20use object_store::ObjectStore;
21use object_store::local::LocalFileSystem;
22use tracing::info;
23use uni_common::core::snapshot::SnapshotManifest;
24use uni_common::{CloudStorageConfig, UniConfig};
25use uni_common::{Result, UniError};
26use uni_store::cloud::build_cloud_store;
27use uni_xervo::api::{ModelAliasSpec, ModelTask};
28use uni_xervo::runtime::ModelRuntime;
29
30use uni_common::core::schema::SchemaManager;
31use uni_store::runtime::id_allocator::IdAllocator;
32use uni_store::runtime::property_manager::PropertyManager;
33use uni_store::runtime::wal::WriteAheadLog;
34use uni_store::storage::manager::StorageManager;
35
36use tokio::sync::RwLock;
37use uni_store::runtime::writer::Writer;
38
39use crate::shutdown::ShutdownHandle;
40
/// Main entry point for Uni embedded database.
///
/// Handles storage, schema, and query execution. It coordinates the various
/// subsystems including the storage engine, query executor, and graph algorithms.
///
/// # Examples
///
/// ## Local Usage
/// ```no_run
/// use uni_db::Uni;
///
/// #[tokio::main]
/// async fn main() -> Result<(), uni_db::UniError> {
///     let db = Uni::open("./my_db")
///         .build()
///         .await?;
///
///     // Run a query
///     let results = db.query("MATCH (n) RETURN count(n)").await?;
///     println!("Count: {:?}", results);
///     Ok(())
/// }
/// ```
///
/// ## Hybrid Storage (S3 + Local)
/// Store bulk data and catalog metadata in S3 (or GCS/Azure) while keeping WAL/ID allocation local.
///
/// ```no_run
/// use uni_db::Uni;
///
/// #[tokio::main]
/// async fn main() -> Result<(), uni_db::UniError> {
///     // Requires `object_store` features enabled (aws, gcp, azure)
///     let db = Uni::open("./local_meta")
///         .hybrid("./local_meta", "s3://my-bucket/graph-data")
///         .build()
///         .await?;
///     Ok(())
/// }
/// ```
pub struct Uni {
    // Storage engine; also provides snapshot/compaction management.
    pub(crate) storage: Arc<StorageManager>,
    // Schema manager; `schema()` yields a read-only schema snapshot.
    pub(crate) schema: Arc<SchemaManager>,
    // Property access layer built on top of `storage` + `schema`.
    pub(crate) properties: Arc<PropertyManager>,
    // `None` when the instance is read-only (e.g. a pinned snapshot view);
    // write-path APIs return `UniError::ReadOnly` in that case.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    // Optional model runtime for Uni-Xervo integration.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    // Configuration the instance was built with.
    pub(crate) config: UniConfig,
    // Registry of callable procedures available to queries.
    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
    // Coordinates graceful shutdown of tracked background tasks.
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
    /// Session-level registry of pre-compiled Locy rules.
    ///
    /// Populated by `locy().compile()` and merged into subsequent `evaluate()`
    /// calls so that rules defined once persist across multiple evaluations.
    pub(crate) locy_rule_registry: Arc<std::sync::RwLock<impl_locy::LocyRuleRegistry>>,
}
96
97impl Uni {
98    /// Open or create a database at the given path.
99    ///
100    /// If the database does not exist, it will be created.
101    ///
102    /// # Arguments
103    ///
104    /// * `uri` - Local path or object store URI.
105    ///
106    /// # Returns
107    ///
108    /// A [`UniBuilder`] to configure and build the database instance.
109    pub fn open(uri: impl Into<String>) -> UniBuilder {
110        UniBuilder::new(uri.into())
111    }
112
113    /// Open an existing database at the given path. Fails if it does not exist.
114    pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
115        let mut builder = UniBuilder::new(uri.into());
116        builder.create_if_missing = false;
117        builder
118    }
119
120    /// Create a new database at the given path. Fails if it already exists.
121    pub fn create(uri: impl Into<String>) -> UniBuilder {
122        let mut builder = UniBuilder::new(uri.into());
123        builder.fail_if_exists = true;
124        builder
125    }
126
127    /// Create a temporary database that is deleted when dropped.
128    ///
129    /// Useful for tests and short-lived processing.
130    /// Note: Currently uses a temporary directory on the filesystem.
131    pub fn temporary() -> UniBuilder {
132        let temp_dir = std::env::temp_dir().join(format!("uni_mem_{}", uuid::Uuid::new_v4()));
133        UniBuilder::new(temp_dir.to_string_lossy().to_string())
134    }
135
136    /// Open an in-memory database (alias for temporary).
137    pub fn in_memory() -> UniBuilder {
138        Self::temporary()
139    }
140
    /// Open a point-in-time view of the database at the given snapshot.
    ///
    /// This returns a new `Uni` instance that is pinned to the specified snapshot state.
    /// The returned instance is read-only. Used internally by Cypher `VERSION AS OF` / `TIMESTAMP AS OF`.
    pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<Uni> {
        // Resolve the snapshot's manifest; fails if the id is unknown.
        let manifest = self
            .storage
            .snapshot_manager()
            .load_snapshot(snapshot_id)
            .await
            .map_err(UniError::Internal)?;

        // Pin the storage manager to the manifest's file set.
        let pinned_storage = Arc::new(self.storage.pinned(manifest));

        // Rebuild the property manager against the pinned storage so reads
        // resolve against the snapshot; cache size is inherited from self.
        let prop_manager = Arc::new(PropertyManager::new(
            pinned_storage.clone(),
            self.schema.clone(),
            self.properties.cache_size(),
        ));

        // Create a shutdown handle for this read-only snapshot view
        // (no background tasks, but needed for consistency)
        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));

        Ok(Uni {
            storage: pinned_storage,
            schema: self.schema.clone(),
            properties: prop_manager,
            // No writer: this view is read-only.
            writer: None,
            xervo_runtime: self.xervo_runtime.clone(),
            config: self.config.clone(),
            procedure_registry: self.procedure_registry.clone(),
            shutdown_handle,
            // Fresh, empty Locy registry: compiled rules from the parent
            // instance are not carried over into the snapshot view.
            locy_rule_registry: Arc::new(std::sync::RwLock::new(
                impl_locy::LocyRuleRegistry::default(),
            )),
        })
    }
179
    /// Get configuration.
    ///
    /// Returns a reference to the [`UniConfig`] this instance was built with.
    pub fn config(&self) -> &UniConfig {
        &self.config
    }
184
    /// Returns the procedure registry for registering test procedures.
    ///
    /// The registry is shared (`Arc`), so registrations are visible to
    /// subsequent queries on this instance.
    pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
        &self.procedure_registry
    }
189
    /// Get current schema (read-only snapshot).
    ///
    /// The returned `Arc<Schema>` is a point-in-time view; later schema
    /// changes are not reflected in it.
    pub fn get_schema(&self) -> Arc<uni_common::core::schema::Schema> {
        self.schema.schema()
    }
194
    /// Create a bulk writer for efficient data loading.
    ///
    /// Returns a builder borrowing `self`; configure it and commit to load data.
    pub fn bulk_writer(&self) -> bulk::BulkWriterBuilder<'_> {
        bulk::BulkWriterBuilder::new(self)
    }
199
    /// Create a session builder for scoped query context.
    ///
    /// Returns a builder borrowing `self`; the session lives no longer than
    /// this database instance.
    pub fn session(&self) -> session::SessionBuilder<'_> {
        session::SessionBuilder::new(self)
    }
204
    /// Get schema manager.
    ///
    /// Hidden from docs: intended for internal/advanced use; returns a clone
    /// of the shared schema manager handle.
    #[doc(hidden)]
    pub fn schema_manager(&self) -> Arc<SchemaManager> {
        self.schema.clone()
    }
210
    // Internal accessor: clone of the writer handle, `None` if read-only.
    #[doc(hidden)]
    pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
        self.writer.clone()
    }
215
    // Internal accessor: clone of the shared storage manager handle.
    #[doc(hidden)]
    pub fn storage(&self) -> Arc<StorageManager> {
        self.storage.clone()
    }
220
221    /// Flush all uncommitted changes to persistent storage (L1).
222    ///
223    /// This forces a write of the current in-memory buffer (L0) to columnar files.
224    /// It also creates a new snapshot.
225    pub async fn flush(&self) -> Result<()> {
226        if let Some(writer_lock) = &self.writer {
227            let mut writer = writer_lock.write().await;
228            writer
229                .flush_to_l1(None)
230                .await
231                .map(|_| ())
232                .map_err(UniError::Internal)
233        } else {
234            Err(UniError::ReadOnly {
235                operation: "flush".to_string(),
236            })
237        }
238    }
239
240    /// Create a named point-in-time snapshot of the database.
241    ///
242    /// This flushes current changes and records the state.
243    /// Returns the snapshot ID.
244    pub async fn create_snapshot(&self, name: Option<&str>) -> Result<String> {
245        if let Some(writer_lock) = &self.writer {
246            let mut writer = writer_lock.write().await;
247            writer
248                .flush_to_l1(name.map(|s| s.to_string()))
249                .await
250                .map_err(UniError::Internal)
251        } else {
252            Err(UniError::ReadOnly {
253                operation: "create_snapshot".to_string(),
254            })
255        }
256    }
257
258    /// Create a persisted named snapshot that can be retrieved later.
259    pub async fn create_named_snapshot(&self, name: &str) -> Result<String> {
260        if name.is_empty() {
261            return Err(UniError::Internal(anyhow::anyhow!(
262                "Snapshot name cannot be empty"
263            )));
264        }
265
266        let snapshot_id = self.create_snapshot(Some(name)).await?;
267
268        self.storage
269            .snapshot_manager()
270            .save_named_snapshot(name, &snapshot_id)
271            .await
272            .map_err(UniError::Internal)?;
273
274        Ok(snapshot_id)
275    }
276
277    /// List all available snapshots.
278    pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
279        let sm = self.storage.snapshot_manager();
280        let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
281        let mut manifests = Vec::new();
282        for id in ids {
283            if let Ok(m) = sm.load_snapshot(&id).await {
284                manifests.push(m);
285            }
286        }
287        Ok(manifests)
288    }
289
290    /// Restore the database to a specific snapshot.
291    ///
292    /// **Note**: This currently requires a restart or re-opening of Uni to fully take effect
293    /// as it only updates the latest pointer.
294    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
295        self.storage
296            .snapshot_manager()
297            .set_latest_snapshot(snapshot_id)
298            .await
299            .map_err(UniError::Internal)
300    }
301
302    /// Check if a label exists in the schema.
303    pub async fn label_exists(&self, name: &str) -> Result<bool> {
304        Ok(self.schema.schema().labels.get(name).is_some_and(|l| {
305            matches!(
306                l.state,
307                uni_common::core::schema::SchemaElementState::Active
308            )
309        }))
310    }
311
312    /// Check if an edge type exists in the schema.
313    pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
314        Ok(self.schema.schema().edge_types.get(name).is_some_and(|e| {
315            matches!(
316                e.state,
317                uni_common::core::schema::SchemaElementState::Active
318            )
319        }))
320    }
321
322    /// Get all label names.
323    /// Returns the union of schema-registered labels (Active state) and labels
324    /// discovered from data (for schemaless mode where labels may not be in the
325    /// schema). This is consistent with `list_edge_types()` for schema labels
326    /// while also supporting schemaless workflows.
327    pub async fn list_labels(&self) -> Result<Vec<String>> {
328        let mut all_labels = std::collections::HashSet::new();
329
330        // Schema labels (covers schema-defined labels that may not have data yet)
331        for (name, label) in self.schema.schema().labels.iter() {
332            if matches!(
333                label.state,
334                uni_common::core::schema::SchemaElementState::Active
335            ) {
336                all_labels.insert(name.clone());
337            }
338        }
339
340        // Data labels (covers schemaless labels that aren't in the schema)
341        let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
342        let result = self.query(query).await?;
343        for row in &result.rows {
344            if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
345                for label in labels_list {
346                    all_labels.insert(label);
347                }
348            }
349        }
350
351        Ok(all_labels.into_iter().collect())
352    }
353
354    /// Get all edge type names.
355    pub async fn list_edge_types(&self) -> Result<Vec<String>> {
356        Ok(self
357            .schema
358            .schema()
359            .edge_types
360            .iter()
361            .filter(|(_, e)| {
362                matches!(
363                    e.state,
364                    uni_common::core::schema::SchemaElementState::Active
365                )
366            })
367            .map(|(name, _)| name.clone())
368            .collect())
369    }
370
    /// Get detailed information about a label.
    ///
    /// Returns `None` if the label is not registered in the schema. Otherwise
    /// gathers the row count, per-property metadata (with an `is_indexed`
    /// flag), index definitions, and constraints targeting the label.
    pub async fn get_label_info(
        &self,
        name: &str,
    ) -> Result<Option<crate::api::schema::LabelInfo>> {
        let schema = self.schema.schema();
        if schema.labels.contains_key(name) {
            // Row count is best-effort: a missing dataset or an unopenable
            // raw dataset is reported as zero rows rather than an error.
            let count = if let Ok(ds) = self.storage.vertex_dataset(name) {
                if let Ok(raw) = ds.open_raw().await {
                    raw.count_rows(None)
                        .await
                        .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
                } else {
                    0
                }
            } else {
                0
            };

            // Property metadata; `is_indexed` is true when any schema index
            // definition covers the property for this label.
            let mut properties = Vec::new();
            if let Some(props) = schema.properties.get(name) {
                for (prop_name, prop_meta) in props {
                    let is_indexed = schema.indexes.iter().any(|idx| match idx {
                        uni_common::core::schema::IndexDefinition::Vector(v) => {
                            v.label == name && v.property == *prop_name
                        }
                        uni_common::core::schema::IndexDefinition::Scalar(s) => {
                            s.label == name && s.properties.contains(prop_name)
                        }
                        uni_common::core::schema::IndexDefinition::FullText(f) => {
                            f.label == name && f.properties.contains(prop_name)
                        }
                        uni_common::core::schema::IndexDefinition::Inverted(inv) => {
                            inv.label == name && inv.property == *prop_name
                        }
                        // A JSON full-text index on the label marks every
                        // property of the label as indexed.
                        uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
                            j.label == name
                        }
                        _ => false,
                    });

                    properties.push(crate::api::schema::PropertyInfo {
                        name: prop_name.clone(),
                        data_type: format!("{:?}", prop_meta.r#type),
                        nullable: prop_meta.nullable,
                        is_indexed,
                    });
                }
            }

            // Index definitions scoped to this label, mapped to display form.
            let mut indexes = Vec::new();
            for idx in schema.indexes.iter().filter(|i| i.label() == name) {
                use uni_common::core::schema::IndexDefinition;
                let (idx_type, idx_props) = match idx {
                    IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
                    IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
                    IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
                    IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
                    IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
                    // Unknown index kinds are omitted from the listing.
                    _ => continue,
                };

                indexes.push(crate::api::schema::IndexInfo {
                    name: idx.name().to_string(),
                    index_type: idx_type.to_string(),
                    properties: idx_props,
                    status: "ONLINE".to_string(), // TODO: Check actual status
                });
            }

            // Constraints whose target is this label.
            let mut constraints = Vec::new();
            for c in &schema.constraints {
                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
                    && l == name
                {
                    let (ctype, cprops) = match &c.constraint_type {
                        uni_common::core::schema::ConstraintType::Unique { properties } => {
                            ("UNIQUE", properties.clone())
                        }
                        uni_common::core::schema::ConstraintType::Exists { property } => {
                            ("EXISTS", vec![property.clone()])
                        }
                        uni_common::core::schema::ConstraintType::Check { expression } => {
                            ("CHECK", vec![expression.clone()])
                        }
                        _ => ("UNKNOWN", vec![]),
                    };

                    constraints.push(crate::api::schema::ConstraintInfo {
                        name: c.name.clone(),
                        constraint_type: ctype.to_string(),
                        properties: cprops,
                        enabled: c.enabled,
                    });
                }
            }

            Ok(Some(crate::api::schema::LabelInfo {
                name: name.to_string(),
                count,
                properties,
                indexes,
                constraints,
            }))
        } else {
            Ok(None)
        }
    }
479
480    /// Manually trigger compaction for a specific label.
481    ///
482    /// Compaction merges multiple L1 files into larger files to improve read performance.
483    pub async fn compact_label(
484        &self,
485        label: &str,
486    ) -> Result<uni_store::compaction::CompactionStats> {
487        self.storage
488            .compact_label(label)
489            .await
490            .map_err(UniError::Internal)
491    }
492
493    /// Manually trigger compaction for a specific edge type.
494    pub async fn compact_edge_type(
495        &self,
496        edge_type: &str,
497    ) -> Result<uni_store::compaction::CompactionStats> {
498        self.storage
499            .compact_edge_type(edge_type)
500            .await
501            .map_err(UniError::Internal)
502    }
503
504    /// Wait for any ongoing compaction to complete.
505    ///
506    /// Useful for tests or ensuring consistent performance before benchmarks.
507    pub async fn wait_for_compaction(&self) -> Result<()> {
508        self.storage
509            .wait_for_compaction()
510            .await
511            .map_err(UniError::Internal)
512    }
513
514    /// Bulk insert vertices for a given label.
515    ///
516    /// This is a low-level API intended for bulk loading and benchmarking.
517    /// Each properties map should contain all property values as JSON.
518    ///
519    /// Returns the allocated VIDs in the same order as the input.
520    pub async fn bulk_insert_vertices(
521        &self,
522        label: &str,
523        properties_list: Vec<uni_common::Properties>,
524    ) -> Result<Vec<uni_common::core::id::Vid>> {
525        let schema = self.schema.schema();
526        // Validate label exists in schema
527        schema
528            .labels
529            .get(label)
530            .ok_or_else(|| UniError::LabelNotFound {
531                label: label.to_string(),
532            })?;
533        if let Some(writer_lock) = &self.writer {
534            let mut writer = writer_lock.write().await;
535
536            if properties_list.is_empty() {
537                return Ok(Vec::new());
538            }
539
540            // NEW: Batch allocate VIDs (single call)
541            let vids = writer
542                .allocate_vids(properties_list.len())
543                .await
544                .map_err(UniError::Internal)?;
545
546            // NEW: Use optimized batch insert
547            let _props = writer
548                .insert_vertices_batch(vids.clone(), properties_list, vec![label.to_string()])
549                .await
550                .map_err(UniError::Internal)?;
551
552            Ok(vids)
553        } else {
554            Err(UniError::ReadOnly {
555                operation: "bulk_insert_vertices".to_string(),
556            })
557        }
558    }
559
560    /// Bulk insert edges for a given edge type using pre-allocated VIDs.
561    ///
562    /// This is a low-level API intended for bulk loading and benchmarking.
563    /// Each tuple is (src_vid, dst_vid, properties).
564    pub async fn bulk_insert_edges(
565        &self,
566        edge_type: &str,
567        edges: Vec<(
568            uni_common::core::id::Vid,
569            uni_common::core::id::Vid,
570            uni_common::Properties,
571        )>,
572    ) -> Result<()> {
573        let schema = self.schema.schema();
574        let edge_meta =
575            schema
576                .edge_types
577                .get(edge_type)
578                .ok_or_else(|| UniError::EdgeTypeNotFound {
579                    edge_type: edge_type.to_string(),
580                })?;
581        let type_id = edge_meta.id;
582
583        if let Some(writer_lock) = &self.writer {
584            let mut writer = writer_lock.write().await;
585
586            for (src_vid, dst_vid, props) in edges {
587                let eid = writer.next_eid(type_id).await.map_err(UniError::Internal)?;
588                writer
589                    .insert_edge(
590                        src_vid,
591                        dst_vid,
592                        type_id,
593                        eid,
594                        props,
595                        Some(edge_type.to_string()),
596                    )
597                    .await
598                    .map_err(UniError::Internal)?;
599            }
600
601            Ok(())
602        } else {
603            Err(UniError::ReadOnly {
604                operation: "bulk_insert_edges".to_string(),
605            })
606        }
607    }
608
609    /// Get the status of background index rebuild tasks.
610    ///
611    /// Returns all tracked index rebuild tasks, including pending, in-progress,
612    /// completed, and failed tasks. Use this to monitor progress of async
613    /// index rebuilds started via `BulkWriter::commit()` with `async_indexes(true)`.
614    ///
615    /// # Example
616    ///
617    /// ```ignore
618    /// let status = db.index_rebuild_status().await?;
619    /// for task in status {
620    ///     println!("Label: {}, Status: {:?}", task.label, task.status);
621    /// }
622    /// ```
623    pub async fn index_rebuild_status(&self) -> Result<Vec<uni_store::storage::IndexRebuildTask>> {
624        let manager = uni_store::storage::IndexRebuildManager::new(
625            self.storage.clone(),
626            self.schema.clone(),
627            self.config.index_rebuild.clone(),
628        )
629        .await
630        .map_err(UniError::Internal)?;
631
632        Ok(manager.status())
633    }
634
635    /// Retry failed index rebuild tasks.
636    ///
637    /// Resets failed tasks back to pending state and returns the task IDs
638    /// that will be retried. Tasks that have exceeded their retry limit
639    /// will not be retried.
640    ///
641    /// # Returns
642    ///
643    /// A vector of task IDs that were scheduled for retry.
644    pub async fn retry_index_rebuilds(&self) -> Result<Vec<String>> {
645        let manager = uni_store::storage::IndexRebuildManager::new(
646            self.storage.clone(),
647            self.schema.clone(),
648            self.config.index_rebuild.clone(),
649        )
650        .await
651        .map_err(UniError::Internal)?;
652
653        let retried = manager.retry_failed().await.map_err(UniError::Internal)?;
654
655        // Start background worker to process the retried tasks
656        if !retried.is_empty() {
657            let manager = std::sync::Arc::new(manager);
658            let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
659            self.shutdown_handle.track_task(handle);
660        }
661
662        Ok(retried)
663    }
664
    /// Force rebuild indexes for a specific label.
    ///
    /// # Arguments
    ///
    /// * `label` - The vertex label to rebuild indexes for.
    /// * `async_` - If true, rebuild in background; if false, block until complete.
    ///
    /// # Returns
    ///
    /// When `async_` is true, returns the task ID for tracking progress.
    /// When `async_` is false, returns None after indexes are rebuilt.
    pub async fn rebuild_indexes(&self, label: &str, async_: bool) -> Result<Option<String>> {
        if async_ {
            // Async path: schedule a task and hand it to a background worker
            // whose handle is tracked for graceful shutdown.
            let manager = uni_store::storage::IndexRebuildManager::new(
                self.storage.clone(),
                self.schema.clone(),
                self.config.index_rebuild.clone(),
            )
            .await
            .map_err(UniError::Internal)?;

            let task_ids = manager
                .schedule(vec![label.to_string()])
                .await
                .map_err(UniError::Internal)?;

            let manager = std::sync::Arc::new(manager);
            let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
            self.shutdown_handle.track_task(handle);

            // schedule() was called with one label; return its task id if any.
            Ok(task_ids.into_iter().next())
        } else {
            // Sync path: rebuild inline via the index manager and block until done.
            let idx_mgr = uni_store::storage::IndexManager::new(
                self.storage.base_path(),
                self.schema.clone(),
                self.storage.lancedb_store_arc(),
            );
            idx_mgr
                .rebuild_indexes_for_label(label)
                .await
                .map_err(UniError::Internal)?;
            Ok(None)
        }
    }
709
710    /// Check if an index is currently being rebuilt for a label.
711    ///
712    /// Returns true if there is a pending or in-progress index rebuild task
713    /// for the specified label.
714    pub async fn is_index_building(&self, label: &str) -> Result<bool> {
715        let manager = uni_store::storage::IndexRebuildManager::new(
716            self.storage.clone(),
717            self.schema.clone(),
718            self.config.index_rebuild.clone(),
719        )
720        .await
721        .map_err(UniError::Internal)?;
722
723        Ok(manager.is_index_building(label))
724    }
725
726    /// List all indexes defined on a specific label.
727    pub fn list_indexes(&self, label: &str) -> Vec<uni_common::core::schema::IndexDefinition> {
728        let schema = self.schema.schema();
729        schema
730            .indexes
731            .iter()
732            .filter(|i| i.label() == label)
733            .cloned()
734            .collect()
735    }
736
737    /// List all indexes in the database.
738    pub fn list_all_indexes(&self) -> Vec<uni_common::core::schema::IndexDefinition> {
739        self.schema.schema().indexes.clone()
740    }
741
742    /// Shutdown the database gracefully, flushing pending data and stopping background tasks.
743    ///
744    /// This method flushes any pending data and waits for all background tasks to complete
745    /// (with a timeout). After calling this method, the database instance should not be used.
746    pub async fn shutdown(self) -> Result<()> {
747        // Flush pending data
748        if let Some(ref writer) = self.writer {
749            let mut w = writer.write().await;
750            if let Err(e) = w.flush_to_l1(None).await {
751                tracing::error!("Error flushing during shutdown: {}", e);
752            }
753        }
754
755        self.shutdown_handle
756            .shutdown_async()
757            .await
758            .map_err(UniError::Internal)
759    }
760}
761
impl Drop for Uni {
    fn drop(&mut self) {
        // Signal background tasks to stop; this is the blocking variant since
        // Drop cannot be async. `shutdown()` performs the graceful async path.
        self.shutdown_handle.shutdown_blocking();
        tracing::debug!("Uni dropped, shutdown signal sent");
    }
}
768
/// Builder for configuring and opening a `Uni` database instance.
#[must_use = "builders do nothing until .build() is called"]
pub struct UniBuilder {
    // Primary URI: a local path, or (when not in hybrid mode) an object-store URI.
    uri: String,
    // Database configuration; defaults to `UniConfig::default()`.
    config: UniConfig,
    // Optional JSON schema file to load on initialization.
    schema_file: Option<PathBuf>,
    // Optional explicit Uni-Xervo model catalog.
    xervo_catalog: Option<Vec<ModelAliasSpec>>,
    // Remote object-store URL for hybrid mode (bulk data remote, WAL/IDs local).
    hybrid_remote_url: Option<String>,
    // Explicit cloud credentials; when absent, environment-based config applies.
    cloud_config: Option<CloudStorageConfig>,
    // When false, opening a missing database fails (see `Uni::open_existing`).
    create_if_missing: bool,
    // When true, opening an existing database fails (see `Uni::create`).
    fail_if_exists: bool,
}
781
782impl UniBuilder {
783    /// Creates a new builder for the given URI.
784    pub fn new(uri: String) -> Self {
785        Self {
786            uri,
787            config: UniConfig::default(),
788            schema_file: None,
789            xervo_catalog: None,
790            hybrid_remote_url: None,
791            cloud_config: None,
792            create_if_missing: true,
793            fail_if_exists: false,
794        }
795    }
796
797    /// Load schema from JSON file on initialization.
798    pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
799        self.schema_file = Some(path.as_ref().to_path_buf());
800        self
801    }
802
    /// Set Uni-Xervo catalog explicitly.
    ///
    /// Replaces any catalog previously configured on this builder.
    pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
        self.xervo_catalog = Some(catalog);
        self
    }
808
809    /// Parse Uni-Xervo catalog from JSON string.
810    pub fn xervo_catalog_from_str(mut self, json: &str) -> Result<Self> {
811        let catalog = uni_xervo::api::catalog_from_str(json)
812            .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
813        self.xervo_catalog = Some(catalog);
814        Ok(self)
815    }
816
817    /// Parse Uni-Xervo catalog from a JSON file.
818    pub fn xervo_catalog_from_file(mut self, path: impl AsRef<Path>) -> Result<Self> {
819        let catalog = uni_xervo::api::catalog_from_file(path)
820            .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
821        self.xervo_catalog = Some(catalog);
822        Ok(self)
823    }
824
825    /// Configure hybrid storage with a local path for WAL/IDs and a remote URL for data.
826    ///
827    /// This allows fast local writes and metadata operations while storing bulk data
828    /// in object storage (e.g., S3, GCS, Azure Blob Storage).
829    ///
830    /// # Examples
831    ///
832    /// ```ignore
833    /// let db = Uni::open("./local_meta")
834    ///     .hybrid("./local_meta", "s3://my-bucket/graph-data")
835    ///     .build()
836    ///     .await?;
837    /// ```
838    pub fn hybrid(mut self, local_path: impl AsRef<Path>, remote_url: &str) -> Self {
839        self.uri = local_path.as_ref().to_string_lossy().to_string();
840        self.hybrid_remote_url = Some(remote_url.to_string());
841        self
842    }
843
    /// Configure cloud storage with explicit credentials.
    ///
    /// Use this method when you need fine-grained control over cloud storage
    /// credentials instead of relying on environment variables.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use uni_common::CloudStorageConfig;
    ///
    /// let config = CloudStorageConfig::S3 {
    ///     bucket: "my-bucket".to_string(),
    ///     region: Some("us-east-1".to_string()),
    ///     endpoint: Some("http://localhost:4566".to_string()), // LocalStack
    ///     access_key_id: Some("test".to_string()),
    ///     secret_access_key: Some("test".to_string()),
    ///     session_token: None,
    ///     virtual_hosted_style: false,
    /// };
    ///
    /// let db = Uni::open("./local_meta")
    ///     .hybrid("./local_meta", "s3://my-bucket/data")
    ///     .cloud_config(config)
    ///     .build()
    ///     .await?;
    /// ```
    pub fn cloud_config(mut self, config: CloudStorageConfig) -> Self {
        // Explicit credentials take the place of environment-derived ones.
        self.cloud_config = Some(config);
        self
    }
874
875    /// Configure database options using `UniConfig`.
876    pub fn config(mut self, config: UniConfig) -> Self {
877        self.config = config;
878        self
879    }
880
881    /// Set maximum adjacency cache size in bytes.
882    pub fn cache_size(mut self, bytes: usize) -> Self {
883        self.config.cache_size = bytes;
884        self
885    }
886
887    /// Set query parallelism (number of worker threads).
888    pub fn parallelism(mut self, n: usize) -> Self {
889        self.config.parallelism = n;
890        self
891    }
892
893    /// Open the database (async).
894    pub async fn build(self) -> Result<Uni> {
895        let uri = self.uri.clone();
896        let is_remote_uri = uri.contains("://");
897        let is_hybrid = self.hybrid_remote_url.is_some();
898
899        if is_hybrid && is_remote_uri {
900            return Err(UniError::Internal(anyhow::anyhow!(
901                "Hybrid mode requires a local path as primary URI, found: {}",
902                uri
903            )));
904        }
905
906        let (storage_uri, data_store, local_store_opt) = if is_hybrid {
907            let remote_url = self.hybrid_remote_url.as_ref().unwrap();
908
909            // Remote Store (Data) - use explicit cloud_config if provided
910            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
911                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
912            } else {
913                let url = url::Url::parse(remote_url).map_err(|e| {
914                    UniError::Io(std::io::Error::new(
915                        std::io::ErrorKind::InvalidInput,
916                        e.to_string(),
917                    ))
918                })?;
919                let (os, _path) =
920                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
921                Arc::from(os)
922            };
923
924            // Local Store (WAL, IDs)
925            let path = PathBuf::from(&uri);
926            if path.exists() {
927                if self.fail_if_exists {
928                    return Err(UniError::Internal(anyhow::anyhow!(
929                        "Database already exists at {}",
930                        uri
931                    )));
932                }
933            } else {
934                if !self.create_if_missing {
935                    return Err(UniError::NotFound { path: path.clone() });
936                }
937                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
938            }
939
940            let local_store = Arc::new(
941                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
942            );
943
944            // For hybrid, storage_uri is the remote URL (since StorageManager loads datasets from there)
945            // But we must provide the correct store to other components manually.
946            (
947                remote_url.clone(),
948                remote_store,
949                Some(local_store as Arc<dyn ObjectStore>),
950            )
951        } else if is_remote_uri {
952            // Remote Only - use explicit cloud_config if provided
953            let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
954                build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
955            } else {
956                let url = url::Url::parse(&uri).map_err(|e| {
957                    UniError::Io(std::io::Error::new(
958                        std::io::ErrorKind::InvalidInput,
959                        e.to_string(),
960                    ))
961                })?;
962                let (os, _path) =
963                    object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
964                Arc::from(os)
965            };
966
967            (uri.clone(), remote_store, None)
968        } else {
969            // Local Only
970            let path = PathBuf::from(&uri);
971            let storage_path = path.join("storage");
972
973            if path.exists() {
974                if self.fail_if_exists {
975                    return Err(UniError::Internal(anyhow::anyhow!(
976                        "Database already exists at {}",
977                        uri
978                    )));
979                }
980            } else {
981                if !self.create_if_missing {
982                    return Err(UniError::NotFound { path: path.clone() });
983                }
984                std::fs::create_dir_all(&path).map_err(UniError::Io)?;
985            }
986
987            // Ensure storage directory exists
988            if !storage_path.exists() {
989                std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
990            }
991
992            let store = Arc::new(
993                LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
994            );
995            (
996                storage_path.to_string_lossy().to_string(),
997                store.clone() as Arc<dyn ObjectStore>,
998                Some(store as Arc<dyn ObjectStore>),
999            )
1000        };
1001
1002        // Canonical schema location in metadata catalog.
1003        let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
1004        // Legacy schema location used by older builds.
1005        let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
1006
1007        // Backward-compatible schema path migration:
1008        // if catalog/schema.json is missing but root schema.json exists,
1009        // copy root schema.json to catalog/schema.json.
1010        let has_catalog_schema = match data_store.get(&schema_obj_path).await {
1011            Ok(_) => true,
1012            Err(object_store::Error::NotFound { .. }) => false,
1013            Err(e) => return Err(UniError::Internal(e.into())),
1014        };
1015        if !has_catalog_schema {
1016            match data_store.get(&legacy_schema_obj_path).await {
1017                Ok(result) => {
1018                    let bytes = result
1019                        .bytes()
1020                        .await
1021                        .map_err(|e| UniError::Internal(e.into()))?;
1022                    data_store
1023                        .put(&schema_obj_path, bytes.into())
1024                        .await
1025                        .map_err(|e| UniError::Internal(e.into()))?;
1026                    info!(
1027                        legacy = %legacy_schema_obj_path,
1028                        target = %schema_obj_path,
1029                        "Migrated legacy schema path to catalog path"
1030                    );
1031                }
1032                Err(object_store::Error::NotFound { .. }) => {}
1033                Err(e) => return Err(UniError::Internal(e.into())),
1034            }
1035        }
1036
1037        // Load schema (SchemaManager::load creates a default if missing)
1038        // Schema is always in data_store (Remote or Local)
1039        let schema_manager = Arc::new(
1040            SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1041                .await
1042                .map_err(UniError::Internal)?,
1043        );
1044
1045        let lancedb_storage_options = self
1046            .cloud_config
1047            .as_ref()
1048            .map(Self::cloud_config_to_lancedb_storage_options);
1049
1050        let storage = if is_hybrid || is_remote_uri {
1051            // Preserve explicit cloud settings (endpoint, credentials, path style)
1052            // by reusing the constructed remote store.
1053            StorageManager::new_with_store_and_storage_options(
1054                &storage_uri,
1055                data_store.clone(),
1056                schema_manager.clone(),
1057                self.config.clone(),
1058                lancedb_storage_options.clone(),
1059            )
1060            .await
1061            .map_err(UniError::Internal)?
1062        } else {
1063            // Local mode keeps using a storage-path-scoped local store.
1064            StorageManager::new_with_config(
1065                &storage_uri,
1066                schema_manager.clone(),
1067                self.config.clone(),
1068            )
1069            .await
1070            .map_err(UniError::Internal)?
1071        };
1072
1073        let storage = Arc::new(storage);
1074
1075        // Create shutdown handle
1076        let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1077
1078        // Start background compaction with shutdown signal
1079        let compaction_handle = storage
1080            .clone()
1081            .start_background_compaction(shutdown_handle.subscribe());
1082        shutdown_handle.track_task(compaction_handle);
1083
1084        // Initialize property manager
1085        let prop_cache_capacity = self.config.cache_size / 1024;
1086
1087        let prop_manager = Arc::new(PropertyManager::new(
1088            storage.clone(),
1089            schema_manager.clone(),
1090            prop_cache_capacity,
1091        ));
1092
1093        // Setup stores for WAL and IdAllocator (needed for version recovery check)
1094        let id_store = local_store_opt
1095            .clone()
1096            .unwrap_or_else(|| data_store.clone());
1097        let wal_store = local_store_opt
1098            .clone()
1099            .unwrap_or_else(|| data_store.clone());
1100
1101        // Determine start version and WAL high water mark from latest snapshot.
1102        // Detects and recovers from a lost manifest pointer.
1103        let latest_snapshot = storage
1104            .snapshot_manager()
1105            .load_latest_snapshot()
1106            .await
1107            .map_err(UniError::Internal)?;
1108
1109        let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1110            (
1111                snapshot.version_high_water_mark + 1,
1112                snapshot.wal_high_water_mark,
1113            )
1114        } else {
1115            // No latest snapshot — fresh DB or lost manifest?
1116            let has_manifests = storage
1117                .snapshot_manager()
1118                .has_any_manifests()
1119                .await
1120                .unwrap_or(false);
1121
1122            let wal_check =
1123                WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1124            let has_wal = wal_check.has_segments().await.unwrap_or(false);
1125
1126            if has_manifests {
1127                // Manifests exist but latest pointer is missing — try to recover from manifests
1128                let snapshot_ids = storage
1129                    .snapshot_manager()
1130                    .list_snapshots()
1131                    .await
1132                    .map_err(UniError::Internal)?;
1133                if let Some(last_id) = snapshot_ids.last() {
1134                    let manifest = storage
1135                        .snapshot_manager()
1136                        .load_snapshot(last_id)
1137                        .await
1138                        .map_err(UniError::Internal)?;
1139                    tracing::warn!(
1140                        "Latest snapshot pointer missing but found manifest '{}'. \
1141                         Recovering version {}.",
1142                        last_id,
1143                        manifest.version_high_water_mark
1144                    );
1145                    (
1146                        manifest.version_high_water_mark + 1,
1147                        manifest.wal_high_water_mark,
1148                    )
1149                } else {
1150                    return Err(UniError::Internal(anyhow::anyhow!(
1151                        "Snapshot manifests directory exists but contains no valid manifests. \
1152                         Possible data corruption."
1153                    )));
1154                }
1155            } else if has_wal {
1156                // WAL exists but no manifests at all — data exists but unrecoverable version
1157                return Err(UniError::Internal(anyhow::anyhow!(
1158                    "Database has WAL segments but no snapshot manifest. \
1159                     Cannot safely determine version counter -- starting at 0 would cause \
1160                     version conflicts and data corruption. \
1161                     Restore the snapshot manifest or delete WAL to start fresh."
1162                )));
1163            } else {
1164                // Truly fresh database
1165                (0, 0)
1166            }
1167        };
1168
1169        let allocator = Arc::new(
1170            IdAllocator::new(
1171                id_store,
1172                object_store::path::Path::from("id_allocator.json"),
1173                1000,
1174            )
1175            .await
1176            .map_err(UniError::Internal)?,
1177        );
1178
1179        let wal = if !self.config.wal_enabled {
1180            // WAL disabled by config
1181            None
1182        } else if is_remote_uri && !is_hybrid {
1183            // Remote-only WAL (ObjectStoreWal)
1184            Some(Arc::new(WriteAheadLog::new(
1185                wal_store,
1186                object_store::path::Path::from("wal"),
1187            )))
1188        } else if is_hybrid || !is_remote_uri {
1189            // Local WAL (using local_store)
1190            // Even if local_store uses ObjectStore trait, it maps to FS.
1191            Some(Arc::new(WriteAheadLog::new(
1192                wal_store,
1193                object_store::path::Path::from("wal"),
1194            )))
1195        } else {
1196            None
1197        };
1198
1199        let writer = Arc::new(RwLock::new(
1200            Writer::new_with_config(
1201                storage.clone(),
1202                schema_manager.clone(),
1203                start_version,
1204                self.config.clone(),
1205                wal,
1206                Some(allocator),
1207            )
1208            .await
1209            .map_err(UniError::Internal)?,
1210        ));
1211
1212        let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1213            .schema()
1214            .indexes
1215            .iter()
1216            .filter_map(|idx| {
1217                if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1218                    cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1219                } else {
1220                    None
1221                }
1222            })
1223            .collect();
1224
1225        if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1226            return Err(UniError::Internal(anyhow::anyhow!(
1227                "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1228            )));
1229        }
1230
1231        let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1232            for alias in &required_embed_aliases {
1233                let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1234                    UniError::Internal(anyhow::anyhow!(
1235                        "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1236                        alias
1237                    ))
1238                })?;
1239                if spec.task != ModelTask::Embed {
1240                    return Err(UniError::Internal(anyhow::anyhow!(
1241                        "Uni-Xervo alias '{}' must be an embed task",
1242                        alias
1243                    )));
1244                }
1245            }
1246
1247            let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1248            #[cfg(feature = "provider-candle")]
1249            {
1250                runtime_builder = runtime_builder
1251                    .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1252            }
1253            #[cfg(feature = "provider-fastembed")]
1254            {
1255                runtime_builder = runtime_builder
1256                    .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1257            }
1258            #[cfg(feature = "provider-openai")]
1259            {
1260                runtime_builder = runtime_builder
1261                    .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1262            }
1263            #[cfg(feature = "provider-gemini")]
1264            {
1265                runtime_builder = runtime_builder
1266                    .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1267            }
1268            #[cfg(feature = "provider-vertexai")]
1269            {
1270                runtime_builder = runtime_builder
1271                    .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1272            }
1273            #[cfg(feature = "provider-mistral")]
1274            {
1275                runtime_builder = runtime_builder
1276                    .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1277            }
1278            #[cfg(feature = "provider-anthropic")]
1279            {
1280                runtime_builder = runtime_builder
1281                    .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1282            }
1283            #[cfg(feature = "provider-voyageai")]
1284            {
1285                runtime_builder = runtime_builder
1286                    .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1287            }
1288            #[cfg(feature = "provider-cohere")]
1289            {
1290                runtime_builder = runtime_builder
1291                    .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1292            }
1293            #[cfg(feature = "provider-azure-openai")]
1294            {
1295                runtime_builder = runtime_builder
1296                    .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1297            }
1298            #[cfg(feature = "provider-mistralrs")]
1299            {
1300                runtime_builder = runtime_builder
1301                    .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1302            }
1303
1304            Some(
1305                runtime_builder
1306                    .build()
1307                    .await
1308                    .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1309            )
1310        } else {
1311            None
1312        };
1313
1314        if let Some(ref runtime) = xervo_runtime {
1315            let mut writer_guard = writer.write().await;
1316            writer_guard.set_xervo_runtime(runtime.clone());
1317        }
1318
1319        // Replay WAL to restore any uncommitted mutations from previous session
1320        // Only replay mutations with LSN > wal_high_water_mark to avoid double-applying
1321        {
1322            let w = writer.read().await;
1323            let replayed = w
1324                .replay_wal(wal_high_water_mark)
1325                .await
1326                .map_err(UniError::Internal)?;
1327            if replayed > 0 {
1328                info!("WAL recovery: replayed {} mutations", replayed);
1329            }
1330        }
1331
1332        // Wire up IndexRebuildManager for post-flush automatic rebuild scheduling
1333        if self.config.index_rebuild.auto_rebuild_enabled {
1334            let rebuild_manager = Arc::new(
1335                uni_store::storage::IndexRebuildManager::new(
1336                    storage.clone(),
1337                    schema_manager.clone(),
1338                    self.config.index_rebuild.clone(),
1339                )
1340                .await
1341                .map_err(UniError::Internal)?,
1342            );
1343
1344            let handle = rebuild_manager
1345                .clone()
1346                .start_background_worker(shutdown_handle.subscribe());
1347            shutdown_handle.track_task(handle);
1348
1349            {
1350                let mut writer_guard = writer.write().await;
1351                writer_guard.set_index_rebuild_manager(rebuild_manager);
1352            }
1353        }
1354
1355        // Start background flush checker for time-based auto-flush
1356        if let Some(interval) = self.config.auto_flush_interval {
1357            let writer_clone = writer.clone();
1358            let mut shutdown_rx = shutdown_handle.subscribe();
1359
1360            let handle = tokio::spawn(async move {
1361                let mut ticker = tokio::time::interval(interval);
1362                loop {
1363                    tokio::select! {
1364                        _ = ticker.tick() => {
1365                            let mut w = writer_clone.write().await;
1366                            if let Err(e) = w.check_flush().await {
1367                                tracing::warn!("Background flush check failed: {}", e);
1368                            }
1369                        }
1370                        _ = shutdown_rx.recv() => {
1371                            tracing::info!("Auto-flush shutting down, performing final flush");
1372                            let mut w = writer_clone.write().await;
1373                            let _ = w.flush_to_l1(None).await;
1374                            break;
1375                        }
1376                    }
1377                }
1378            });
1379
1380            shutdown_handle.track_task(handle);
1381        }
1382
1383        Ok(Uni {
1384            storage,
1385            schema: schema_manager,
1386            properties: prop_manager,
1387            writer: Some(writer),
1388            xervo_runtime,
1389            config: self.config,
1390            procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1391            shutdown_handle,
1392            locy_rule_registry: Arc::new(std::sync::RwLock::new(
1393                impl_locy::LocyRuleRegistry::default(),
1394            )),
1395        })
1396    }
1397
1398    /// Open the database (blocking)
1399    pub fn build_sync(self) -> Result<Uni> {
1400        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1401        rt.block_on(self.build())
1402    }
1403
1404    fn cloud_config_to_lancedb_storage_options(
1405        config: &CloudStorageConfig,
1406    ) -> std::collections::HashMap<String, String> {
1407        let mut opts = std::collections::HashMap::new();
1408
1409        match config {
1410            CloudStorageConfig::S3 {
1411                bucket,
1412                region,
1413                endpoint,
1414                access_key_id,
1415                secret_access_key,
1416                session_token,
1417                virtual_hosted_style,
1418            } => {
1419                opts.insert("bucket".to_string(), bucket.clone());
1420                opts.insert(
1421                    "virtual_hosted_style_request".to_string(),
1422                    virtual_hosted_style.to_string(),
1423                );
1424
1425                if let Some(r) = region {
1426                    opts.insert("region".to_string(), r.clone());
1427                }
1428                if let Some(ep) = endpoint {
1429                    opts.insert("endpoint".to_string(), ep.clone());
1430                    if ep.starts_with("http://") {
1431                        opts.insert("allow_http".to_string(), "true".to_string());
1432                    }
1433                }
1434                if let Some(v) = access_key_id {
1435                    opts.insert("access_key_id".to_string(), v.clone());
1436                }
1437                if let Some(v) = secret_access_key {
1438                    opts.insert("secret_access_key".to_string(), v.clone());
1439                }
1440                if let Some(v) = session_token {
1441                    opts.insert("session_token".to_string(), v.clone());
1442                }
1443            }
1444            CloudStorageConfig::Gcs {
1445                bucket,
1446                service_account_path,
1447                service_account_key,
1448            } => {
1449                opts.insert("bucket".to_string(), bucket.clone());
1450                if let Some(v) = service_account_path {
1451                    opts.insert("service_account".to_string(), v.clone());
1452                    opts.insert("application_credentials".to_string(), v.clone());
1453                }
1454                if let Some(v) = service_account_key {
1455                    opts.insert("service_account_key".to_string(), v.clone());
1456                }
1457            }
1458            CloudStorageConfig::Azure {
1459                container,
1460                account,
1461                access_key,
1462                sas_token,
1463            } => {
1464                opts.insert("account_name".to_string(), account.clone());
1465                opts.insert("container_name".to_string(), container.clone());
1466                if let Some(v) = access_key {
1467                    opts.insert("access_key".to_string(), v.clone());
1468                }
1469                if let Some(v) = sas_token {
1470                    opts.insert("sas_token".to_string(), v.clone());
1471                }
1472            }
1473        }
1474
1475        opts
1476    }
1477}