1use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::{Duration, Instant};
8use tempfile::TempDir;
9
10pub mod appender;
11pub mod builder;
12pub mod bulk;
13pub mod compaction;
14pub mod functions;
15pub mod hooks;
16pub mod impl_locy;
17pub mod impl_query;
18pub mod indexes;
19pub mod locy_builder;
20pub mod locy_result;
21pub mod multi_agent;
22pub mod notifications;
23pub mod prepared;
24pub mod query_builder;
25pub mod rule_registry;
26pub mod schema;
27pub mod session;
28pub mod sync;
29pub mod template;
30pub mod transaction;
31pub mod xervo;
32
33use object_store::ObjectStore;
34use object_store::local::LocalFileSystem;
35use tracing::info;
36use uni_common::core::snapshot::SnapshotManifest;
37use uni_common::{CloudStorageConfig, UniConfig};
38use uni_common::{Result, UniError};
39use uni_store::cloud::build_cloud_store;
40use uni_xervo::api::{ModelAliasSpec, ModelTask};
41use uni_xervo::runtime::ModelRuntime;
42
43use uni_common::core::schema::SchemaManager;
44use uni_store::runtime::id_allocator::IdAllocator;
45use uni_store::runtime::property_manager::PropertyManager;
46use uni_store::runtime::wal::WriteAheadLog;
47use uni_store::storage::manager::StorageManager;
48
49use tokio::sync::RwLock;
50use uni_store::runtime::writer::Writer;
51
52use crate::shutdown::ShutdownHandle;
53
54use std::collections::HashMap;
55
#[doc(hidden)]
/// Shared state behind a [`Uni`] handle.
///
/// Held in an `Arc` by [`Uni`] and handed to sessions and session templates
/// created from it, so all of them observe the same storage, schema and writer.
pub struct UniInner {
    /// Storage manager; for snapshot views it is pinned to one manifest (see `at_snapshot`).
    pub(crate) storage: Arc<StorageManager>,
    /// Schema manager shared by storage, writer and query execution.
    pub(crate) schema: Arc<SchemaManager>,
    /// Property manager layered over `storage`.
    pub(crate) properties: Arc<PropertyManager>,
    /// Writer; `None` for read-only handles and for snapshot views.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    /// Uni-Xervo model runtime, present only when a model catalog was supplied.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Configuration this instance was opened with.
    pub(crate) config: UniConfig,
    /// Registry of callable procedures.
    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
    /// Shutdown handle on which background tasks (compaction, index rebuild,
    /// auto-flush) are tracked.
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
    /// Registry of Locy rules, exposed through `Uni::rules`.
    pub(crate) locy_rule_registry: Arc<std::sync::RwLock<impl_locy::LocyRuleRegistry>>,
    /// Creation time of this instance; used to report `DatabaseMetrics::uptime`.
    pub(crate) start_time: Instant,
    /// Broadcast sender for commit notifications.
    pub(crate) commit_tx: tokio::sync::broadcast::Sender<Arc<notifications::CommitNotification>>,
    /// Write lease supplied through `UniBuilder::write_lease`, if any.
    pub(crate) write_lease: Option<multi_agent::WriteLease>,
    /// Number of active sessions, reported by `Uni::metrics`.
    pub(crate) active_session_count: AtomicUsize,
    /// Query counter, reported by `Uni::metrics`.
    pub(crate) total_queries: AtomicU64,
    /// Commit counter, reported by `Uni::metrics`.
    pub(crate) total_commits: AtomicU64,
    /// User-registered functions; shared (not copied) with snapshot views.
    pub(crate) custom_functions: Arc<std::sync::RwLock<uni_query::CustomFunctionRegistry>>,

    /// Cached L0 mutation count read (relaxed) by `Uni::metrics`.
    /// NOTE(review): the code that refreshes this cache is not in this section.
    pub(crate) cached_l0_mutation_count: AtomicUsize,
    /// Cached L0 size estimate in bytes, read (relaxed) by `Uni::metrics`.
    pub(crate) cached_l0_estimated_size: AtomicUsize,
    /// Cached WAL LSN, read (relaxed) by `Uni::metrics`.
    pub(crate) cached_wal_lsn: AtomicU64,
    /// Temporary directory guard for `Uni::temporary` databases.
    /// Struct fields drop in declaration order, so keeping this field last
    /// means the directory is removed only after every other field is dropped.
    pub(crate) _temp_dir: Option<TempDir>,
}
102
/// Write-throttle pressure, always kept within `[0.0, 1.0]`.
///
/// `0.0` (also the [`Default`]) means writes are not throttled; any strictly
/// positive value counts as throttled (see [`ThrottlePressure::is_throttled`]).
#[derive(Debug, Clone, Copy, Default, PartialEq, PartialOrd)]
pub struct ThrottlePressure(f64);

impl ThrottlePressure {
    /// Creates a pressure value, clamping `value` into `[0.0, 1.0]`.
    ///
    /// Values above `1.0` (including `+inf`) become `1.0`. Values that are not
    /// strictly positive — negatives, `-0.0` and NaN — become `0.0`.
    pub fn new(value: f64) -> Self {
        // `f64::clamp` would pass NaN through unchanged and keep the sign of
        // `-0.0`, breaking the [0, 1] invariant (and rendering as "NaN%" or
        // "-0.0%"). A single `> 0.0` test rejects all of those at once.
        if value > 0.0 {
            Self(value.min(1.0))
        } else {
            Self(0.0)
        }
    }

    /// Returns the raw pressure in `[0.0, 1.0]`.
    pub fn value(&self) -> f64 {
        self.0
    }

    /// Returns `true` when any throttling is applied (pressure above zero).
    pub fn is_throttled(&self) -> bool {
        self.0 > 0.0
    }
}

impl std::fmt::Display for ThrottlePressure {
    /// Formats the pressure as a percentage with one decimal, e.g. `"25.0%"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:.1}%", self.0 * 100.0)
    }
}
138
/// Point-in-time database statistics, as returned by `Uni::metrics`.
#[derive(Debug, Clone)]
pub struct DatabaseMetrics {
    /// Mutations in the in-memory L0 buffer, taken from a cached counter.
    pub l0_mutation_count: usize,
    /// Estimated L0 buffer size in bytes, taken from a cached counter.
    pub l0_estimated_size_bytes: usize,
    /// Current schema version.
    pub schema_version: u64,
    /// Time elapsed since the database instance was created.
    pub uptime: Duration,
    /// Number of active sessions.
    pub active_sessions: usize,
    /// Number of L1 runs, copied from `compaction_status.l1_runs`.
    pub l1_run_count: usize,
    /// Write-throttle pressure.
    /// NOTE(review): `Uni::metrics` currently always reports the default (0.0).
    pub write_throttle_pressure: ThrottlePressure,
    /// Compaction status; the default value if it could not be read.
    pub compaction_status: uni_store::CompactionStatus,
    /// WAL size in bytes.
    /// NOTE(review): `Uni::metrics` currently always reports 0.
    pub wal_size_bytes: u64,
    /// Cached WAL log sequence number.
    pub wal_lsn: u64,
    /// Value of the instance's query counter.
    pub total_queries: u64,
    /// Value of the instance's commit counter.
    pub total_commits: u64,
}
167
/// Handle to an open Uni database.
///
/// Created through [`UniBuilder`] (for example `Uni::open(path).build().await`).
/// The state lives in an `Arc<UniInner>` that sessions created from this
/// handle also hold. Dropping the handle sends the shutdown signal to
/// background tasks (see the `Drop` impl).
pub struct Uni {
    pub(crate) inner: Arc<UniInner>,
}
192
193impl UniInner {
197 pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<UniInner> {
202 let manifest = self
203 .storage
204 .snapshot_manager()
205 .load_snapshot(snapshot_id)
206 .await
207 .map_err(UniError::Internal)?;
208
209 let pinned_storage = Arc::new(self.storage.pinned(manifest));
210
211 let prop_manager = Arc::new(PropertyManager::new(
212 pinned_storage.clone(),
213 self.schema.clone(),
214 self.properties.cache_size(),
215 ));
216
217 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
218
219 let (commit_tx, _) = tokio::sync::broadcast::channel(256);
220 Ok(UniInner {
221 storage: pinned_storage,
222 schema: self.schema.clone(),
223 properties: prop_manager,
224 writer: None,
225 xervo_runtime: self.xervo_runtime.clone(),
226 config: self.config.clone(),
227 procedure_registry: self.procedure_registry.clone(),
228 shutdown_handle,
229 locy_rule_registry: Arc::new(std::sync::RwLock::new(
230 impl_locy::LocyRuleRegistry::default(),
231 )),
232 start_time: Instant::now(),
233 commit_tx,
234 write_lease: None,
235 active_session_count: AtomicUsize::new(0),
236 total_queries: AtomicU64::new(0),
237 total_commits: AtomicU64::new(0),
238 custom_functions: self.custom_functions.clone(),
239 cached_l0_mutation_count: AtomicUsize::new(0),
240 cached_l0_estimated_size: AtomicUsize::new(0),
241 cached_wal_lsn: AtomicU64::new(0),
242 _temp_dir: None,
243 })
244 }
245}
246
247impl Uni {
248 pub fn open(uri: impl Into<String>) -> UniBuilder {
260 UniBuilder::new(uri.into())
261 }
262
263 pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
265 let mut builder = UniBuilder::new(uri.into());
266 builder.create_if_missing = false;
267 builder
268 }
269
270 pub fn create(uri: impl Into<String>) -> UniBuilder {
272 let mut builder = UniBuilder::new(uri.into());
273 builder.fail_if_exists = true;
274 builder
275 }
276
277 pub fn temporary() -> UniBuilder {
282 let temp_dir = tempfile::Builder::new()
283 .prefix("uni_mem_")
284 .tempdir()
285 .expect("failed to create temporary directory");
286 let uri = temp_dir.path().to_string_lossy().to_string();
287 let mut builder = UniBuilder::new(uri);
288 builder.temp_dir = Some(temp_dir);
289 builder
290 }
291
292 pub fn in_memory() -> UniBuilder {
294 Self::temporary()
295 }
296
297 pub fn session(&self) -> session::Session {
315 session::Session::new(self.inner.clone())
316 }
317
318 pub fn session_template(&self) -> template::SessionTemplateBuilder {
323 template::SessionTemplateBuilder::new(self.inner.clone())
324 }
325
326 pub fn metrics(&self) -> DatabaseMetrics {
334 let schema_version = self.inner.schema.schema().schema_version as u64;
335 let compaction_status = self.inner.storage.compaction_status().unwrap_or_default();
336 DatabaseMetrics {
337 l0_mutation_count: self.inner.cached_l0_mutation_count.load(Ordering::Relaxed),
338 l0_estimated_size_bytes: self.inner.cached_l0_estimated_size.load(Ordering::Relaxed),
339 schema_version,
340 uptime: self.inner.start_time.elapsed(),
341 active_sessions: self.inner.active_session_count.load(Ordering::Relaxed),
342 l1_run_count: compaction_status.l1_runs,
343 write_throttle_pressure: ThrottlePressure::default(),
344 compaction_status,
345 wal_size_bytes: 0u64,
346 wal_lsn: self.inner.cached_wal_lsn.load(Ordering::Relaxed),
347 total_queries: self.inner.total_queries.load(Ordering::Relaxed),
348 total_commits: self.inner.total_commits.load(Ordering::Relaxed),
349 }
350 }
351
352 pub fn write_lease(&self) -> Option<&multi_agent::WriteLease> {
355 self.inner.write_lease.as_ref()
356 }
357
358 pub fn rules(&self) -> rule_registry::RuleRegistry<'_> {
364 rule_registry::RuleRegistry::new(&self.inner.locy_rule_registry)
365 }
366
367 pub fn config(&self) -> &UniConfig {
371 &self.inner.config
372 }
373
374 #[doc(hidden)]
376 pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
377 &self.inner.procedure_registry
378 }
379
380 #[doc(hidden)]
382 pub fn schema_manager(&self) -> Arc<SchemaManager> {
383 self.inner.schema.clone()
384 }
385
386 #[doc(hidden)]
387 pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
388 self.inner.writer.clone()
389 }
390
391 #[doc(hidden)]
392 pub fn storage(&self) -> Arc<StorageManager> {
393 self.inner.storage.clone()
394 }
395
396 pub async fn flush(&self) -> Result<()> {
401 if let Some(writer_lock) = &self.inner.writer {
402 let mut writer = writer_lock.write().await;
403 writer
404 .flush_to_l1(None)
405 .await
406 .map(|_| ())
407 .map_err(UniError::Internal)
408 } else {
409 Err(UniError::ReadOnly {
410 operation: "flush".to_string(),
411 })
412 }
413 }
414
415 pub async fn create_snapshot(&self, name: &str) -> Result<String> {
421 if name.is_empty() {
422 return Err(UniError::Internal(anyhow::anyhow!(
423 "Snapshot name cannot be empty"
424 )));
425 }
426
427 let snapshot_id = if let Some(writer_lock) = &self.inner.writer {
428 let mut writer = writer_lock.write().await;
429 writer
430 .flush_to_l1(Some(name.to_string()))
431 .await
432 .map_err(UniError::Internal)?
433 } else {
434 return Err(UniError::ReadOnly {
435 operation: "create_snapshot".to_string(),
436 });
437 };
438
439 self.inner
440 .storage
441 .snapshot_manager()
442 .save_named_snapshot(name, &snapshot_id)
443 .await
444 .map_err(UniError::Internal)?;
445
446 Ok(snapshot_id)
447 }
448
449 pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
451 let sm = self.inner.storage.snapshot_manager();
452 let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
453 let mut manifests = Vec::new();
454 for id in ids {
455 if let Ok(m) = sm.load_snapshot(&id).await {
456 manifests.push(m);
457 }
458 }
459 Ok(manifests)
460 }
461
462 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
467 self.inner
468 .storage
469 .snapshot_manager()
470 .set_latest_snapshot(snapshot_id)
471 .await
472 .map_err(UniError::Internal)
473 }
474
475 pub async fn label_exists(&self, name: &str) -> Result<bool> {
477 Ok(self
478 .inner
479 .schema
480 .schema()
481 .labels
482 .get(name)
483 .is_some_and(|l| {
484 matches!(
485 l.state,
486 uni_common::core::schema::SchemaElementState::Active
487 )
488 }))
489 }
490
491 pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
493 Ok(self
494 .inner
495 .schema
496 .schema()
497 .edge_types
498 .get(name)
499 .is_some_and(|e| {
500 matches!(
501 e.state,
502 uni_common::core::schema::SchemaElementState::Active
503 )
504 }))
505 }
506
507 pub async fn list_labels(&self) -> Result<Vec<String>> {
513 let mut all_labels = std::collections::HashSet::new();
514
515 for (name, label) in self.inner.schema.schema().labels.iter() {
517 if matches!(
518 label.state,
519 uni_common::core::schema::SchemaElementState::Active
520 ) {
521 all_labels.insert(name.clone());
522 }
523 }
524
525 let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
527 let result = self.inner.execute_internal(query, HashMap::new()).await?;
528 for row in result.rows() {
529 if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
530 for label in labels_list {
531 all_labels.insert(label);
532 }
533 }
534 }
535
536 Ok(all_labels.into_iter().collect())
537 }
538
539 pub async fn list_edge_types(&self) -> Result<Vec<String>> {
541 Ok(self
542 .inner
543 .schema
544 .schema()
545 .edge_types
546 .iter()
547 .filter(|(_, e)| {
548 matches!(
549 e.state,
550 uni_common::core::schema::SchemaElementState::Active
551 )
552 })
553 .map(|(name, _)| name.clone())
554 .collect())
555 }
556
557 pub async fn get_label_info(
559 &self,
560 name: &str,
561 ) -> Result<Option<crate::api::schema::LabelInfo>> {
562 let schema = self.inner.schema.schema();
563 if schema.labels.contains_key(name) {
564 let count = if let Ok(ds) = self.inner.storage.vertex_dataset(name) {
565 if let Ok(raw) = ds.open_raw().await {
566 raw.count_rows(None)
567 .await
568 .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
569 } else {
570 0
571 }
572 } else {
573 0
574 };
575
576 let mut properties = Vec::new();
577 if let Some(props) = schema.properties.get(name) {
578 for (prop_name, prop_meta) in props {
579 let is_indexed = schema.indexes.iter().any(|idx| match idx {
580 uni_common::core::schema::IndexDefinition::Vector(v) => {
581 v.label == name && v.property == *prop_name
582 }
583 uni_common::core::schema::IndexDefinition::Scalar(s) => {
584 s.label == name && s.properties.contains(prop_name)
585 }
586 uni_common::core::schema::IndexDefinition::FullText(f) => {
587 f.label == name && f.properties.contains(prop_name)
588 }
589 uni_common::core::schema::IndexDefinition::Inverted(inv) => {
590 inv.label == name && inv.property == *prop_name
591 }
592 uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
593 j.label == name
594 }
595 _ => false,
596 });
597
598 properties.push(crate::api::schema::PropertyInfo {
599 name: prop_name.clone(),
600 data_type: format!("{:?}", prop_meta.r#type),
601 nullable: prop_meta.nullable,
602 is_indexed,
603 });
604 }
605 }
606
607 let mut indexes = Vec::new();
608 for idx in schema.indexes.iter().filter(|i| i.label() == name) {
609 use uni_common::core::schema::IndexDefinition;
610 let (idx_type, idx_props) = match idx {
611 IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
612 IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
613 IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
614 IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
615 IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
616 _ => continue,
617 };
618
619 indexes.push(crate::api::schema::IndexInfo {
620 name: idx.name().to_string(),
621 index_type: idx_type.to_string(),
622 properties: idx_props,
623 status: "ONLINE".to_string(), });
625 }
626
627 let mut constraints = Vec::new();
628 for c in &schema.constraints {
629 if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
630 && l == name
631 {
632 let (ctype, cprops) = match &c.constraint_type {
633 uni_common::core::schema::ConstraintType::Unique { properties } => {
634 ("UNIQUE", properties.clone())
635 }
636 uni_common::core::schema::ConstraintType::Exists { property } => {
637 ("EXISTS", vec![property.clone()])
638 }
639 uni_common::core::schema::ConstraintType::Check { expression } => {
640 ("CHECK", vec![expression.clone()])
641 }
642 _ => ("UNKNOWN", vec![]),
643 };
644
645 constraints.push(crate::api::schema::ConstraintInfo {
646 name: c.name.clone(),
647 constraint_type: ctype.to_string(),
648 properties: cprops,
649 enabled: c.enabled,
650 });
651 }
652 }
653
654 Ok(Some(crate::api::schema::LabelInfo {
655 name: name.to_string(),
656 count,
657 properties,
658 indexes,
659 constraints,
660 }))
661 } else {
662 Ok(None)
663 }
664 }
665
666 pub async fn get_edge_type_info(
668 &self,
669 name: &str,
670 ) -> Result<Option<crate::api::schema::EdgeTypeInfo>> {
671 let schema = self.inner.schema.schema();
672 let edge_meta = match schema.edge_types.get(name) {
673 Some(meta) => meta,
674 None => return Ok(None),
675 };
676
677 let count = {
679 let query = format!("MATCH ()-[r:{}]->() RETURN count(r) AS cnt", name);
680 match self.inner.execute_internal(&query, HashMap::new()).await {
681 Ok(result) => result
682 .rows()
683 .first()
684 .and_then(|r| r.get::<i64>("cnt").ok())
685 .unwrap_or(0) as usize,
686 Err(_) => 0,
687 }
688 };
689
690 let source_labels = edge_meta.src_labels.clone();
691 let target_labels = edge_meta.dst_labels.clone();
692
693 let mut properties = Vec::new();
694 if let Some(props) = schema.properties.get(name) {
695 for (prop_name, prop_meta) in props {
696 let is_indexed = schema.indexes.iter().any(|idx| match idx {
697 uni_common::core::schema::IndexDefinition::Scalar(s) => {
698 s.label == name && s.properties.contains(prop_name)
699 }
700 uni_common::core::schema::IndexDefinition::FullText(f) => {
701 f.label == name && f.properties.contains(prop_name)
702 }
703 uni_common::core::schema::IndexDefinition::Inverted(inv) => {
704 inv.label == name && inv.property == *prop_name
705 }
706 _ => false,
707 });
708
709 properties.push(crate::api::schema::PropertyInfo {
710 name: prop_name.clone(),
711 data_type: format!("{:?}", prop_meta.r#type),
712 nullable: prop_meta.nullable,
713 is_indexed,
714 });
715 }
716 }
717
718 let mut indexes = Vec::new();
719 for idx in schema.indexes.iter().filter(|i| i.label() == name) {
720 use uni_common::core::schema::IndexDefinition;
721 let (idx_type, idx_props) = match idx {
722 IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
723 IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
724 IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
725 _ => continue,
726 };
727
728 indexes.push(crate::api::schema::IndexInfo {
729 name: idx.name().to_string(),
730 index_type: idx_type.to_string(),
731 properties: idx_props,
732 status: "ONLINE".to_string(),
733 });
734 }
735
736 let mut constraints = Vec::new();
737 for c in &schema.constraints {
738 if let uni_common::core::schema::ConstraintTarget::EdgeType(et) = &c.target
739 && et == name
740 {
741 let (ctype, cprops) = match &c.constraint_type {
742 uni_common::core::schema::ConstraintType::Unique { properties } => {
743 ("UNIQUE", properties.clone())
744 }
745 uni_common::core::schema::ConstraintType::Exists { property } => {
746 ("EXISTS", vec![property.clone()])
747 }
748 uni_common::core::schema::ConstraintType::Check { expression } => {
749 ("CHECK", vec![expression.clone()])
750 }
751 _ => ("UNKNOWN", vec![]),
752 };
753
754 constraints.push(crate::api::schema::ConstraintInfo {
755 name: c.name.clone(),
756 constraint_type: ctype.to_string(),
757 properties: cprops,
758 enabled: c.enabled,
759 });
760 }
761 }
762
763 Ok(Some(crate::api::schema::EdgeTypeInfo {
764 name: name.to_string(),
765 count,
766 source_labels,
767 target_labels,
768 properties,
769 indexes,
770 constraints,
771 }))
772 }
773
774 pub fn compaction(&self) -> compaction::Compaction<'_> {
778 compaction::Compaction { inner: &self.inner }
779 }
780
781 pub fn indexes(&self) -> indexes::Indexes<'_> {
785 indexes::Indexes { inner: &self.inner }
786 }
787
788 pub fn functions(&self) -> functions::Functions<'_> {
792 functions::Functions { inner: &self.inner }
793 }
794
795 pub async fn shutdown(self) -> Result<()> {
800 if let Some(ref writer) = self.inner.writer {
802 let mut w = writer.write().await;
803 if let Err(e) = w.flush_to_l1(None).await {
804 tracing::error!("Error flushing during shutdown: {}", e);
805 }
806 }
807
808 self.inner
809 .shutdown_handle
810 .shutdown_async()
811 .await
812 .map_err(UniError::Internal)
813 }
814}
815
816impl Drop for Uni {
817 fn drop(&mut self) {
818 self.inner.shutdown_handle.shutdown_blocking();
819 tracing::debug!("Uni dropped, shutdown signal sent");
820 }
821}
822
/// Builder that configures and opens a [`Uni`] database.
///
/// Obtain one from `Uni::open`, `Uni::open_existing`, `Uni::create` or
/// `Uni::temporary`, adjust it, then call `build().await`.
#[must_use = "builders do nothing until .build() is called"]
pub struct UniBuilder {
    /// Primary URI: a local path, or a remote URL (detected by `://`).
    uri: String,
    /// Configuration passed to storage and the writer.
    config: UniConfig,
    /// Optional schema file path set through `schema_file`.
    schema_file: Option<PathBuf>,
    /// Uni-Xervo model catalog; required when vector indexes reference embedding aliases.
    xervo_catalog: Option<Vec<ModelAliasSpec>>,
    /// Remote data URL for hybrid mode (local primary path plus remote data store).
    hybrid_remote_url: Option<String>,
    /// Explicit cloud storage config; when set it is used to build the remote
    /// store instead of parsing the URL.
    cloud_config: Option<CloudStorageConfig>,
    /// Create the local database directory if it is missing (default `true`).
    create_if_missing: bool,
    /// Fail if the local database directory already exists (default `false`).
    fail_if_exists: bool,
    /// Build a handle without a writer.
    read_only: bool,
    /// Write lease exposed by the built handle.
    write_lease: Option<multi_agent::WriteLease>,
    /// Temporary directory guard for `Uni::temporary` databases.
    temp_dir: Option<TempDir>,
}
838
839impl UniBuilder {
840 pub fn new(uri: String) -> Self {
842 Self {
843 uri,
844 config: UniConfig::default(),
845 schema_file: None,
846 xervo_catalog: None,
847 hybrid_remote_url: None,
848 cloud_config: None,
849 create_if_missing: true,
850 fail_if_exists: false,
851 read_only: false,
852 write_lease: None,
853 temp_dir: None,
854 }
855 }
856
857 pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
859 self.schema_file = Some(path.as_ref().to_path_buf());
860 self
861 }
862
863 pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
865 self.xervo_catalog = Some(catalog);
866 self
867 }
868
869 pub fn remote_storage(mut self, remote_url: &str, config: CloudStorageConfig) -> Self {
892 self.hybrid_remote_url = Some(remote_url.to_string());
893 self.cloud_config = Some(config);
894 self
895 }
896
897 pub fn read_only(mut self) -> Self {
903 self.read_only = true;
904 self
905 }
906
907 pub fn write_lease(mut self, lease: multi_agent::WriteLease) -> Self {
912 self.write_lease = Some(lease);
913 self
914 }
915
916 pub fn config(mut self, config: UniConfig) -> Self {
918 self.config = config;
919 self
920 }
921
922 pub async fn build(self) -> Result<Uni> {
924 let uri = self.uri.clone();
925 let is_remote_uri = uri.contains("://");
926 let is_hybrid = self.hybrid_remote_url.is_some();
927
928 if is_hybrid && is_remote_uri {
929 return Err(UniError::Internal(anyhow::anyhow!(
930 "Hybrid mode requires a local path as primary URI, found: {}",
931 uri
932 )));
933 }
934
935 let (storage_uri, data_store, local_store_opt) = if is_hybrid {
936 let remote_url = self.hybrid_remote_url.as_ref().unwrap();
937
938 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
940 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
941 } else {
942 let url = url::Url::parse(remote_url).map_err(|e| {
943 UniError::Io(std::io::Error::new(
944 std::io::ErrorKind::InvalidInput,
945 e.to_string(),
946 ))
947 })?;
948 let (os, _path) =
949 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
950 Arc::from(os)
951 };
952
953 let path = PathBuf::from(&uri);
955 if path.exists() {
956 if self.fail_if_exists {
957 return Err(UniError::Internal(anyhow::anyhow!(
958 "Database already exists at {}",
959 uri
960 )));
961 }
962 } else {
963 if !self.create_if_missing {
964 return Err(UniError::NotFound { path: path.clone() });
965 }
966 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
967 }
968
969 let local_store = Arc::new(
970 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
971 );
972
973 (
976 remote_url.clone(),
977 remote_store,
978 Some(local_store as Arc<dyn ObjectStore>),
979 )
980 } else if is_remote_uri {
981 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
983 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
984 } else {
985 let url = url::Url::parse(&uri).map_err(|e| {
986 UniError::Io(std::io::Error::new(
987 std::io::ErrorKind::InvalidInput,
988 e.to_string(),
989 ))
990 })?;
991 let (os, _path) =
992 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
993 Arc::from(os)
994 };
995
996 (uri.clone(), remote_store, None)
997 } else {
998 let path = PathBuf::from(&uri);
1000 let storage_path = path.join("storage");
1001
1002 if path.exists() {
1003 if self.fail_if_exists {
1004 return Err(UniError::Internal(anyhow::anyhow!(
1005 "Database already exists at {}",
1006 uri
1007 )));
1008 }
1009 } else {
1010 if !self.create_if_missing {
1011 return Err(UniError::NotFound { path: path.clone() });
1012 }
1013 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
1014 }
1015
1016 if !storage_path.exists() {
1018 std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
1019 }
1020
1021 let store = Arc::new(
1022 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
1023 );
1024 (
1025 storage_path.to_string_lossy().to_string(),
1026 store.clone() as Arc<dyn ObjectStore>,
1027 Some(store as Arc<dyn ObjectStore>),
1028 )
1029 };
1030
1031 let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
1033 let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
1035
1036 let has_catalog_schema = match data_store.get(&schema_obj_path).await {
1040 Ok(_) => true,
1041 Err(object_store::Error::NotFound { .. }) => false,
1042 Err(e) => return Err(UniError::Internal(e.into())),
1043 };
1044 if !has_catalog_schema {
1045 match data_store.get(&legacy_schema_obj_path).await {
1046 Ok(result) => {
1047 let bytes = result
1048 .bytes()
1049 .await
1050 .map_err(|e| UniError::Internal(e.into()))?;
1051 data_store
1052 .put(&schema_obj_path, bytes.into())
1053 .await
1054 .map_err(|e| UniError::Internal(e.into()))?;
1055 info!(
1056 legacy = %legacy_schema_obj_path,
1057 target = %schema_obj_path,
1058 "Migrated legacy schema path to catalog path"
1059 );
1060 }
1061 Err(object_store::Error::NotFound { .. }) => {}
1062 Err(e) => return Err(UniError::Internal(e.into())),
1063 }
1064 }
1065
1066 let schema_manager = Arc::new(
1069 SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1070 .await
1071 .map_err(UniError::Internal)?,
1072 );
1073
1074 let lancedb_storage_options = self
1075 .cloud_config
1076 .as_ref()
1077 .map(Self::cloud_config_to_lancedb_storage_options);
1078
1079 let storage = if is_hybrid || is_remote_uri {
1080 StorageManager::new_with_store_and_storage_options(
1083 &storage_uri,
1084 data_store.clone(),
1085 schema_manager.clone(),
1086 self.config.clone(),
1087 lancedb_storage_options.clone(),
1088 )
1089 .await
1090 .map_err(UniError::Internal)?
1091 } else {
1092 StorageManager::new_with_config(
1094 &storage_uri,
1095 schema_manager.clone(),
1096 self.config.clone(),
1097 )
1098 .await
1099 .map_err(UniError::Internal)?
1100 };
1101
1102 let storage = Arc::new(storage);
1103
1104 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1106
1107 let compaction_handle = storage
1109 .clone()
1110 .start_background_compaction(shutdown_handle.subscribe());
1111 shutdown_handle.track_task(compaction_handle);
1112
1113 let prop_cache_capacity = self.config.cache_size / 1024;
1115
1116 let prop_manager = Arc::new(PropertyManager::new(
1117 storage.clone(),
1118 schema_manager.clone(),
1119 prop_cache_capacity,
1120 ));
1121
1122 let id_store = local_store_opt
1124 .clone()
1125 .unwrap_or_else(|| data_store.clone());
1126 let wal_store = local_store_opt
1127 .clone()
1128 .unwrap_or_else(|| data_store.clone());
1129
1130 let latest_snapshot = storage
1133 .snapshot_manager()
1134 .load_latest_snapshot()
1135 .await
1136 .map_err(UniError::Internal)?;
1137
1138 let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1139 (
1140 snapshot.version_high_water_mark + 1,
1141 snapshot.wal_high_water_mark,
1142 )
1143 } else {
1144 let has_manifests = storage
1146 .snapshot_manager()
1147 .has_any_manifests()
1148 .await
1149 .unwrap_or(false);
1150
1151 let wal_check =
1152 WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1153 let has_wal = wal_check.has_segments().await.unwrap_or(false);
1154
1155 if has_manifests {
1156 let snapshot_ids = storage
1158 .snapshot_manager()
1159 .list_snapshots()
1160 .await
1161 .map_err(UniError::Internal)?;
1162 if let Some(last_id) = snapshot_ids.last() {
1163 let manifest = storage
1164 .snapshot_manager()
1165 .load_snapshot(last_id)
1166 .await
1167 .map_err(UniError::Internal)?;
1168 tracing::warn!(
1169 "Latest snapshot pointer missing but found manifest '{}'. \
1170 Recovering version {}.",
1171 last_id,
1172 manifest.version_high_water_mark
1173 );
1174 (
1175 manifest.version_high_water_mark + 1,
1176 manifest.wal_high_water_mark,
1177 )
1178 } else {
1179 return Err(UniError::Internal(anyhow::anyhow!(
1180 "Snapshot manifests directory exists but contains no valid manifests. \
1181 Possible data corruption."
1182 )));
1183 }
1184 } else if has_wal {
1185 return Err(UniError::Internal(anyhow::anyhow!(
1187 "Database has WAL segments but no snapshot manifest. \
1188 Cannot safely determine version counter -- starting at 0 would cause \
1189 version conflicts and data corruption. \
1190 Restore the snapshot manifest or delete WAL to start fresh."
1191 )));
1192 } else {
1193 (0, 0)
1195 }
1196 };
1197
1198 let allocator = Arc::new(
1199 IdAllocator::new(
1200 id_store,
1201 object_store::path::Path::from("id_allocator.json"),
1202 1000,
1203 )
1204 .await
1205 .map_err(UniError::Internal)?,
1206 );
1207
1208 let wal = if !self.config.wal_enabled {
1209 None
1211 } else if is_remote_uri && !is_hybrid {
1212 Some(Arc::new(WriteAheadLog::new(
1214 wal_store,
1215 object_store::path::Path::from("wal"),
1216 )))
1217 } else if is_hybrid || !is_remote_uri {
1218 Some(Arc::new(WriteAheadLog::new(
1221 wal_store,
1222 object_store::path::Path::from("wal"),
1223 )))
1224 } else {
1225 None
1226 };
1227
1228 let writer = Arc::new(RwLock::new(
1229 Writer::new_with_config(
1230 storage.clone(),
1231 schema_manager.clone(),
1232 start_version,
1233 self.config.clone(),
1234 wal,
1235 Some(allocator),
1236 )
1237 .await
1238 .map_err(UniError::Internal)?,
1239 ));
1240
1241 let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1242 .schema()
1243 .indexes
1244 .iter()
1245 .filter_map(|idx| {
1246 if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1247 cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1248 } else {
1249 None
1250 }
1251 })
1252 .collect();
1253
1254 if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1255 return Err(UniError::Internal(anyhow::anyhow!(
1256 "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1257 )));
1258 }
1259
1260 let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1261 for alias in &required_embed_aliases {
1262 let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1263 UniError::Internal(anyhow::anyhow!(
1264 "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1265 alias
1266 ))
1267 })?;
1268 if spec.task != ModelTask::Embed {
1269 return Err(UniError::Internal(anyhow::anyhow!(
1270 "Uni-Xervo alias '{}' must be an embed task",
1271 alias
1272 )));
1273 }
1274 }
1275
1276 let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1277 #[cfg(feature = "provider-candle")]
1278 {
1279 runtime_builder = runtime_builder
1280 .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1281 }
1282 #[cfg(feature = "provider-fastembed")]
1283 {
1284 runtime_builder = runtime_builder
1285 .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1286 }
1287 #[cfg(feature = "provider-openai")]
1288 {
1289 runtime_builder = runtime_builder
1290 .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1291 }
1292 #[cfg(feature = "provider-gemini")]
1293 {
1294 runtime_builder = runtime_builder
1295 .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1296 }
1297 #[cfg(feature = "provider-vertexai")]
1298 {
1299 runtime_builder = runtime_builder
1300 .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1301 }
1302 #[cfg(feature = "provider-mistral")]
1303 {
1304 runtime_builder = runtime_builder
1305 .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1306 }
1307 #[cfg(feature = "provider-anthropic")]
1308 {
1309 runtime_builder = runtime_builder
1310 .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1311 }
1312 #[cfg(feature = "provider-voyageai")]
1313 {
1314 runtime_builder = runtime_builder
1315 .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1316 }
1317 #[cfg(feature = "provider-cohere")]
1318 {
1319 runtime_builder = runtime_builder
1320 .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1321 }
1322 #[cfg(feature = "provider-azure-openai")]
1323 {
1324 runtime_builder = runtime_builder
1325 .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1326 }
1327 #[cfg(feature = "provider-mistralrs")]
1328 {
1329 runtime_builder = runtime_builder
1330 .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1331 }
1332
1333 Some(
1334 runtime_builder
1335 .build()
1336 .await
1337 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1338 )
1339 } else {
1340 None
1341 };
1342
1343 if let Some(ref runtime) = xervo_runtime {
1344 let mut writer_guard = writer.write().await;
1345 writer_guard.set_xervo_runtime(runtime.clone());
1346 }
1347
1348 {
1351 let w = writer.read().await;
1352 let replayed = w
1353 .replay_wal(wal_high_water_mark)
1354 .await
1355 .map_err(UniError::Internal)?;
1356 if replayed > 0 {
1357 info!("WAL recovery: replayed {} mutations", replayed);
1358 }
1359 }
1360
1361 if self.config.index_rebuild.auto_rebuild_enabled {
1363 let rebuild_manager = Arc::new(
1364 uni_store::storage::IndexRebuildManager::new(
1365 storage.clone(),
1366 schema_manager.clone(),
1367 self.config.index_rebuild.clone(),
1368 )
1369 .await
1370 .map_err(UniError::Internal)?,
1371 );
1372
1373 let handle = rebuild_manager
1374 .clone()
1375 .start_background_worker(shutdown_handle.subscribe());
1376 shutdown_handle.track_task(handle);
1377
1378 {
1379 let mut writer_guard = writer.write().await;
1380 writer_guard.set_index_rebuild_manager(rebuild_manager);
1381 }
1382 }
1383
1384 if let Some(interval) = self.config.auto_flush_interval {
1386 let writer_clone = writer.clone();
1387 let mut shutdown_rx = shutdown_handle.subscribe();
1388
1389 let handle = tokio::spawn(async move {
1390 let mut ticker = tokio::time::interval(interval);
1391 loop {
1392 tokio::select! {
1393 _ = ticker.tick() => {
1394 let mut w = writer_clone.write().await;
1395 if let Err(e) = w.check_flush().await {
1396 tracing::warn!("Background flush check failed: {}", e);
1397 }
1398 }
1399 _ = shutdown_rx.recv() => {
1400 tracing::info!("Auto-flush shutting down, performing final flush");
1401 let mut w = writer_clone.write().await;
1402 let _ = w.flush_to_l1(None).await;
1403 break;
1404 }
1405 }
1406 }
1407 });
1408
1409 shutdown_handle.track_task(handle);
1410 }
1411
1412 let (commit_tx, _) = tokio::sync::broadcast::channel(256);
1413 let writer_field = if self.read_only { None } else { Some(writer) };
1414
1415 Ok(Uni {
1416 inner: Arc::new(UniInner {
1417 storage,
1418 schema: schema_manager,
1419 properties: prop_manager,
1420 writer: writer_field,
1421 xervo_runtime,
1422 config: self.config,
1423 procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1424 shutdown_handle,
1425 locy_rule_registry: Arc::new(std::sync::RwLock::new(
1426 impl_locy::LocyRuleRegistry::default(),
1427 )),
1428 start_time: Instant::now(),
1429 commit_tx,
1430 write_lease: self.write_lease,
1431 active_session_count: AtomicUsize::new(0),
1432 total_queries: AtomicU64::new(0),
1433 total_commits: AtomicU64::new(0),
1434 custom_functions: Arc::new(std::sync::RwLock::new(
1435 uni_query::CustomFunctionRegistry::new(),
1436 )),
1437 cached_l0_mutation_count: AtomicUsize::new(0),
1438 cached_l0_estimated_size: AtomicUsize::new(0),
1439 cached_wal_lsn: AtomicU64::new(0),
1440 _temp_dir: self.temp_dir,
1441 }),
1442 })
1443 }
1444
1445 pub fn build_sync(self) -> Result<Uni> {
1447 let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1448 rt.block_on(self.build())
1449 }
1450
1451 fn cloud_config_to_lancedb_storage_options(
1452 config: &CloudStorageConfig,
1453 ) -> std::collections::HashMap<String, String> {
1454 let mut opts = std::collections::HashMap::new();
1455
1456 match config {
1457 CloudStorageConfig::S3 {
1458 bucket,
1459 region,
1460 endpoint,
1461 access_key_id,
1462 secret_access_key,
1463 session_token,
1464 virtual_hosted_style,
1465 } => {
1466 opts.insert("bucket".to_string(), bucket.clone());
1467 opts.insert(
1468 "virtual_hosted_style_request".to_string(),
1469 virtual_hosted_style.to_string(),
1470 );
1471
1472 if let Some(r) = region {
1473 opts.insert("region".to_string(), r.clone());
1474 }
1475 if let Some(ep) = endpoint {
1476 opts.insert("endpoint".to_string(), ep.clone());
1477 if ep.starts_with("http://") {
1478 opts.insert("allow_http".to_string(), "true".to_string());
1479 }
1480 }
1481 if let Some(v) = access_key_id {
1482 opts.insert("access_key_id".to_string(), v.clone());
1483 }
1484 if let Some(v) = secret_access_key {
1485 opts.insert("secret_access_key".to_string(), v.clone());
1486 }
1487 if let Some(v) = session_token {
1488 opts.insert("session_token".to_string(), v.clone());
1489 }
1490 }
1491 CloudStorageConfig::Gcs {
1492 bucket,
1493 service_account_path,
1494 service_account_key,
1495 } => {
1496 opts.insert("bucket".to_string(), bucket.clone());
1497 if let Some(v) = service_account_path {
1498 opts.insert("service_account".to_string(), v.clone());
1499 opts.insert("application_credentials".to_string(), v.clone());
1500 }
1501 if let Some(v) = service_account_key {
1502 opts.insert("service_account_key".to_string(), v.clone());
1503 }
1504 }
1505 CloudStorageConfig::Azure {
1506 container,
1507 account,
1508 access_key,
1509 sas_token,
1510 } => {
1511 opts.insert("account_name".to_string(), account.clone());
1512 opts.insert("container_name".to_string(), container.clone());
1513 if let Some(v) = access_key {
1514 opts.insert("access_key".to_string(), v.clone());
1515 }
1516 if let Some(v) = sas_token {
1517 opts.insert("sas_token".to_string(), v.clone());
1518 }
1519 }
1520 }
1521
1522 opts
1523 }
1524}