1use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::time::Duration;
7
8pub mod builder;
9pub mod bulk;
10pub mod impl_locy;
11pub mod impl_query;
12pub mod locy_builder;
13pub mod query_builder;
14pub mod schema;
15pub mod session;
16pub mod sync;
17pub mod transaction;
18pub mod xervo;
19
20use object_store::ObjectStore;
21use object_store::local::LocalFileSystem;
22use tracing::info;
23use uni_common::core::snapshot::SnapshotManifest;
24use uni_common::{CloudStorageConfig, UniConfig};
25use uni_common::{Result, UniError};
26use uni_store::cloud::build_cloud_store;
27use uni_xervo::api::{ModelAliasSpec, ModelTask};
28use uni_xervo::runtime::ModelRuntime;
29
30use uni_common::core::schema::SchemaManager;
31use uni_store::runtime::id_allocator::IdAllocator;
32use uni_store::runtime::property_manager::PropertyManager;
33use uni_store::runtime::wal::WriteAheadLog;
34use uni_store::storage::manager::StorageManager;
35
36use tokio::sync::RwLock;
37use uni_store::runtime::writer::Writer;
38
39use crate::shutdown::ShutdownHandle;
40
41pub struct Uni {
82 pub(crate) storage: Arc<StorageManager>,
83 pub(crate) schema: Arc<SchemaManager>,
84 pub(crate) properties: Arc<PropertyManager>,
85 pub(crate) writer: Option<Arc<RwLock<Writer>>>,
86 pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
87 pub(crate) config: UniConfig,
88 pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
89 pub(crate) shutdown_handle: Arc<ShutdownHandle>,
90 pub(crate) locy_rule_registry: Arc<std::sync::RwLock<impl_locy::LocyRuleRegistry>>,
95}
96
97impl Uni {
98 pub fn open(uri: impl Into<String>) -> UniBuilder {
110 UniBuilder::new(uri.into())
111 }
112
113 pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
115 let mut builder = UniBuilder::new(uri.into());
116 builder.create_if_missing = false;
117 builder
118 }
119
120 pub fn create(uri: impl Into<String>) -> UniBuilder {
122 let mut builder = UniBuilder::new(uri.into());
123 builder.fail_if_exists = true;
124 builder
125 }
126
127 pub fn temporary() -> UniBuilder {
132 let temp_dir = std::env::temp_dir().join(format!("uni_mem_{}", uuid::Uuid::new_v4()));
133 UniBuilder::new(temp_dir.to_string_lossy().to_string())
134 }
135
136 pub fn in_memory() -> UniBuilder {
138 Self::temporary()
139 }
140
141 pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<Uni> {
146 let manifest = self
147 .storage
148 .snapshot_manager()
149 .load_snapshot(snapshot_id)
150 .await
151 .map_err(UniError::Internal)?;
152
153 let pinned_storage = Arc::new(self.storage.pinned(manifest));
154
155 let prop_manager = Arc::new(PropertyManager::new(
156 pinned_storage.clone(),
157 self.schema.clone(),
158 self.properties.cache_size(),
159 ));
160
161 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
164
165 Ok(Uni {
166 storage: pinned_storage,
167 schema: self.schema.clone(),
168 properties: prop_manager,
169 writer: None,
170 xervo_runtime: self.xervo_runtime.clone(),
171 config: self.config.clone(),
172 procedure_registry: self.procedure_registry.clone(),
173 shutdown_handle,
174 locy_rule_registry: Arc::new(std::sync::RwLock::new(
175 impl_locy::LocyRuleRegistry::default(),
176 )),
177 })
178 }
179
180 pub fn config(&self) -> &UniConfig {
182 &self.config
183 }
184
185 pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
187 &self.procedure_registry
188 }
189
190 pub fn get_schema(&self) -> Arc<uni_common::core::schema::Schema> {
192 self.schema.schema()
193 }
194
195 pub fn bulk_writer(&self) -> bulk::BulkWriterBuilder<'_> {
197 bulk::BulkWriterBuilder::new(self)
198 }
199
200 pub fn session(&self) -> session::SessionBuilder<'_> {
202 session::SessionBuilder::new(self)
203 }
204
205 #[doc(hidden)]
207 pub fn schema_manager(&self) -> Arc<SchemaManager> {
208 self.schema.clone()
209 }
210
211 #[doc(hidden)]
212 pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
213 self.writer.clone()
214 }
215
216 #[doc(hidden)]
217 pub fn storage(&self) -> Arc<StorageManager> {
218 self.storage.clone()
219 }
220
221 pub async fn flush(&self) -> Result<()> {
226 if let Some(writer_lock) = &self.writer {
227 let mut writer = writer_lock.write().await;
228 writer
229 .flush_to_l1(None)
230 .await
231 .map(|_| ())
232 .map_err(UniError::Internal)
233 } else {
234 Err(UniError::ReadOnly {
235 operation: "flush".to_string(),
236 })
237 }
238 }
239
240 pub async fn create_snapshot(&self, name: Option<&str>) -> Result<String> {
245 if let Some(writer_lock) = &self.writer {
246 let mut writer = writer_lock.write().await;
247 writer
248 .flush_to_l1(name.map(|s| s.to_string()))
249 .await
250 .map_err(UniError::Internal)
251 } else {
252 Err(UniError::ReadOnly {
253 operation: "create_snapshot".to_string(),
254 })
255 }
256 }
257
258 pub async fn create_named_snapshot(&self, name: &str) -> Result<String> {
260 if name.is_empty() {
261 return Err(UniError::Internal(anyhow::anyhow!(
262 "Snapshot name cannot be empty"
263 )));
264 }
265
266 let snapshot_id = self.create_snapshot(Some(name)).await?;
267
268 self.storage
269 .snapshot_manager()
270 .save_named_snapshot(name, &snapshot_id)
271 .await
272 .map_err(UniError::Internal)?;
273
274 Ok(snapshot_id)
275 }
276
277 pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
279 let sm = self.storage.snapshot_manager();
280 let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
281 let mut manifests = Vec::new();
282 for id in ids {
283 if let Ok(m) = sm.load_snapshot(&id).await {
284 manifests.push(m);
285 }
286 }
287 Ok(manifests)
288 }
289
290 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
295 self.storage
296 .snapshot_manager()
297 .set_latest_snapshot(snapshot_id)
298 .await
299 .map_err(UniError::Internal)
300 }
301
302 pub async fn label_exists(&self, name: &str) -> Result<bool> {
304 Ok(self.schema.schema().labels.get(name).is_some_and(|l| {
305 matches!(
306 l.state,
307 uni_common::core::schema::SchemaElementState::Active
308 )
309 }))
310 }
311
312 pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
314 Ok(self.schema.schema().edge_types.get(name).is_some_and(|e| {
315 matches!(
316 e.state,
317 uni_common::core::schema::SchemaElementState::Active
318 )
319 }))
320 }
321
322 pub async fn list_labels(&self) -> Result<Vec<String>> {
328 let mut all_labels = std::collections::HashSet::new();
329
330 for (name, label) in self.schema.schema().labels.iter() {
332 if matches!(
333 label.state,
334 uni_common::core::schema::SchemaElementState::Active
335 ) {
336 all_labels.insert(name.clone());
337 }
338 }
339
340 let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
342 let result = self.query(query).await?;
343 for row in &result.rows {
344 if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
345 for label in labels_list {
346 all_labels.insert(label);
347 }
348 }
349 }
350
351 Ok(all_labels.into_iter().collect())
352 }
353
354 pub async fn list_edge_types(&self) -> Result<Vec<String>> {
356 Ok(self
357 .schema
358 .schema()
359 .edge_types
360 .iter()
361 .filter(|(_, e)| {
362 matches!(
363 e.state,
364 uni_common::core::schema::SchemaElementState::Active
365 )
366 })
367 .map(|(name, _)| name.clone())
368 .collect())
369 }
370
371 pub async fn get_label_info(
373 &self,
374 name: &str,
375 ) -> Result<Option<crate::api::schema::LabelInfo>> {
376 let schema = self.schema.schema();
377 if schema.labels.contains_key(name) {
378 let count = if let Ok(ds) = self.storage.vertex_dataset(name) {
379 if let Ok(raw) = ds.open_raw().await {
380 raw.count_rows(None)
381 .await
382 .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
383 } else {
384 0
385 }
386 } else {
387 0
388 };
389
390 let mut properties = Vec::new();
391 if let Some(props) = schema.properties.get(name) {
392 for (prop_name, prop_meta) in props {
393 let is_indexed = schema.indexes.iter().any(|idx| match idx {
394 uni_common::core::schema::IndexDefinition::Vector(v) => {
395 v.label == name && v.property == *prop_name
396 }
397 uni_common::core::schema::IndexDefinition::Scalar(s) => {
398 s.label == name && s.properties.contains(prop_name)
399 }
400 uni_common::core::schema::IndexDefinition::FullText(f) => {
401 f.label == name && f.properties.contains(prop_name)
402 }
403 uni_common::core::schema::IndexDefinition::Inverted(inv) => {
404 inv.label == name && inv.property == *prop_name
405 }
406 uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
407 j.label == name
408 }
409 _ => false,
410 });
411
412 properties.push(crate::api::schema::PropertyInfo {
413 name: prop_name.clone(),
414 data_type: format!("{:?}", prop_meta.r#type),
415 nullable: prop_meta.nullable,
416 is_indexed,
417 });
418 }
419 }
420
421 let mut indexes = Vec::new();
422 for idx in schema.indexes.iter().filter(|i| i.label() == name) {
423 use uni_common::core::schema::IndexDefinition;
424 let (idx_type, idx_props) = match idx {
425 IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
426 IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
427 IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
428 IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
429 IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
430 _ => continue,
431 };
432
433 indexes.push(crate::api::schema::IndexInfo {
434 name: idx.name().to_string(),
435 index_type: idx_type.to_string(),
436 properties: idx_props,
437 status: "ONLINE".to_string(), });
439 }
440
441 let mut constraints = Vec::new();
442 for c in &schema.constraints {
443 if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
444 && l == name
445 {
446 let (ctype, cprops) = match &c.constraint_type {
447 uni_common::core::schema::ConstraintType::Unique { properties } => {
448 ("UNIQUE", properties.clone())
449 }
450 uni_common::core::schema::ConstraintType::Exists { property } => {
451 ("EXISTS", vec![property.clone()])
452 }
453 uni_common::core::schema::ConstraintType::Check { expression } => {
454 ("CHECK", vec![expression.clone()])
455 }
456 _ => ("UNKNOWN", vec![]),
457 };
458
459 constraints.push(crate::api::schema::ConstraintInfo {
460 name: c.name.clone(),
461 constraint_type: ctype.to_string(),
462 properties: cprops,
463 enabled: c.enabled,
464 });
465 }
466 }
467
468 Ok(Some(crate::api::schema::LabelInfo {
469 name: name.to_string(),
470 count,
471 properties,
472 indexes,
473 constraints,
474 }))
475 } else {
476 Ok(None)
477 }
478 }
479
480 pub async fn compact_label(
484 &self,
485 label: &str,
486 ) -> Result<uni_store::compaction::CompactionStats> {
487 self.storage
488 .compact_label(label)
489 .await
490 .map_err(UniError::Internal)
491 }
492
493 pub async fn compact_edge_type(
495 &self,
496 edge_type: &str,
497 ) -> Result<uni_store::compaction::CompactionStats> {
498 self.storage
499 .compact_edge_type(edge_type)
500 .await
501 .map_err(UniError::Internal)
502 }
503
504 pub async fn wait_for_compaction(&self) -> Result<()> {
508 self.storage
509 .wait_for_compaction()
510 .await
511 .map_err(UniError::Internal)
512 }
513
514 pub async fn bulk_insert_vertices(
521 &self,
522 label: &str,
523 properties_list: Vec<uni_common::Properties>,
524 ) -> Result<Vec<uni_common::core::id::Vid>> {
525 let schema = self.schema.schema();
526 schema
528 .labels
529 .get(label)
530 .ok_or_else(|| UniError::LabelNotFound {
531 label: label.to_string(),
532 })?;
533 if let Some(writer_lock) = &self.writer {
534 let mut writer = writer_lock.write().await;
535
536 if properties_list.is_empty() {
537 return Ok(Vec::new());
538 }
539
540 let vids = writer
542 .allocate_vids(properties_list.len())
543 .await
544 .map_err(UniError::Internal)?;
545
546 let _props = writer
548 .insert_vertices_batch(vids.clone(), properties_list, vec![label.to_string()])
549 .await
550 .map_err(UniError::Internal)?;
551
552 Ok(vids)
553 } else {
554 Err(UniError::ReadOnly {
555 operation: "bulk_insert_vertices".to_string(),
556 })
557 }
558 }
559
560 pub async fn bulk_insert_edges(
565 &self,
566 edge_type: &str,
567 edges: Vec<(
568 uni_common::core::id::Vid,
569 uni_common::core::id::Vid,
570 uni_common::Properties,
571 )>,
572 ) -> Result<()> {
573 let schema = self.schema.schema();
574 let edge_meta =
575 schema
576 .edge_types
577 .get(edge_type)
578 .ok_or_else(|| UniError::EdgeTypeNotFound {
579 edge_type: edge_type.to_string(),
580 })?;
581 let type_id = edge_meta.id;
582
583 if let Some(writer_lock) = &self.writer {
584 let mut writer = writer_lock.write().await;
585
586 for (src_vid, dst_vid, props) in edges {
587 let eid = writer.next_eid(type_id).await.map_err(UniError::Internal)?;
588 writer
589 .insert_edge(
590 src_vid,
591 dst_vid,
592 type_id,
593 eid,
594 props,
595 Some(edge_type.to_string()),
596 )
597 .await
598 .map_err(UniError::Internal)?;
599 }
600
601 Ok(())
602 } else {
603 Err(UniError::ReadOnly {
604 operation: "bulk_insert_edges".to_string(),
605 })
606 }
607 }
608
609 pub async fn index_rebuild_status(&self) -> Result<Vec<uni_store::storage::IndexRebuildTask>> {
624 let manager = uni_store::storage::IndexRebuildManager::new(
625 self.storage.clone(),
626 self.schema.clone(),
627 self.config.index_rebuild.clone(),
628 )
629 .await
630 .map_err(UniError::Internal)?;
631
632 Ok(manager.status())
633 }
634
635 pub async fn retry_index_rebuilds(&self) -> Result<Vec<String>> {
645 let manager = uni_store::storage::IndexRebuildManager::new(
646 self.storage.clone(),
647 self.schema.clone(),
648 self.config.index_rebuild.clone(),
649 )
650 .await
651 .map_err(UniError::Internal)?;
652
653 let retried = manager.retry_failed().await.map_err(UniError::Internal)?;
654
655 if !retried.is_empty() {
657 let manager = std::sync::Arc::new(manager);
658 let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
659 self.shutdown_handle.track_task(handle);
660 }
661
662 Ok(retried)
663 }
664
665 pub async fn rebuild_indexes(&self, label: &str, async_: bool) -> Result<Option<String>> {
677 if async_ {
678 let manager = uni_store::storage::IndexRebuildManager::new(
679 self.storage.clone(),
680 self.schema.clone(),
681 self.config.index_rebuild.clone(),
682 )
683 .await
684 .map_err(UniError::Internal)?;
685
686 let task_ids = manager
687 .schedule(vec![label.to_string()])
688 .await
689 .map_err(UniError::Internal)?;
690
691 let manager = std::sync::Arc::new(manager);
692 let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
693 self.shutdown_handle.track_task(handle);
694
695 Ok(task_ids.into_iter().next())
696 } else {
697 let idx_mgr = uni_store::storage::IndexManager::new(
698 self.storage.base_path(),
699 self.schema.clone(),
700 self.storage.lancedb_store_arc(),
701 );
702 idx_mgr
703 .rebuild_indexes_for_label(label)
704 .await
705 .map_err(UniError::Internal)?;
706 Ok(None)
707 }
708 }
709
710 pub async fn is_index_building(&self, label: &str) -> Result<bool> {
715 let manager = uni_store::storage::IndexRebuildManager::new(
716 self.storage.clone(),
717 self.schema.clone(),
718 self.config.index_rebuild.clone(),
719 )
720 .await
721 .map_err(UniError::Internal)?;
722
723 Ok(manager.is_index_building(label))
724 }
725
726 pub fn list_indexes(&self, label: &str) -> Vec<uni_common::core::schema::IndexDefinition> {
728 let schema = self.schema.schema();
729 schema
730 .indexes
731 .iter()
732 .filter(|i| i.label() == label)
733 .cloned()
734 .collect()
735 }
736
737 pub fn list_all_indexes(&self) -> Vec<uni_common::core::schema::IndexDefinition> {
739 self.schema.schema().indexes.clone()
740 }
741
742 pub async fn shutdown(self) -> Result<()> {
747 if let Some(ref writer) = self.writer {
749 let mut w = writer.write().await;
750 if let Err(e) = w.flush_to_l1(None).await {
751 tracing::error!("Error flushing during shutdown: {}", e);
752 }
753 }
754
755 self.shutdown_handle
756 .shutdown_async()
757 .await
758 .map_err(UniError::Internal)
759 }
760}
761
762impl Drop for Uni {
763 fn drop(&mut self) {
764 self.shutdown_handle.shutdown_blocking();
765 tracing::debug!("Uni dropped, shutdown signal sent");
766 }
767}
768
769#[must_use = "builders do nothing until .build() is called"]
771pub struct UniBuilder {
772 uri: String,
773 config: UniConfig,
774 schema_file: Option<PathBuf>,
775 xervo_catalog: Option<Vec<ModelAliasSpec>>,
776 hybrid_remote_url: Option<String>,
777 cloud_config: Option<CloudStorageConfig>,
778 create_if_missing: bool,
779 fail_if_exists: bool,
780}
781
782impl UniBuilder {
783 pub fn new(uri: String) -> Self {
785 Self {
786 uri,
787 config: UniConfig::default(),
788 schema_file: None,
789 xervo_catalog: None,
790 hybrid_remote_url: None,
791 cloud_config: None,
792 create_if_missing: true,
793 fail_if_exists: false,
794 }
795 }
796
797 pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
799 self.schema_file = Some(path.as_ref().to_path_buf());
800 self
801 }
802
803 pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
805 self.xervo_catalog = Some(catalog);
806 self
807 }
808
809 pub fn xervo_catalog_from_str(mut self, json: &str) -> Result<Self> {
811 let catalog = uni_xervo::api::catalog_from_str(json)
812 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
813 self.xervo_catalog = Some(catalog);
814 Ok(self)
815 }
816
817 pub fn xervo_catalog_from_file(mut self, path: impl AsRef<Path>) -> Result<Self> {
819 let catalog = uni_xervo::api::catalog_from_file(path)
820 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
821 self.xervo_catalog = Some(catalog);
822 Ok(self)
823 }
824
825 pub fn hybrid(mut self, local_path: impl AsRef<Path>, remote_url: &str) -> Self {
839 self.uri = local_path.as_ref().to_string_lossy().to_string();
840 self.hybrid_remote_url = Some(remote_url.to_string());
841 self
842 }
843
844 pub fn cloud_config(mut self, config: CloudStorageConfig) -> Self {
871 self.cloud_config = Some(config);
872 self
873 }
874
875 pub fn config(mut self, config: UniConfig) -> Self {
877 self.config = config;
878 self
879 }
880
881 pub fn cache_size(mut self, bytes: usize) -> Self {
883 self.config.cache_size = bytes;
884 self
885 }
886
887 pub fn parallelism(mut self, n: usize) -> Self {
889 self.config.parallelism = n;
890 self
891 }
892
893 pub async fn build(self) -> Result<Uni> {
895 let uri = self.uri.clone();
896 let is_remote_uri = uri.contains("://");
897 let is_hybrid = self.hybrid_remote_url.is_some();
898
899 if is_hybrid && is_remote_uri {
900 return Err(UniError::Internal(anyhow::anyhow!(
901 "Hybrid mode requires a local path as primary URI, found: {}",
902 uri
903 )));
904 }
905
906 let (storage_uri, data_store, local_store_opt) = if is_hybrid {
907 let remote_url = self.hybrid_remote_url.as_ref().unwrap();
908
909 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
911 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
912 } else {
913 let url = url::Url::parse(remote_url).map_err(|e| {
914 UniError::Io(std::io::Error::new(
915 std::io::ErrorKind::InvalidInput,
916 e.to_string(),
917 ))
918 })?;
919 let (os, _path) =
920 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
921 Arc::from(os)
922 };
923
924 let path = PathBuf::from(&uri);
926 if path.exists() {
927 if self.fail_if_exists {
928 return Err(UniError::Internal(anyhow::anyhow!(
929 "Database already exists at {}",
930 uri
931 )));
932 }
933 } else {
934 if !self.create_if_missing {
935 return Err(UniError::NotFound { path: path.clone() });
936 }
937 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
938 }
939
940 let local_store = Arc::new(
941 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
942 );
943
944 (
947 remote_url.clone(),
948 remote_store,
949 Some(local_store as Arc<dyn ObjectStore>),
950 )
951 } else if is_remote_uri {
952 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
954 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
955 } else {
956 let url = url::Url::parse(&uri).map_err(|e| {
957 UniError::Io(std::io::Error::new(
958 std::io::ErrorKind::InvalidInput,
959 e.to_string(),
960 ))
961 })?;
962 let (os, _path) =
963 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
964 Arc::from(os)
965 };
966
967 (uri.clone(), remote_store, None)
968 } else {
969 let path = PathBuf::from(&uri);
971 let storage_path = path.join("storage");
972
973 if path.exists() {
974 if self.fail_if_exists {
975 return Err(UniError::Internal(anyhow::anyhow!(
976 "Database already exists at {}",
977 uri
978 )));
979 }
980 } else {
981 if !self.create_if_missing {
982 return Err(UniError::NotFound { path: path.clone() });
983 }
984 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
985 }
986
987 if !storage_path.exists() {
989 std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
990 }
991
992 let store = Arc::new(
993 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
994 );
995 (
996 storage_path.to_string_lossy().to_string(),
997 store.clone() as Arc<dyn ObjectStore>,
998 Some(store as Arc<dyn ObjectStore>),
999 )
1000 };
1001
1002 let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
1004 let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
1006
1007 let has_catalog_schema = match data_store.get(&schema_obj_path).await {
1011 Ok(_) => true,
1012 Err(object_store::Error::NotFound { .. }) => false,
1013 Err(e) => return Err(UniError::Internal(e.into())),
1014 };
1015 if !has_catalog_schema {
1016 match data_store.get(&legacy_schema_obj_path).await {
1017 Ok(result) => {
1018 let bytes = result
1019 .bytes()
1020 .await
1021 .map_err(|e| UniError::Internal(e.into()))?;
1022 data_store
1023 .put(&schema_obj_path, bytes.into())
1024 .await
1025 .map_err(|e| UniError::Internal(e.into()))?;
1026 info!(
1027 legacy = %legacy_schema_obj_path,
1028 target = %schema_obj_path,
1029 "Migrated legacy schema path to catalog path"
1030 );
1031 }
1032 Err(object_store::Error::NotFound { .. }) => {}
1033 Err(e) => return Err(UniError::Internal(e.into())),
1034 }
1035 }
1036
1037 let schema_manager = Arc::new(
1040 SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1041 .await
1042 .map_err(UniError::Internal)?,
1043 );
1044
1045 let lancedb_storage_options = self
1046 .cloud_config
1047 .as_ref()
1048 .map(Self::cloud_config_to_lancedb_storage_options);
1049
1050 let storage = if is_hybrid || is_remote_uri {
1051 StorageManager::new_with_store_and_storage_options(
1054 &storage_uri,
1055 data_store.clone(),
1056 schema_manager.clone(),
1057 self.config.clone(),
1058 lancedb_storage_options.clone(),
1059 )
1060 .await
1061 .map_err(UniError::Internal)?
1062 } else {
1063 StorageManager::new_with_config(
1065 &storage_uri,
1066 schema_manager.clone(),
1067 self.config.clone(),
1068 )
1069 .await
1070 .map_err(UniError::Internal)?
1071 };
1072
1073 let storage = Arc::new(storage);
1074
1075 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1077
1078 let compaction_handle = storage
1080 .clone()
1081 .start_background_compaction(shutdown_handle.subscribe());
1082 shutdown_handle.track_task(compaction_handle);
1083
1084 let prop_cache_capacity = self.config.cache_size / 1024;
1086
1087 let prop_manager = Arc::new(PropertyManager::new(
1088 storage.clone(),
1089 schema_manager.clone(),
1090 prop_cache_capacity,
1091 ));
1092
1093 let id_store = local_store_opt
1095 .clone()
1096 .unwrap_or_else(|| data_store.clone());
1097 let wal_store = local_store_opt
1098 .clone()
1099 .unwrap_or_else(|| data_store.clone());
1100
1101 let latest_snapshot = storage
1104 .snapshot_manager()
1105 .load_latest_snapshot()
1106 .await
1107 .map_err(UniError::Internal)?;
1108
1109 let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1110 (
1111 snapshot.version_high_water_mark + 1,
1112 snapshot.wal_high_water_mark,
1113 )
1114 } else {
1115 let has_manifests = storage
1117 .snapshot_manager()
1118 .has_any_manifests()
1119 .await
1120 .unwrap_or(false);
1121
1122 let wal_check =
1123 WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1124 let has_wal = wal_check.has_segments().await.unwrap_or(false);
1125
1126 if has_manifests {
1127 let snapshot_ids = storage
1129 .snapshot_manager()
1130 .list_snapshots()
1131 .await
1132 .map_err(UniError::Internal)?;
1133 if let Some(last_id) = snapshot_ids.last() {
1134 let manifest = storage
1135 .snapshot_manager()
1136 .load_snapshot(last_id)
1137 .await
1138 .map_err(UniError::Internal)?;
1139 tracing::warn!(
1140 "Latest snapshot pointer missing but found manifest '{}'. \
1141 Recovering version {}.",
1142 last_id,
1143 manifest.version_high_water_mark
1144 );
1145 (
1146 manifest.version_high_water_mark + 1,
1147 manifest.wal_high_water_mark,
1148 )
1149 } else {
1150 return Err(UniError::Internal(anyhow::anyhow!(
1151 "Snapshot manifests directory exists but contains no valid manifests. \
1152 Possible data corruption."
1153 )));
1154 }
1155 } else if has_wal {
1156 return Err(UniError::Internal(anyhow::anyhow!(
1158 "Database has WAL segments but no snapshot manifest. \
1159 Cannot safely determine version counter -- starting at 0 would cause \
1160 version conflicts and data corruption. \
1161 Restore the snapshot manifest or delete WAL to start fresh."
1162 )));
1163 } else {
1164 (0, 0)
1166 }
1167 };
1168
1169 let allocator = Arc::new(
1170 IdAllocator::new(
1171 id_store,
1172 object_store::path::Path::from("id_allocator.json"),
1173 1000,
1174 )
1175 .await
1176 .map_err(UniError::Internal)?,
1177 );
1178
1179 let wal = if !self.config.wal_enabled {
1180 None
1182 } else if is_remote_uri && !is_hybrid {
1183 Some(Arc::new(WriteAheadLog::new(
1185 wal_store,
1186 object_store::path::Path::from("wal"),
1187 )))
1188 } else if is_hybrid || !is_remote_uri {
1189 Some(Arc::new(WriteAheadLog::new(
1192 wal_store,
1193 object_store::path::Path::from("wal"),
1194 )))
1195 } else {
1196 None
1197 };
1198
1199 let writer = Arc::new(RwLock::new(
1200 Writer::new_with_config(
1201 storage.clone(),
1202 schema_manager.clone(),
1203 start_version,
1204 self.config.clone(),
1205 wal,
1206 Some(allocator),
1207 )
1208 .await
1209 .map_err(UniError::Internal)?,
1210 ));
1211
1212 let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1213 .schema()
1214 .indexes
1215 .iter()
1216 .filter_map(|idx| {
1217 if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1218 cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1219 } else {
1220 None
1221 }
1222 })
1223 .collect();
1224
1225 if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1226 return Err(UniError::Internal(anyhow::anyhow!(
1227 "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1228 )));
1229 }
1230
1231 let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1232 for alias in &required_embed_aliases {
1233 let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1234 UniError::Internal(anyhow::anyhow!(
1235 "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1236 alias
1237 ))
1238 })?;
1239 if spec.task != ModelTask::Embed {
1240 return Err(UniError::Internal(anyhow::anyhow!(
1241 "Uni-Xervo alias '{}' must be an embed task",
1242 alias
1243 )));
1244 }
1245 }
1246
1247 let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1248 #[cfg(feature = "provider-candle")]
1249 {
1250 runtime_builder = runtime_builder
1251 .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1252 }
1253 #[cfg(feature = "provider-fastembed")]
1254 {
1255 runtime_builder = runtime_builder
1256 .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1257 }
1258 #[cfg(feature = "provider-openai")]
1259 {
1260 runtime_builder = runtime_builder
1261 .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1262 }
1263 #[cfg(feature = "provider-gemini")]
1264 {
1265 runtime_builder = runtime_builder
1266 .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1267 }
1268 #[cfg(feature = "provider-vertexai")]
1269 {
1270 runtime_builder = runtime_builder
1271 .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1272 }
1273 #[cfg(feature = "provider-mistral")]
1274 {
1275 runtime_builder = runtime_builder
1276 .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1277 }
1278 #[cfg(feature = "provider-anthropic")]
1279 {
1280 runtime_builder = runtime_builder
1281 .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1282 }
1283 #[cfg(feature = "provider-voyageai")]
1284 {
1285 runtime_builder = runtime_builder
1286 .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1287 }
1288 #[cfg(feature = "provider-cohere")]
1289 {
1290 runtime_builder = runtime_builder
1291 .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1292 }
1293 #[cfg(feature = "provider-azure-openai")]
1294 {
1295 runtime_builder = runtime_builder
1296 .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1297 }
1298 #[cfg(feature = "provider-mistralrs")]
1299 {
1300 runtime_builder = runtime_builder
1301 .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1302 }
1303
1304 Some(
1305 runtime_builder
1306 .build()
1307 .await
1308 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1309 )
1310 } else {
1311 None
1312 };
1313
1314 if let Some(ref runtime) = xervo_runtime {
1315 let mut writer_guard = writer.write().await;
1316 writer_guard.set_xervo_runtime(runtime.clone());
1317 }
1318
1319 {
1322 let w = writer.read().await;
1323 let replayed = w
1324 .replay_wal(wal_high_water_mark)
1325 .await
1326 .map_err(UniError::Internal)?;
1327 if replayed > 0 {
1328 info!("WAL recovery: replayed {} mutations", replayed);
1329 }
1330 }
1331
1332 if self.config.index_rebuild.auto_rebuild_enabled {
1334 let rebuild_manager = Arc::new(
1335 uni_store::storage::IndexRebuildManager::new(
1336 storage.clone(),
1337 schema_manager.clone(),
1338 self.config.index_rebuild.clone(),
1339 )
1340 .await
1341 .map_err(UniError::Internal)?,
1342 );
1343
1344 let handle = rebuild_manager
1345 .clone()
1346 .start_background_worker(shutdown_handle.subscribe());
1347 shutdown_handle.track_task(handle);
1348
1349 {
1350 let mut writer_guard = writer.write().await;
1351 writer_guard.set_index_rebuild_manager(rebuild_manager);
1352 }
1353 }
1354
1355 if let Some(interval) = self.config.auto_flush_interval {
1357 let writer_clone = writer.clone();
1358 let mut shutdown_rx = shutdown_handle.subscribe();
1359
1360 let handle = tokio::spawn(async move {
1361 let mut ticker = tokio::time::interval(interval);
1362 loop {
1363 tokio::select! {
1364 _ = ticker.tick() => {
1365 let mut w = writer_clone.write().await;
1366 if let Err(e) = w.check_flush().await {
1367 tracing::warn!("Background flush check failed: {}", e);
1368 }
1369 }
1370 _ = shutdown_rx.recv() => {
1371 tracing::info!("Auto-flush shutting down, performing final flush");
1372 let mut w = writer_clone.write().await;
1373 let _ = w.flush_to_l1(None).await;
1374 break;
1375 }
1376 }
1377 }
1378 });
1379
1380 shutdown_handle.track_task(handle);
1381 }
1382
1383 Ok(Uni {
1384 storage,
1385 schema: schema_manager,
1386 properties: prop_manager,
1387 writer: Some(writer),
1388 xervo_runtime,
1389 config: self.config,
1390 procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1391 shutdown_handle,
1392 locy_rule_registry: Arc::new(std::sync::RwLock::new(
1393 impl_locy::LocyRuleRegistry::default(),
1394 )),
1395 })
1396 }
1397
1398 pub fn build_sync(self) -> Result<Uni> {
1400 let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1401 rt.block_on(self.build())
1402 }
1403
1404 fn cloud_config_to_lancedb_storage_options(
1405 config: &CloudStorageConfig,
1406 ) -> std::collections::HashMap<String, String> {
1407 let mut opts = std::collections::HashMap::new();
1408
1409 match config {
1410 CloudStorageConfig::S3 {
1411 bucket,
1412 region,
1413 endpoint,
1414 access_key_id,
1415 secret_access_key,
1416 session_token,
1417 virtual_hosted_style,
1418 } => {
1419 opts.insert("bucket".to_string(), bucket.clone());
1420 opts.insert(
1421 "virtual_hosted_style_request".to_string(),
1422 virtual_hosted_style.to_string(),
1423 );
1424
1425 if let Some(r) = region {
1426 opts.insert("region".to_string(), r.clone());
1427 }
1428 if let Some(ep) = endpoint {
1429 opts.insert("endpoint".to_string(), ep.clone());
1430 if ep.starts_with("http://") {
1431 opts.insert("allow_http".to_string(), "true".to_string());
1432 }
1433 }
1434 if let Some(v) = access_key_id {
1435 opts.insert("access_key_id".to_string(), v.clone());
1436 }
1437 if let Some(v) = secret_access_key {
1438 opts.insert("secret_access_key".to_string(), v.clone());
1439 }
1440 if let Some(v) = session_token {
1441 opts.insert("session_token".to_string(), v.clone());
1442 }
1443 }
1444 CloudStorageConfig::Gcs {
1445 bucket,
1446 service_account_path,
1447 service_account_key,
1448 } => {
1449 opts.insert("bucket".to_string(), bucket.clone());
1450 if let Some(v) = service_account_path {
1451 opts.insert("service_account".to_string(), v.clone());
1452 opts.insert("application_credentials".to_string(), v.clone());
1453 }
1454 if let Some(v) = service_account_key {
1455 opts.insert("service_account_key".to_string(), v.clone());
1456 }
1457 }
1458 CloudStorageConfig::Azure {
1459 container,
1460 account,
1461 access_key,
1462 sas_token,
1463 } => {
1464 opts.insert("account_name".to_string(), account.clone());
1465 opts.insert("container_name".to_string(), container.clone());
1466 if let Some(v) = access_key {
1467 opts.insert("access_key".to_string(), v.clone());
1468 }
1469 if let Some(v) = sas_token {
1470 opts.insert("sas_token".to_string(), v.clone());
1471 }
1472 }
1473 }
1474
1475 opts
1476 }
1477}