1use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::{Duration, Instant};
8
9pub mod appender;
10pub mod builder;
11pub mod bulk;
12pub mod compaction;
13pub mod functions;
14pub mod hooks;
15pub mod impl_locy;
16pub mod impl_query;
17pub mod indexes;
18pub mod locy_builder;
19pub mod locy_result;
20pub mod multi_agent;
21pub mod notifications;
22pub mod prepared;
23pub mod query_builder;
24pub mod rule_registry;
25pub mod schema;
26pub mod session;
27pub mod sync;
28pub mod template;
29pub mod transaction;
30pub mod xervo;
31
32use object_store::ObjectStore;
33use object_store::local::LocalFileSystem;
34use tracing::info;
35use uni_common::core::snapshot::SnapshotManifest;
36use uni_common::{CloudStorageConfig, UniConfig};
37use uni_common::{Result, UniError};
38use uni_store::cloud::build_cloud_store;
39use uni_xervo::api::{ModelAliasSpec, ModelTask};
40use uni_xervo::runtime::ModelRuntime;
41
42use uni_common::core::schema::SchemaManager;
43use uni_store::runtime::id_allocator::IdAllocator;
44use uni_store::runtime::property_manager::PropertyManager;
45use uni_store::runtime::wal::WriteAheadLog;
46use uni_store::storage::manager::StorageManager;
47
48use tokio::sync::RwLock;
49use uni_store::runtime::writer::Writer;
50
51use crate::shutdown::ShutdownHandle;
52
53use std::collections::HashMap;
54
/// Shared state behind a [`Uni`] handle.
///
/// One instance is built by `UniBuilder::build` and wrapped in an `Arc`;
/// `UniInner::at_snapshot` builds a second, writer-less instance whose storage
/// is pinned to a snapshot manifest.
#[doc(hidden)]
pub struct UniInner {
    /// Storage manager for this instance (a pinned copy in snapshot views).
    pub(crate) storage: Arc<StorageManager>,
    /// Schema manager shared with the storage, property and writer layers.
    pub(crate) schema: Arc<SchemaManager>,
    /// Property manager constructed over `storage` and `schema`.
    pub(crate) properties: Arc<PropertyManager>,
    /// Writer guarded by an async `RwLock`; `None` for read-only databases
    /// and for snapshot views, in which case write operations are rejected.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    /// Model runtime; only `Some` when a Xervo catalog was given to the builder.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Configuration the database was built with.
    pub(crate) config: UniConfig,
    /// Registry of procedures exposed through `Uni::procedure_registry`.
    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
    /// Handle used to signal shutdown and to track background tasks
    /// (compaction, index rebuild, auto-flush).
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
    /// Rule registry exposed through `Uni::rules`.
    pub(crate) locy_rule_registry: Arc<std::sync::RwLock<impl_locy::LocyRuleRegistry>>,
    /// Instant at which this instance was created; basis for the `uptime` metric.
    pub(crate) start_time: Instant,
    /// Broadcast sender for commit notifications (channel capacity 256).
    pub(crate) commit_tx: tokio::sync::broadcast::Sender<Arc<notifications::CommitNotification>>,
    /// Write lease handed to the builder, if any.
    pub(crate) write_lease: Option<multi_agent::WriteLease>,
    /// Counter reported as `DatabaseMetrics::active_sessions`.
    pub(crate) active_session_count: AtomicUsize,
    /// Counter reported as `DatabaseMetrics::total_queries`.
    pub(crate) total_queries: AtomicU64,
    /// Counter reported as `DatabaseMetrics::total_commits`.
    pub(crate) total_commits: AtomicU64,
    /// Registry of user-defined functions; shared (same `Arc`) with snapshot views.
    pub(crate) custom_functions: Arc<std::sync::RwLock<uni_query::CustomFunctionRegistry>>,

    // The three `cached_*` values below are read by `Uni::metrics` with relaxed
    // atomic loads, so that call never needs the writer lock. They start at 0;
    // the code that refreshes them is not part of this section.
    /// Cached value reported as `DatabaseMetrics::l0_mutation_count`.
    pub(crate) cached_l0_mutation_count: AtomicUsize,
    /// Cached value reported as `DatabaseMetrics::l0_estimated_size_bytes`.
    pub(crate) cached_l0_estimated_size: AtomicUsize,
    /// Cached value reported as `DatabaseMetrics::wal_lsn`.
    pub(crate) cached_wal_lsn: AtomicU64,
}
99
/// Write-throttle pressure as a fraction in the closed range `[0.0, 1.0]`.
///
/// `0.0` means "not throttled"; any value above zero means some throttling is
/// in effect. The value is formatted as a percentage with one decimal place
/// (`0.5` displays as `50.0%`).
#[derive(Debug, Clone, Copy, Default, PartialEq, PartialOrd)]
pub struct ThrottlePressure(f64);

impl ThrottlePressure {
    /// Creates a pressure value, forcing `value` into `[0.0, 1.0]`.
    ///
    /// Values below zero become `0.0`, values above one become `1.0`
    /// (this includes the infinities). `NaN` is treated as "no pressure" and
    /// stored as `0.0`: `f64::clamp` would otherwise pass `NaN` straight
    /// through and break the range invariant.
    pub fn new(value: f64) -> Self {
        if value.is_nan() {
            return Self(0.0);
        }
        Self(value.clamp(0.0, 1.0))
    }

    /// Returns the pressure as a fraction in `[0.0, 1.0]`.
    pub fn value(&self) -> f64 {
        self.0
    }

    /// Returns `true` when the pressure is strictly greater than zero.
    pub fn is_throttled(&self) -> bool {
        self.0 > 0.0
    }
}

impl std::fmt::Display for ThrottlePressure {
    /// Writes the pressure as a percentage with one decimal, e.g. `25.0%`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:.1}%", self.0 * 100.0)
    }
}
135
/// Point-in-time snapshot of database counters, as produced by `Uni::metrics`.
#[derive(Debug, Clone)]
pub struct DatabaseMetrics {
    /// L0 mutation count, taken from the database's cached counter.
    pub l0_mutation_count: usize,
    /// Estimated L0 size in bytes, taken from the database's cached counter.
    pub l0_estimated_size_bytes: usize,
    /// Schema version reported by the schema manager, widened to `u64`.
    pub schema_version: u64,
    /// Time elapsed since the database instance was created.
    pub uptime: Duration,
    /// Value of the active-session counter when the metrics were taken.
    pub active_sessions: usize,
    /// Number of L1 runs, copied from `compaction_status.l1_runs`.
    pub l1_run_count: usize,
    /// Write-throttle pressure.
    /// NOTE(review): `Uni::metrics` currently always fills this with the
    /// default (zero) pressure.
    pub write_throttle_pressure: ThrottlePressure,
    /// Compaction status reported by the storage manager (default value when
    /// the status could not be obtained).
    pub compaction_status: uni_store::CompactionStatus,
    /// WAL size in bytes.
    /// NOTE(review): `Uni::metrics` currently always reports `0` here.
    pub wal_size_bytes: u64,
    /// WAL log sequence number, taken from the database's cached counter.
    pub wal_lsn: u64,
    /// Value of the total-queries counter when the metrics were taken.
    pub total_queries: u64,
    /// Value of the total-commits counter when the metrics were taken.
    pub total_commits: u64,
}
164
/// Handle to an open database.
///
/// Instances are obtained through a `UniBuilder` (see `Uni::open`,
/// `Uni::open_existing`, `Uni::create`, `Uni::temporary`). Dropping the handle
/// sends a blocking shutdown signal to the background tasks; call
/// `Uni::shutdown` for an orderly, awaited shutdown with a final flush.
pub struct Uni {
    /// Shared state; also handed to sessions and session templates.
    pub(crate) inner: Arc<UniInner>,
}
189
190impl UniInner {
194 pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<UniInner> {
199 let manifest = self
200 .storage
201 .snapshot_manager()
202 .load_snapshot(snapshot_id)
203 .await
204 .map_err(UniError::Internal)?;
205
206 let pinned_storage = Arc::new(self.storage.pinned(manifest));
207
208 let prop_manager = Arc::new(PropertyManager::new(
209 pinned_storage.clone(),
210 self.schema.clone(),
211 self.properties.cache_size(),
212 ));
213
214 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
215
216 let (commit_tx, _) = tokio::sync::broadcast::channel(256);
217 Ok(UniInner {
218 storage: pinned_storage,
219 schema: self.schema.clone(),
220 properties: prop_manager,
221 writer: None,
222 xervo_runtime: self.xervo_runtime.clone(),
223 config: self.config.clone(),
224 procedure_registry: self.procedure_registry.clone(),
225 shutdown_handle,
226 locy_rule_registry: Arc::new(std::sync::RwLock::new(
227 impl_locy::LocyRuleRegistry::default(),
228 )),
229 start_time: Instant::now(),
230 commit_tx,
231 write_lease: None,
232 active_session_count: AtomicUsize::new(0),
233 total_queries: AtomicU64::new(0),
234 total_commits: AtomicU64::new(0),
235 custom_functions: self.custom_functions.clone(),
236 cached_l0_mutation_count: AtomicUsize::new(0),
237 cached_l0_estimated_size: AtomicUsize::new(0),
238 cached_wal_lsn: AtomicU64::new(0),
239 })
240 }
241}
242
243impl Uni {
244 pub fn open(uri: impl Into<String>) -> UniBuilder {
256 UniBuilder::new(uri.into())
257 }
258
259 pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
261 let mut builder = UniBuilder::new(uri.into());
262 builder.create_if_missing = false;
263 builder
264 }
265
266 pub fn create(uri: impl Into<String>) -> UniBuilder {
268 let mut builder = UniBuilder::new(uri.into());
269 builder.fail_if_exists = true;
270 builder
271 }
272
273 pub fn temporary() -> UniBuilder {
278 let temp_dir = std::env::temp_dir().join(format!("uni_mem_{}", uuid::Uuid::new_v4()));
279 UniBuilder::new(temp_dir.to_string_lossy().to_string())
280 }
281
282 pub fn in_memory() -> UniBuilder {
284 Self::temporary()
285 }
286
287 pub fn session(&self) -> session::Session {
305 session::Session::new(self.inner.clone())
306 }
307
308 pub fn session_template(&self) -> template::SessionTemplateBuilder {
313 template::SessionTemplateBuilder::new(self.inner.clone())
314 }
315
316 pub fn metrics(&self) -> DatabaseMetrics {
324 let schema_version = self.inner.schema.schema().schema_version as u64;
325 let compaction_status = self.inner.storage.compaction_status().unwrap_or_default();
326 DatabaseMetrics {
327 l0_mutation_count: self.inner.cached_l0_mutation_count.load(Ordering::Relaxed),
328 l0_estimated_size_bytes: self.inner.cached_l0_estimated_size.load(Ordering::Relaxed),
329 schema_version,
330 uptime: self.inner.start_time.elapsed(),
331 active_sessions: self.inner.active_session_count.load(Ordering::Relaxed),
332 l1_run_count: compaction_status.l1_runs,
333 write_throttle_pressure: ThrottlePressure::default(),
334 compaction_status,
335 wal_size_bytes: 0u64,
336 wal_lsn: self.inner.cached_wal_lsn.load(Ordering::Relaxed),
337 total_queries: self.inner.total_queries.load(Ordering::Relaxed),
338 total_commits: self.inner.total_commits.load(Ordering::Relaxed),
339 }
340 }
341
342 pub fn write_lease(&self) -> Option<&multi_agent::WriteLease> {
345 self.inner.write_lease.as_ref()
346 }
347
348 pub fn rules(&self) -> rule_registry::RuleRegistry<'_> {
354 rule_registry::RuleRegistry::new(&self.inner.locy_rule_registry)
355 }
356
357 pub fn config(&self) -> &UniConfig {
361 &self.inner.config
362 }
363
364 #[doc(hidden)]
366 pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
367 &self.inner.procedure_registry
368 }
369
370 #[doc(hidden)]
372 pub fn schema_manager(&self) -> Arc<SchemaManager> {
373 self.inner.schema.clone()
374 }
375
376 #[doc(hidden)]
377 pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
378 self.inner.writer.clone()
379 }
380
381 #[doc(hidden)]
382 pub fn storage(&self) -> Arc<StorageManager> {
383 self.inner.storage.clone()
384 }
385
386 pub async fn flush(&self) -> Result<()> {
391 if let Some(writer_lock) = &self.inner.writer {
392 let mut writer = writer_lock.write().await;
393 writer
394 .flush_to_l1(None)
395 .await
396 .map(|_| ())
397 .map_err(UniError::Internal)
398 } else {
399 Err(UniError::ReadOnly {
400 operation: "flush".to_string(),
401 })
402 }
403 }
404
405 pub async fn create_snapshot(&self, name: &str) -> Result<String> {
411 if name.is_empty() {
412 return Err(UniError::Internal(anyhow::anyhow!(
413 "Snapshot name cannot be empty"
414 )));
415 }
416
417 let snapshot_id = if let Some(writer_lock) = &self.inner.writer {
418 let mut writer = writer_lock.write().await;
419 writer
420 .flush_to_l1(Some(name.to_string()))
421 .await
422 .map_err(UniError::Internal)?
423 } else {
424 return Err(UniError::ReadOnly {
425 operation: "create_snapshot".to_string(),
426 });
427 };
428
429 self.inner
430 .storage
431 .snapshot_manager()
432 .save_named_snapshot(name, &snapshot_id)
433 .await
434 .map_err(UniError::Internal)?;
435
436 Ok(snapshot_id)
437 }
438
439 pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
441 let sm = self.inner.storage.snapshot_manager();
442 let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
443 let mut manifests = Vec::new();
444 for id in ids {
445 if let Ok(m) = sm.load_snapshot(&id).await {
446 manifests.push(m);
447 }
448 }
449 Ok(manifests)
450 }
451
452 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
457 self.inner
458 .storage
459 .snapshot_manager()
460 .set_latest_snapshot(snapshot_id)
461 .await
462 .map_err(UniError::Internal)
463 }
464
465 pub async fn label_exists(&self, name: &str) -> Result<bool> {
467 Ok(self
468 .inner
469 .schema
470 .schema()
471 .labels
472 .get(name)
473 .is_some_and(|l| {
474 matches!(
475 l.state,
476 uni_common::core::schema::SchemaElementState::Active
477 )
478 }))
479 }
480
481 pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
483 Ok(self
484 .inner
485 .schema
486 .schema()
487 .edge_types
488 .get(name)
489 .is_some_and(|e| {
490 matches!(
491 e.state,
492 uni_common::core::schema::SchemaElementState::Active
493 )
494 }))
495 }
496
497 pub async fn list_labels(&self) -> Result<Vec<String>> {
503 let mut all_labels = std::collections::HashSet::new();
504
505 for (name, label) in self.inner.schema.schema().labels.iter() {
507 if matches!(
508 label.state,
509 uni_common::core::schema::SchemaElementState::Active
510 ) {
511 all_labels.insert(name.clone());
512 }
513 }
514
515 let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
517 let result = self.inner.execute_internal(query, HashMap::new()).await?;
518 for row in result.rows() {
519 if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
520 for label in labels_list {
521 all_labels.insert(label);
522 }
523 }
524 }
525
526 Ok(all_labels.into_iter().collect())
527 }
528
529 pub async fn list_edge_types(&self) -> Result<Vec<String>> {
531 Ok(self
532 .inner
533 .schema
534 .schema()
535 .edge_types
536 .iter()
537 .filter(|(_, e)| {
538 matches!(
539 e.state,
540 uni_common::core::schema::SchemaElementState::Active
541 )
542 })
543 .map(|(name, _)| name.clone())
544 .collect())
545 }
546
547 pub async fn get_label_info(
549 &self,
550 name: &str,
551 ) -> Result<Option<crate::api::schema::LabelInfo>> {
552 let schema = self.inner.schema.schema();
553 if schema.labels.contains_key(name) {
554 let count = if let Ok(ds) = self.inner.storage.vertex_dataset(name) {
555 if let Ok(raw) = ds.open_raw().await {
556 raw.count_rows(None)
557 .await
558 .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
559 } else {
560 0
561 }
562 } else {
563 0
564 };
565
566 let mut properties = Vec::new();
567 if let Some(props) = schema.properties.get(name) {
568 for (prop_name, prop_meta) in props {
569 let is_indexed = schema.indexes.iter().any(|idx| match idx {
570 uni_common::core::schema::IndexDefinition::Vector(v) => {
571 v.label == name && v.property == *prop_name
572 }
573 uni_common::core::schema::IndexDefinition::Scalar(s) => {
574 s.label == name && s.properties.contains(prop_name)
575 }
576 uni_common::core::schema::IndexDefinition::FullText(f) => {
577 f.label == name && f.properties.contains(prop_name)
578 }
579 uni_common::core::schema::IndexDefinition::Inverted(inv) => {
580 inv.label == name && inv.property == *prop_name
581 }
582 uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
583 j.label == name
584 }
585 _ => false,
586 });
587
588 properties.push(crate::api::schema::PropertyInfo {
589 name: prop_name.clone(),
590 data_type: format!("{:?}", prop_meta.r#type),
591 nullable: prop_meta.nullable,
592 is_indexed,
593 });
594 }
595 }
596
597 let mut indexes = Vec::new();
598 for idx in schema.indexes.iter().filter(|i| i.label() == name) {
599 use uni_common::core::schema::IndexDefinition;
600 let (idx_type, idx_props) = match idx {
601 IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
602 IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
603 IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
604 IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
605 IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
606 _ => continue,
607 };
608
609 indexes.push(crate::api::schema::IndexInfo {
610 name: idx.name().to_string(),
611 index_type: idx_type.to_string(),
612 properties: idx_props,
613 status: "ONLINE".to_string(), });
615 }
616
617 let mut constraints = Vec::new();
618 for c in &schema.constraints {
619 if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
620 && l == name
621 {
622 let (ctype, cprops) = match &c.constraint_type {
623 uni_common::core::schema::ConstraintType::Unique { properties } => {
624 ("UNIQUE", properties.clone())
625 }
626 uni_common::core::schema::ConstraintType::Exists { property } => {
627 ("EXISTS", vec![property.clone()])
628 }
629 uni_common::core::schema::ConstraintType::Check { expression } => {
630 ("CHECK", vec![expression.clone()])
631 }
632 _ => ("UNKNOWN", vec![]),
633 };
634
635 constraints.push(crate::api::schema::ConstraintInfo {
636 name: c.name.clone(),
637 constraint_type: ctype.to_string(),
638 properties: cprops,
639 enabled: c.enabled,
640 });
641 }
642 }
643
644 Ok(Some(crate::api::schema::LabelInfo {
645 name: name.to_string(),
646 count,
647 properties,
648 indexes,
649 constraints,
650 }))
651 } else {
652 Ok(None)
653 }
654 }
655
656 pub async fn get_edge_type_info(
658 &self,
659 name: &str,
660 ) -> Result<Option<crate::api::schema::EdgeTypeInfo>> {
661 let schema = self.inner.schema.schema();
662 let edge_meta = match schema.edge_types.get(name) {
663 Some(meta) => meta,
664 None => return Ok(None),
665 };
666
667 let count = {
669 let query = format!("MATCH ()-[r:{}]->() RETURN count(r) AS cnt", name);
670 match self.inner.execute_internal(&query, HashMap::new()).await {
671 Ok(result) => result
672 .rows()
673 .first()
674 .and_then(|r| r.get::<i64>("cnt").ok())
675 .unwrap_or(0) as usize,
676 Err(_) => 0,
677 }
678 };
679
680 let source_labels = edge_meta.src_labels.clone();
681 let target_labels = edge_meta.dst_labels.clone();
682
683 let mut properties = Vec::new();
684 if let Some(props) = schema.properties.get(name) {
685 for (prop_name, prop_meta) in props {
686 let is_indexed = schema.indexes.iter().any(|idx| match idx {
687 uni_common::core::schema::IndexDefinition::Scalar(s) => {
688 s.label == name && s.properties.contains(prop_name)
689 }
690 uni_common::core::schema::IndexDefinition::FullText(f) => {
691 f.label == name && f.properties.contains(prop_name)
692 }
693 uni_common::core::schema::IndexDefinition::Inverted(inv) => {
694 inv.label == name && inv.property == *prop_name
695 }
696 _ => false,
697 });
698
699 properties.push(crate::api::schema::PropertyInfo {
700 name: prop_name.clone(),
701 data_type: format!("{:?}", prop_meta.r#type),
702 nullable: prop_meta.nullable,
703 is_indexed,
704 });
705 }
706 }
707
708 let mut indexes = Vec::new();
709 for idx in schema.indexes.iter().filter(|i| i.label() == name) {
710 use uni_common::core::schema::IndexDefinition;
711 let (idx_type, idx_props) = match idx {
712 IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
713 IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
714 IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
715 _ => continue,
716 };
717
718 indexes.push(crate::api::schema::IndexInfo {
719 name: idx.name().to_string(),
720 index_type: idx_type.to_string(),
721 properties: idx_props,
722 status: "ONLINE".to_string(),
723 });
724 }
725
726 let mut constraints = Vec::new();
727 for c in &schema.constraints {
728 if let uni_common::core::schema::ConstraintTarget::EdgeType(et) = &c.target
729 && et == name
730 {
731 let (ctype, cprops) = match &c.constraint_type {
732 uni_common::core::schema::ConstraintType::Unique { properties } => {
733 ("UNIQUE", properties.clone())
734 }
735 uni_common::core::schema::ConstraintType::Exists { property } => {
736 ("EXISTS", vec![property.clone()])
737 }
738 uni_common::core::schema::ConstraintType::Check { expression } => {
739 ("CHECK", vec![expression.clone()])
740 }
741 _ => ("UNKNOWN", vec![]),
742 };
743
744 constraints.push(crate::api::schema::ConstraintInfo {
745 name: c.name.clone(),
746 constraint_type: ctype.to_string(),
747 properties: cprops,
748 enabled: c.enabled,
749 });
750 }
751 }
752
753 Ok(Some(crate::api::schema::EdgeTypeInfo {
754 name: name.to_string(),
755 count,
756 source_labels,
757 target_labels,
758 properties,
759 indexes,
760 constraints,
761 }))
762 }
763
764 pub fn compaction(&self) -> compaction::Compaction<'_> {
768 compaction::Compaction { inner: &self.inner }
769 }
770
771 pub fn indexes(&self) -> indexes::Indexes<'_> {
775 indexes::Indexes { inner: &self.inner }
776 }
777
778 pub fn functions(&self) -> functions::Functions<'_> {
782 functions::Functions { inner: &self.inner }
783 }
784
785 pub async fn shutdown(self) -> Result<()> {
790 if let Some(ref writer) = self.inner.writer {
792 let mut w = writer.write().await;
793 if let Err(e) = w.flush_to_l1(None).await {
794 tracing::error!("Error flushing during shutdown: {}", e);
795 }
796 }
797
798 self.inner
799 .shutdown_handle
800 .shutdown_async()
801 .await
802 .map_err(UniError::Internal)
803 }
804}
805
806impl Drop for Uni {
807 fn drop(&mut self) {
808 self.inner.shutdown_handle.shutdown_blocking();
809 tracing::debug!("Uni dropped, shutdown signal sent");
810 }
811}
812
/// Builder that collects the options for opening or creating a database and
/// produces a `Uni` through `build`.
#[must_use = "builders do nothing until .build() is called"]
pub struct UniBuilder {
    /// Primary location. `build` treats a value containing `"://"` as a
    /// remote URI and anything else as a local path.
    uri: String,
    /// Database configuration; defaults to `UniConfig::default()`.
    config: UniConfig,
    /// Path recorded by `schema_file()`.
    schema_file: Option<PathBuf>,
    /// Model alias specs used to set up the model runtime in `build`.
    xervo_catalog: Option<Vec<ModelAliasSpec>>,
    /// Remote URL for hybrid mode; when set, `uri` must be a local path.
    hybrid_remote_url: Option<String>,
    /// Cloud storage settings; when present they are used to build the
    /// remote object store instead of parsing the URL.
    cloud_config: Option<CloudStorageConfig>,
    /// When `false`, `build` fails with `NotFound` if the local path is missing.
    create_if_missing: bool,
    /// When `true`, `build` fails if the local path already exists.
    fail_if_exists: bool,
    /// When `true`, the built database is handed no writer.
    read_only: bool,
    /// Write lease passed through to the built database.
    write_lease: Option<multi_agent::WriteLease>,
}
827
828impl UniBuilder {
829 pub fn new(uri: String) -> Self {
831 Self {
832 uri,
833 config: UniConfig::default(),
834 schema_file: None,
835 xervo_catalog: None,
836 hybrid_remote_url: None,
837 cloud_config: None,
838 create_if_missing: true,
839 fail_if_exists: false,
840 read_only: false,
841 write_lease: None,
842 }
843 }
844
845 pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
847 self.schema_file = Some(path.as_ref().to_path_buf());
848 self
849 }
850
851 pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
853 self.xervo_catalog = Some(catalog);
854 self
855 }
856
857 pub fn remote_storage(mut self, remote_url: &str, config: CloudStorageConfig) -> Self {
880 self.hybrid_remote_url = Some(remote_url.to_string());
881 self.cloud_config = Some(config);
882 self
883 }
884
885 pub fn read_only(mut self) -> Self {
891 self.read_only = true;
892 self
893 }
894
895 pub fn write_lease(mut self, lease: multi_agent::WriteLease) -> Self {
900 self.write_lease = Some(lease);
901 self
902 }
903
904 pub fn config(mut self, config: UniConfig) -> Self {
906 self.config = config;
907 self
908 }
909
910 pub async fn build(self) -> Result<Uni> {
912 let uri = self.uri.clone();
913 let is_remote_uri = uri.contains("://");
914 let is_hybrid = self.hybrid_remote_url.is_some();
915
916 if is_hybrid && is_remote_uri {
917 return Err(UniError::Internal(anyhow::anyhow!(
918 "Hybrid mode requires a local path as primary URI, found: {}",
919 uri
920 )));
921 }
922
923 let (storage_uri, data_store, local_store_opt) = if is_hybrid {
924 let remote_url = self.hybrid_remote_url.as_ref().unwrap();
925
926 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
928 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
929 } else {
930 let url = url::Url::parse(remote_url).map_err(|e| {
931 UniError::Io(std::io::Error::new(
932 std::io::ErrorKind::InvalidInput,
933 e.to_string(),
934 ))
935 })?;
936 let (os, _path) =
937 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
938 Arc::from(os)
939 };
940
941 let path = PathBuf::from(&uri);
943 if path.exists() {
944 if self.fail_if_exists {
945 return Err(UniError::Internal(anyhow::anyhow!(
946 "Database already exists at {}",
947 uri
948 )));
949 }
950 } else {
951 if !self.create_if_missing {
952 return Err(UniError::NotFound { path: path.clone() });
953 }
954 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
955 }
956
957 let local_store = Arc::new(
958 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
959 );
960
961 (
964 remote_url.clone(),
965 remote_store,
966 Some(local_store as Arc<dyn ObjectStore>),
967 )
968 } else if is_remote_uri {
969 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
971 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
972 } else {
973 let url = url::Url::parse(&uri).map_err(|e| {
974 UniError::Io(std::io::Error::new(
975 std::io::ErrorKind::InvalidInput,
976 e.to_string(),
977 ))
978 })?;
979 let (os, _path) =
980 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
981 Arc::from(os)
982 };
983
984 (uri.clone(), remote_store, None)
985 } else {
986 let path = PathBuf::from(&uri);
988 let storage_path = path.join("storage");
989
990 if path.exists() {
991 if self.fail_if_exists {
992 return Err(UniError::Internal(anyhow::anyhow!(
993 "Database already exists at {}",
994 uri
995 )));
996 }
997 } else {
998 if !self.create_if_missing {
999 return Err(UniError::NotFound { path: path.clone() });
1000 }
1001 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
1002 }
1003
1004 if !storage_path.exists() {
1006 std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
1007 }
1008
1009 let store = Arc::new(
1010 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
1011 );
1012 (
1013 storage_path.to_string_lossy().to_string(),
1014 store.clone() as Arc<dyn ObjectStore>,
1015 Some(store as Arc<dyn ObjectStore>),
1016 )
1017 };
1018
1019 let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
1021 let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
1023
1024 let has_catalog_schema = match data_store.get(&schema_obj_path).await {
1028 Ok(_) => true,
1029 Err(object_store::Error::NotFound { .. }) => false,
1030 Err(e) => return Err(UniError::Internal(e.into())),
1031 };
1032 if !has_catalog_schema {
1033 match data_store.get(&legacy_schema_obj_path).await {
1034 Ok(result) => {
1035 let bytes = result
1036 .bytes()
1037 .await
1038 .map_err(|e| UniError::Internal(e.into()))?;
1039 data_store
1040 .put(&schema_obj_path, bytes.into())
1041 .await
1042 .map_err(|e| UniError::Internal(e.into()))?;
1043 info!(
1044 legacy = %legacy_schema_obj_path,
1045 target = %schema_obj_path,
1046 "Migrated legacy schema path to catalog path"
1047 );
1048 }
1049 Err(object_store::Error::NotFound { .. }) => {}
1050 Err(e) => return Err(UniError::Internal(e.into())),
1051 }
1052 }
1053
1054 let schema_manager = Arc::new(
1057 SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1058 .await
1059 .map_err(UniError::Internal)?,
1060 );
1061
1062 let lancedb_storage_options = self
1063 .cloud_config
1064 .as_ref()
1065 .map(Self::cloud_config_to_lancedb_storage_options);
1066
1067 let storage = if is_hybrid || is_remote_uri {
1068 StorageManager::new_with_store_and_storage_options(
1071 &storage_uri,
1072 data_store.clone(),
1073 schema_manager.clone(),
1074 self.config.clone(),
1075 lancedb_storage_options.clone(),
1076 )
1077 .await
1078 .map_err(UniError::Internal)?
1079 } else {
1080 StorageManager::new_with_config(
1082 &storage_uri,
1083 schema_manager.clone(),
1084 self.config.clone(),
1085 )
1086 .await
1087 .map_err(UniError::Internal)?
1088 };
1089
1090 let storage = Arc::new(storage);
1091
1092 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1094
1095 let compaction_handle = storage
1097 .clone()
1098 .start_background_compaction(shutdown_handle.subscribe());
1099 shutdown_handle.track_task(compaction_handle);
1100
1101 let prop_cache_capacity = self.config.cache_size / 1024;
1103
1104 let prop_manager = Arc::new(PropertyManager::new(
1105 storage.clone(),
1106 schema_manager.clone(),
1107 prop_cache_capacity,
1108 ));
1109
1110 let id_store = local_store_opt
1112 .clone()
1113 .unwrap_or_else(|| data_store.clone());
1114 let wal_store = local_store_opt
1115 .clone()
1116 .unwrap_or_else(|| data_store.clone());
1117
1118 let latest_snapshot = storage
1121 .snapshot_manager()
1122 .load_latest_snapshot()
1123 .await
1124 .map_err(UniError::Internal)?;
1125
1126 let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1127 (
1128 snapshot.version_high_water_mark + 1,
1129 snapshot.wal_high_water_mark,
1130 )
1131 } else {
1132 let has_manifests = storage
1134 .snapshot_manager()
1135 .has_any_manifests()
1136 .await
1137 .unwrap_or(false);
1138
1139 let wal_check =
1140 WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1141 let has_wal = wal_check.has_segments().await.unwrap_or(false);
1142
1143 if has_manifests {
1144 let snapshot_ids = storage
1146 .snapshot_manager()
1147 .list_snapshots()
1148 .await
1149 .map_err(UniError::Internal)?;
1150 if let Some(last_id) = snapshot_ids.last() {
1151 let manifest = storage
1152 .snapshot_manager()
1153 .load_snapshot(last_id)
1154 .await
1155 .map_err(UniError::Internal)?;
1156 tracing::warn!(
1157 "Latest snapshot pointer missing but found manifest '{}'. \
1158 Recovering version {}.",
1159 last_id,
1160 manifest.version_high_water_mark
1161 );
1162 (
1163 manifest.version_high_water_mark + 1,
1164 manifest.wal_high_water_mark,
1165 )
1166 } else {
1167 return Err(UniError::Internal(anyhow::anyhow!(
1168 "Snapshot manifests directory exists but contains no valid manifests. \
1169 Possible data corruption."
1170 )));
1171 }
1172 } else if has_wal {
1173 return Err(UniError::Internal(anyhow::anyhow!(
1175 "Database has WAL segments but no snapshot manifest. \
1176 Cannot safely determine version counter -- starting at 0 would cause \
1177 version conflicts and data corruption. \
1178 Restore the snapshot manifest or delete WAL to start fresh."
1179 )));
1180 } else {
1181 (0, 0)
1183 }
1184 };
1185
1186 let allocator = Arc::new(
1187 IdAllocator::new(
1188 id_store,
1189 object_store::path::Path::from("id_allocator.json"),
1190 1000,
1191 )
1192 .await
1193 .map_err(UniError::Internal)?,
1194 );
1195
1196 let wal = if !self.config.wal_enabled {
1197 None
1199 } else if is_remote_uri && !is_hybrid {
1200 Some(Arc::new(WriteAheadLog::new(
1202 wal_store,
1203 object_store::path::Path::from("wal"),
1204 )))
1205 } else if is_hybrid || !is_remote_uri {
1206 Some(Arc::new(WriteAheadLog::new(
1209 wal_store,
1210 object_store::path::Path::from("wal"),
1211 )))
1212 } else {
1213 None
1214 };
1215
1216 let writer = Arc::new(RwLock::new(
1217 Writer::new_with_config(
1218 storage.clone(),
1219 schema_manager.clone(),
1220 start_version,
1221 self.config.clone(),
1222 wal,
1223 Some(allocator),
1224 )
1225 .await
1226 .map_err(UniError::Internal)?,
1227 ));
1228
1229 let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1230 .schema()
1231 .indexes
1232 .iter()
1233 .filter_map(|idx| {
1234 if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1235 cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1236 } else {
1237 None
1238 }
1239 })
1240 .collect();
1241
1242 if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1243 return Err(UniError::Internal(anyhow::anyhow!(
1244 "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1245 )));
1246 }
1247
1248 let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1249 for alias in &required_embed_aliases {
1250 let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1251 UniError::Internal(anyhow::anyhow!(
1252 "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1253 alias
1254 ))
1255 })?;
1256 if spec.task != ModelTask::Embed {
1257 return Err(UniError::Internal(anyhow::anyhow!(
1258 "Uni-Xervo alias '{}' must be an embed task",
1259 alias
1260 )));
1261 }
1262 }
1263
1264 let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1265 #[cfg(feature = "provider-candle")]
1266 {
1267 runtime_builder = runtime_builder
1268 .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1269 }
1270 #[cfg(feature = "provider-fastembed")]
1271 {
1272 runtime_builder = runtime_builder
1273 .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1274 }
1275 #[cfg(feature = "provider-openai")]
1276 {
1277 runtime_builder = runtime_builder
1278 .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1279 }
1280 #[cfg(feature = "provider-gemini")]
1281 {
1282 runtime_builder = runtime_builder
1283 .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1284 }
1285 #[cfg(feature = "provider-vertexai")]
1286 {
1287 runtime_builder = runtime_builder
1288 .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1289 }
1290 #[cfg(feature = "provider-mistral")]
1291 {
1292 runtime_builder = runtime_builder
1293 .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1294 }
1295 #[cfg(feature = "provider-anthropic")]
1296 {
1297 runtime_builder = runtime_builder
1298 .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1299 }
1300 #[cfg(feature = "provider-voyageai")]
1301 {
1302 runtime_builder = runtime_builder
1303 .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1304 }
1305 #[cfg(feature = "provider-cohere")]
1306 {
1307 runtime_builder = runtime_builder
1308 .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1309 }
1310 #[cfg(feature = "provider-azure-openai")]
1311 {
1312 runtime_builder = runtime_builder
1313 .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1314 }
1315 #[cfg(feature = "provider-mistralrs")]
1316 {
1317 runtime_builder = runtime_builder
1318 .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1319 }
1320
1321 Some(
1322 runtime_builder
1323 .build()
1324 .await
1325 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1326 )
1327 } else {
1328 None
1329 };
1330
1331 if let Some(ref runtime) = xervo_runtime {
1332 let mut writer_guard = writer.write().await;
1333 writer_guard.set_xervo_runtime(runtime.clone());
1334 }
1335
1336 {
1339 let w = writer.read().await;
1340 let replayed = w
1341 .replay_wal(wal_high_water_mark)
1342 .await
1343 .map_err(UniError::Internal)?;
1344 if replayed > 0 {
1345 info!("WAL recovery: replayed {} mutations", replayed);
1346 }
1347 }
1348
1349 if self.config.index_rebuild.auto_rebuild_enabled {
1351 let rebuild_manager = Arc::new(
1352 uni_store::storage::IndexRebuildManager::new(
1353 storage.clone(),
1354 schema_manager.clone(),
1355 self.config.index_rebuild.clone(),
1356 )
1357 .await
1358 .map_err(UniError::Internal)?,
1359 );
1360
1361 let handle = rebuild_manager
1362 .clone()
1363 .start_background_worker(shutdown_handle.subscribe());
1364 shutdown_handle.track_task(handle);
1365
1366 {
1367 let mut writer_guard = writer.write().await;
1368 writer_guard.set_index_rebuild_manager(rebuild_manager);
1369 }
1370 }
1371
1372 if let Some(interval) = self.config.auto_flush_interval {
1374 let writer_clone = writer.clone();
1375 let mut shutdown_rx = shutdown_handle.subscribe();
1376
1377 let handle = tokio::spawn(async move {
1378 let mut ticker = tokio::time::interval(interval);
1379 loop {
1380 tokio::select! {
1381 _ = ticker.tick() => {
1382 let mut w = writer_clone.write().await;
1383 if let Err(e) = w.check_flush().await {
1384 tracing::warn!("Background flush check failed: {}", e);
1385 }
1386 }
1387 _ = shutdown_rx.recv() => {
1388 tracing::info!("Auto-flush shutting down, performing final flush");
1389 let mut w = writer_clone.write().await;
1390 let _ = w.flush_to_l1(None).await;
1391 break;
1392 }
1393 }
1394 }
1395 });
1396
1397 shutdown_handle.track_task(handle);
1398 }
1399
1400 let (commit_tx, _) = tokio::sync::broadcast::channel(256);
1401 let writer_field = if self.read_only { None } else { Some(writer) };
1402
1403 Ok(Uni {
1404 inner: Arc::new(UniInner {
1405 storage,
1406 schema: schema_manager,
1407 properties: prop_manager,
1408 writer: writer_field,
1409 xervo_runtime,
1410 config: self.config,
1411 procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1412 shutdown_handle,
1413 locy_rule_registry: Arc::new(std::sync::RwLock::new(
1414 impl_locy::LocyRuleRegistry::default(),
1415 )),
1416 start_time: Instant::now(),
1417 commit_tx,
1418 write_lease: self.write_lease,
1419 active_session_count: AtomicUsize::new(0),
1420 total_queries: AtomicU64::new(0),
1421 total_commits: AtomicU64::new(0),
1422 custom_functions: Arc::new(std::sync::RwLock::new(
1423 uni_query::CustomFunctionRegistry::new(),
1424 )),
1425 cached_l0_mutation_count: AtomicUsize::new(0),
1426 cached_l0_estimated_size: AtomicUsize::new(0),
1427 cached_wal_lsn: AtomicU64::new(0),
1428 }),
1429 })
1430 }
1431
1432 pub fn build_sync(self) -> Result<Uni> {
1434 let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1435 rt.block_on(self.build())
1436 }
1437
1438 fn cloud_config_to_lancedb_storage_options(
1439 config: &CloudStorageConfig,
1440 ) -> std::collections::HashMap<String, String> {
1441 let mut opts = std::collections::HashMap::new();
1442
1443 match config {
1444 CloudStorageConfig::S3 {
1445 bucket,
1446 region,
1447 endpoint,
1448 access_key_id,
1449 secret_access_key,
1450 session_token,
1451 virtual_hosted_style,
1452 } => {
1453 opts.insert("bucket".to_string(), bucket.clone());
1454 opts.insert(
1455 "virtual_hosted_style_request".to_string(),
1456 virtual_hosted_style.to_string(),
1457 );
1458
1459 if let Some(r) = region {
1460 opts.insert("region".to_string(), r.clone());
1461 }
1462 if let Some(ep) = endpoint {
1463 opts.insert("endpoint".to_string(), ep.clone());
1464 if ep.starts_with("http://") {
1465 opts.insert("allow_http".to_string(), "true".to_string());
1466 }
1467 }
1468 if let Some(v) = access_key_id {
1469 opts.insert("access_key_id".to_string(), v.clone());
1470 }
1471 if let Some(v) = secret_access_key {
1472 opts.insert("secret_access_key".to_string(), v.clone());
1473 }
1474 if let Some(v) = session_token {
1475 opts.insert("session_token".to_string(), v.clone());
1476 }
1477 }
1478 CloudStorageConfig::Gcs {
1479 bucket,
1480 service_account_path,
1481 service_account_key,
1482 } => {
1483 opts.insert("bucket".to_string(), bucket.clone());
1484 if let Some(v) = service_account_path {
1485 opts.insert("service_account".to_string(), v.clone());
1486 opts.insert("application_credentials".to_string(), v.clone());
1487 }
1488 if let Some(v) = service_account_key {
1489 opts.insert("service_account_key".to_string(), v.clone());
1490 }
1491 }
1492 CloudStorageConfig::Azure {
1493 container,
1494 account,
1495 access_key,
1496 sas_token,
1497 } => {
1498 opts.insert("account_name".to_string(), account.clone());
1499 opts.insert("container_name".to_string(), container.clone());
1500 if let Some(v) = access_key {
1501 opts.insert("access_key".to_string(), v.clone());
1502 }
1503 if let Some(v) = sas_token {
1504 opts.insert("sas_token".to_string(), v.clone());
1505 }
1506 }
1507 }
1508
1509 opts
1510 }
1511}