1use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::time::Duration;
7
8pub mod builder;
9pub mod bulk;
10pub mod impl_locy;
11pub mod impl_query;
12pub mod query_builder;
13pub mod schema;
14pub mod session;
15pub mod sync;
16pub mod transaction;
17pub mod xervo;
18
19use object_store::ObjectStore;
20use object_store::local::LocalFileSystem;
21use tracing::info;
22use uni_common::core::snapshot::SnapshotManifest;
23use uni_common::{CloudStorageConfig, UniConfig};
24use uni_common::{Result, UniError};
25use uni_store::cloud::build_cloud_store;
26use uni_xervo::api::{ModelAliasSpec, ModelTask};
27use uni_xervo::runtime::ModelRuntime;
28
29use uni_common::core::schema::SchemaManager;
30use uni_store::runtime::id_allocator::IdAllocator;
31use uni_store::runtime::property_manager::PropertyManager;
32use uni_store::runtime::wal::WriteAheadLog;
33use uni_store::storage::manager::StorageManager;
34
35use tokio::sync::RwLock;
36use uni_store::runtime::writer::Writer;
37
38use crate::shutdown::ShutdownHandle;
39
/// Handle to an open Uni database.
///
/// Bundles the storage manager, schema manager, property manager, the shared
/// writer (for writable handles), the optional Uni-Xervo model runtime, the
/// configuration, the procedure registry and the shutdown coordinator used by
/// background tasks. Handles returned by `at_snapshot` have `writer == None` and
/// are therefore read-only.
pub struct Uni {
    /// Storage backend; may be pinned to a snapshot manifest (see `at_snapshot`).
    pub(crate) storage: Arc<StorageManager>,
    /// Schema catalog shared with any snapshot handles derived from this one.
    pub(crate) schema: Arc<SchemaManager>,
    /// Property manager (with cache) bound to `storage`.
    pub(crate) properties: Arc<PropertyManager>,
    /// Shared writer; `None` for read-only handles, in which case mutating methods
    /// return `UniError::ReadOnly`.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    /// Uni-Xervo model runtime, present only when a catalog was configured.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Configuration this handle was opened with.
    pub(crate) config: UniConfig,
    /// Procedure registry (shared with handles derived via `at_snapshot`).
    pub(crate) procedure_registry: Arc<uni_query::ProcedureRegistry>,
    /// Tracks background tasks (compaction, auto-flush, index rebuild workers);
    /// signalled by `shutdown()` and on drop.
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
}
90
91impl Uni {
92 pub fn open(uri: impl Into<String>) -> UniBuilder {
104 UniBuilder::new(uri.into())
105 }
106
107 pub fn open_existing(uri: impl Into<String>) -> UniBuilder {
109 let mut builder = UniBuilder::new(uri.into());
110 builder.create_if_missing = false;
111 builder
112 }
113
114 pub fn create(uri: impl Into<String>) -> UniBuilder {
116 let mut builder = UniBuilder::new(uri.into());
117 builder.fail_if_exists = true;
118 builder
119 }
120
121 pub fn temporary() -> UniBuilder {
126 let temp_dir = std::env::temp_dir().join(format!("uni_mem_{}", uuid::Uuid::new_v4()));
127 UniBuilder::new(temp_dir.to_string_lossy().to_string())
128 }
129
130 pub fn in_memory() -> UniBuilder {
132 Self::temporary()
133 }
134
135 pub(crate) async fn at_snapshot(&self, snapshot_id: &str) -> Result<Uni> {
140 let manifest = self
141 .storage
142 .snapshot_manager()
143 .load_snapshot(snapshot_id)
144 .await
145 .map_err(UniError::Internal)?;
146
147 let pinned_storage = Arc::new(self.storage.pinned(manifest));
148
149 let prop_manager = Arc::new(PropertyManager::new(
150 pinned_storage.clone(),
151 self.schema.clone(),
152 self.properties.cache_size(),
153 ));
154
155 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
158
159 Ok(Uni {
160 storage: pinned_storage,
161 schema: self.schema.clone(),
162 properties: prop_manager,
163 writer: None,
164 xervo_runtime: self.xervo_runtime.clone(),
165 config: self.config.clone(),
166 procedure_registry: self.procedure_registry.clone(),
167 shutdown_handle,
168 })
169 }
170
171 pub fn config(&self) -> &UniConfig {
173 &self.config
174 }
175
176 pub fn procedure_registry(&self) -> &Arc<uni_query::ProcedureRegistry> {
178 &self.procedure_registry
179 }
180
181 pub fn get_schema(&self) -> Arc<uni_common::core::schema::Schema> {
183 self.schema.schema()
184 }
185
186 pub fn bulk_writer(&self) -> bulk::BulkWriterBuilder<'_> {
188 bulk::BulkWriterBuilder::new(self)
189 }
190
191 pub fn session(&self) -> session::SessionBuilder<'_> {
193 session::SessionBuilder::new(self)
194 }
195
196 #[doc(hidden)]
198 pub fn schema_manager(&self) -> Arc<SchemaManager> {
199 self.schema.clone()
200 }
201
202 #[doc(hidden)]
203 pub fn writer(&self) -> Option<Arc<RwLock<Writer>>> {
204 self.writer.clone()
205 }
206
207 #[doc(hidden)]
208 pub fn storage(&self) -> Arc<StorageManager> {
209 self.storage.clone()
210 }
211
212 pub async fn flush(&self) -> Result<()> {
217 if let Some(writer_lock) = &self.writer {
218 let mut writer = writer_lock.write().await;
219 writer
220 .flush_to_l1(None)
221 .await
222 .map(|_| ())
223 .map_err(UniError::Internal)
224 } else {
225 Err(UniError::ReadOnly {
226 operation: "flush".to_string(),
227 })
228 }
229 }
230
231 pub async fn create_snapshot(&self, name: Option<&str>) -> Result<String> {
236 if let Some(writer_lock) = &self.writer {
237 let mut writer = writer_lock.write().await;
238 writer
239 .flush_to_l1(name.map(|s| s.to_string()))
240 .await
241 .map_err(UniError::Internal)
242 } else {
243 Err(UniError::ReadOnly {
244 operation: "create_snapshot".to_string(),
245 })
246 }
247 }
248
249 pub async fn create_named_snapshot(&self, name: &str) -> Result<String> {
251 if name.is_empty() {
252 return Err(UniError::Internal(anyhow::anyhow!(
253 "Snapshot name cannot be empty"
254 )));
255 }
256
257 let snapshot_id = self.create_snapshot(Some(name)).await?;
258
259 self.storage
260 .snapshot_manager()
261 .save_named_snapshot(name, &snapshot_id)
262 .await
263 .map_err(UniError::Internal)?;
264
265 Ok(snapshot_id)
266 }
267
268 pub async fn list_snapshots(&self) -> Result<Vec<SnapshotManifest>> {
270 let sm = self.storage.snapshot_manager();
271 let ids = sm.list_snapshots().await.map_err(UniError::Internal)?;
272 let mut manifests = Vec::new();
273 for id in ids {
274 if let Ok(m) = sm.load_snapshot(&id).await {
275 manifests.push(m);
276 }
277 }
278 Ok(manifests)
279 }
280
281 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
286 self.storage
287 .snapshot_manager()
288 .set_latest_snapshot(snapshot_id)
289 .await
290 .map_err(UniError::Internal)
291 }
292
293 pub async fn label_exists(&self, name: &str) -> Result<bool> {
295 Ok(self.schema.schema().labels.get(name).is_some_and(|l| {
296 matches!(
297 l.state,
298 uni_common::core::schema::SchemaElementState::Active
299 )
300 }))
301 }
302
303 pub async fn edge_type_exists(&self, name: &str) -> Result<bool> {
305 Ok(self.schema.schema().edge_types.get(name).is_some_and(|e| {
306 matches!(
307 e.state,
308 uni_common::core::schema::SchemaElementState::Active
309 )
310 }))
311 }
312
313 pub async fn list_labels(&self) -> Result<Vec<String>> {
319 let mut all_labels = std::collections::HashSet::new();
320
321 for (name, label) in self.schema.schema().labels.iter() {
323 if matches!(
324 label.state,
325 uni_common::core::schema::SchemaElementState::Active
326 ) {
327 all_labels.insert(name.clone());
328 }
329 }
330
331 let query = "MATCH (n) RETURN DISTINCT labels(n) AS labels";
333 let result = self.query(query).await?;
334 for row in &result.rows {
335 if let Ok(labels_list) = row.get::<Vec<String>>("labels") {
336 for label in labels_list {
337 all_labels.insert(label);
338 }
339 }
340 }
341
342 Ok(all_labels.into_iter().collect())
343 }
344
345 pub async fn list_edge_types(&self) -> Result<Vec<String>> {
347 Ok(self
348 .schema
349 .schema()
350 .edge_types
351 .iter()
352 .filter(|(_, e)| {
353 matches!(
354 e.state,
355 uni_common::core::schema::SchemaElementState::Active
356 )
357 })
358 .map(|(name, _)| name.clone())
359 .collect())
360 }
361
362 pub async fn get_label_info(
364 &self,
365 name: &str,
366 ) -> Result<Option<crate::api::schema::LabelInfo>> {
367 let schema = self.schema.schema();
368 if schema.labels.contains_key(name) {
369 let count = if let Ok(ds) = self.storage.vertex_dataset(name) {
370 if let Ok(raw) = ds.open_raw().await {
371 raw.count_rows(None)
372 .await
373 .map_err(|e| UniError::Internal(anyhow::anyhow!(e)))?
374 } else {
375 0
376 }
377 } else {
378 0
379 };
380
381 let mut properties = Vec::new();
382 if let Some(props) = schema.properties.get(name) {
383 for (prop_name, prop_meta) in props {
384 let is_indexed = schema.indexes.iter().any(|idx| match idx {
385 uni_common::core::schema::IndexDefinition::Vector(v) => {
386 v.label == name && v.property == *prop_name
387 }
388 uni_common::core::schema::IndexDefinition::Scalar(s) => {
389 s.label == name && s.properties.contains(prop_name)
390 }
391 uni_common::core::schema::IndexDefinition::FullText(f) => {
392 f.label == name && f.properties.contains(prop_name)
393 }
394 uni_common::core::schema::IndexDefinition::Inverted(inv) => {
395 inv.label == name && inv.property == *prop_name
396 }
397 uni_common::core::schema::IndexDefinition::JsonFullText(j) => {
398 j.label == name
399 }
400 _ => false,
401 });
402
403 properties.push(crate::api::schema::PropertyInfo {
404 name: prop_name.clone(),
405 data_type: format!("{:?}", prop_meta.r#type),
406 nullable: prop_meta.nullable,
407 is_indexed,
408 });
409 }
410 }
411
412 let mut indexes = Vec::new();
413 for idx in schema.indexes.iter().filter(|i| i.label() == name) {
414 use uni_common::core::schema::IndexDefinition;
415 let (idx_type, idx_props) = match idx {
416 IndexDefinition::Vector(v) => ("VECTOR", vec![v.property.clone()]),
417 IndexDefinition::Scalar(s) => ("SCALAR", s.properties.clone()),
418 IndexDefinition::FullText(f) => ("FULLTEXT", f.properties.clone()),
419 IndexDefinition::Inverted(inv) => ("INVERTED", vec![inv.property.clone()]),
420 IndexDefinition::JsonFullText(j) => ("JSON_FTS", vec![j.column.clone()]),
421 _ => continue,
422 };
423
424 indexes.push(crate::api::schema::IndexInfo {
425 name: idx.name().to_string(),
426 index_type: idx_type.to_string(),
427 properties: idx_props,
428 status: "ONLINE".to_string(), });
430 }
431
432 let mut constraints = Vec::new();
433 for c in &schema.constraints {
434 if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
435 && l == name
436 {
437 let (ctype, cprops) = match &c.constraint_type {
438 uni_common::core::schema::ConstraintType::Unique { properties } => {
439 ("UNIQUE", properties.clone())
440 }
441 uni_common::core::schema::ConstraintType::Exists { property } => {
442 ("EXISTS", vec![property.clone()])
443 }
444 uni_common::core::schema::ConstraintType::Check { expression } => {
445 ("CHECK", vec![expression.clone()])
446 }
447 _ => ("UNKNOWN", vec![]),
448 };
449
450 constraints.push(crate::api::schema::ConstraintInfo {
451 name: c.name.clone(),
452 constraint_type: ctype.to_string(),
453 properties: cprops,
454 enabled: c.enabled,
455 });
456 }
457 }
458
459 Ok(Some(crate::api::schema::LabelInfo {
460 name: name.to_string(),
461 count,
462 properties,
463 indexes,
464 constraints,
465 }))
466 } else {
467 Ok(None)
468 }
469 }
470
471 pub async fn compact_label(
475 &self,
476 label: &str,
477 ) -> Result<uni_store::compaction::CompactionStats> {
478 self.storage
479 .compact_label(label)
480 .await
481 .map_err(UniError::Internal)
482 }
483
484 pub async fn compact_edge_type(
486 &self,
487 edge_type: &str,
488 ) -> Result<uni_store::compaction::CompactionStats> {
489 self.storage
490 .compact_edge_type(edge_type)
491 .await
492 .map_err(UniError::Internal)
493 }
494
495 pub async fn wait_for_compaction(&self) -> Result<()> {
499 self.storage
500 .wait_for_compaction()
501 .await
502 .map_err(UniError::Internal)
503 }
504
505 pub async fn bulk_insert_vertices(
512 &self,
513 label: &str,
514 properties_list: Vec<uni_common::Properties>,
515 ) -> Result<Vec<uni_common::core::id::Vid>> {
516 let schema = self.schema.schema();
517 schema
519 .labels
520 .get(label)
521 .ok_or_else(|| UniError::LabelNotFound {
522 label: label.to_string(),
523 })?;
524 if let Some(writer_lock) = &self.writer {
525 let mut writer = writer_lock.write().await;
526
527 if properties_list.is_empty() {
528 return Ok(Vec::new());
529 }
530
531 let vids = writer
533 .allocate_vids(properties_list.len())
534 .await
535 .map_err(UniError::Internal)?;
536
537 let _props = writer
539 .insert_vertices_batch(vids.clone(), properties_list, vec![label.to_string()])
540 .await
541 .map_err(UniError::Internal)?;
542
543 Ok(vids)
544 } else {
545 Err(UniError::ReadOnly {
546 operation: "bulk_insert_vertices".to_string(),
547 })
548 }
549 }
550
551 pub async fn bulk_insert_edges(
556 &self,
557 edge_type: &str,
558 edges: Vec<(
559 uni_common::core::id::Vid,
560 uni_common::core::id::Vid,
561 uni_common::Properties,
562 )>,
563 ) -> Result<()> {
564 let schema = self.schema.schema();
565 let edge_meta =
566 schema
567 .edge_types
568 .get(edge_type)
569 .ok_or_else(|| UniError::EdgeTypeNotFound {
570 edge_type: edge_type.to_string(),
571 })?;
572 let type_id = edge_meta.id;
573
574 if let Some(writer_lock) = &self.writer {
575 let mut writer = writer_lock.write().await;
576
577 for (src_vid, dst_vid, props) in edges {
578 let eid = writer.next_eid(type_id).await.map_err(UniError::Internal)?;
579 writer
580 .insert_edge(
581 src_vid,
582 dst_vid,
583 type_id,
584 eid,
585 props,
586 Some(edge_type.to_string()),
587 )
588 .await
589 .map_err(UniError::Internal)?;
590 }
591
592 Ok(())
593 } else {
594 Err(UniError::ReadOnly {
595 operation: "bulk_insert_edges".to_string(),
596 })
597 }
598 }
599
600 pub async fn index_rebuild_status(&self) -> Result<Vec<uni_store::storage::IndexRebuildTask>> {
615 let manager = uni_store::storage::IndexRebuildManager::new(
616 self.storage.clone(),
617 self.schema.clone(),
618 self.config.index_rebuild.clone(),
619 )
620 .await
621 .map_err(UniError::Internal)?;
622
623 Ok(manager.status())
624 }
625
626 pub async fn retry_index_rebuilds(&self) -> Result<Vec<String>> {
636 let manager = uni_store::storage::IndexRebuildManager::new(
637 self.storage.clone(),
638 self.schema.clone(),
639 self.config.index_rebuild.clone(),
640 )
641 .await
642 .map_err(UniError::Internal)?;
643
644 let retried = manager.retry_failed().await.map_err(UniError::Internal)?;
645
646 if !retried.is_empty() {
648 let manager = std::sync::Arc::new(manager);
649 let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
650 self.shutdown_handle.track_task(handle);
651 }
652
653 Ok(retried)
654 }
655
656 pub async fn rebuild_indexes(&self, label: &str, async_: bool) -> Result<Option<String>> {
668 if async_ {
669 let manager = uni_store::storage::IndexRebuildManager::new(
670 self.storage.clone(),
671 self.schema.clone(),
672 self.config.index_rebuild.clone(),
673 )
674 .await
675 .map_err(UniError::Internal)?;
676
677 let task_ids = manager
678 .schedule(vec![label.to_string()])
679 .await
680 .map_err(UniError::Internal)?;
681
682 let manager = std::sync::Arc::new(manager);
683 let handle = manager.start_background_worker(self.shutdown_handle.subscribe());
684 self.shutdown_handle.track_task(handle);
685
686 Ok(task_ids.into_iter().next())
687 } else {
688 let idx_mgr = uni_store::storage::IndexManager::new(
689 self.storage.base_path(),
690 self.schema.clone(),
691 self.storage.lancedb_store_arc(),
692 );
693 idx_mgr
694 .rebuild_indexes_for_label(label)
695 .await
696 .map_err(UniError::Internal)?;
697 Ok(None)
698 }
699 }
700
701 pub async fn is_index_building(&self, label: &str) -> Result<bool> {
706 let manager = uni_store::storage::IndexRebuildManager::new(
707 self.storage.clone(),
708 self.schema.clone(),
709 self.config.index_rebuild.clone(),
710 )
711 .await
712 .map_err(UniError::Internal)?;
713
714 Ok(manager.is_index_building(label))
715 }
716
717 pub async fn shutdown(self) -> Result<()> {
722 if let Some(ref writer) = self.writer {
724 let mut w = writer.write().await;
725 if let Err(e) = w.flush_to_l1(None).await {
726 tracing::error!("Error flushing during shutdown: {}", e);
727 }
728 }
729
730 self.shutdown_handle
731 .shutdown_async()
732 .await
733 .map_err(UniError::Internal)
734 }
735}
736
737impl Drop for Uni {
738 fn drop(&mut self) {
739 self.shutdown_handle.shutdown_blocking();
740 tracing::debug!("Uni dropped, shutdown signal sent");
741 }
742}
743
/// Builder that opens or creates a [`Uni`] database.
///
/// Obtain one via `Uni::open`, `Uni::open_existing`, `Uni::create` or
/// `Uni::temporary`, configure it, then call `build().await` or `build_sync()`.
#[must_use = "builders do nothing until .build() is called"]
pub struct UniBuilder {
    /// Database location: a local path, or a URL when it contains `://`.
    uri: String,
    /// Configuration passed to storage, the writer and the returned handle.
    config: UniConfig,
    /// Optional schema file path.
    /// NOTE(review): `build()` does not read this field — confirm whether the
    /// schema file is applied elsewhere.
    schema_file: Option<PathBuf>,
    /// Uni-Xervo model catalog. It is required when the schema has vector indexes
    /// with embedding aliases.
    xervo_catalog: Option<Vec<ModelAliasSpec>>,
    /// Remote data URL for hybrid mode (local WAL/ID allocator, remote data).
    hybrid_remote_url: Option<String>,
    /// Explicit cloud credentials and settings. When set, they are used instead of
    /// parsing the remote URL.
    cloud_config: Option<CloudStorageConfig>,
    /// Create the local directory if it is missing (default `true`).
    create_if_missing: bool,
    /// Fail if the local directory already exists (default `false`).
    fail_if_exists: bool,
}
756
757impl UniBuilder {
758 pub fn new(uri: String) -> Self {
760 Self {
761 uri,
762 config: UniConfig::default(),
763 schema_file: None,
764 xervo_catalog: None,
765 hybrid_remote_url: None,
766 cloud_config: None,
767 create_if_missing: true,
768 fail_if_exists: false,
769 }
770 }
771
772 pub fn schema_file(mut self, path: impl AsRef<Path>) -> Self {
774 self.schema_file = Some(path.as_ref().to_path_buf());
775 self
776 }
777
778 pub fn xervo_catalog(mut self, catalog: Vec<ModelAliasSpec>) -> Self {
780 self.xervo_catalog = Some(catalog);
781 self
782 }
783
784 pub fn xervo_catalog_from_str(mut self, json: &str) -> Result<Self> {
786 let catalog = uni_xervo::api::catalog_from_str(json)
787 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
788 self.xervo_catalog = Some(catalog);
789 Ok(self)
790 }
791
792 pub fn xervo_catalog_from_file(mut self, path: impl AsRef<Path>) -> Result<Self> {
794 let catalog = uni_xervo::api::catalog_from_file(path)
795 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?;
796 self.xervo_catalog = Some(catalog);
797 Ok(self)
798 }
799
800 pub fn hybrid(mut self, local_path: impl AsRef<Path>, remote_url: &str) -> Self {
814 self.uri = local_path.as_ref().to_string_lossy().to_string();
815 self.hybrid_remote_url = Some(remote_url.to_string());
816 self
817 }
818
819 pub fn cloud_config(mut self, config: CloudStorageConfig) -> Self {
846 self.cloud_config = Some(config);
847 self
848 }
849
850 pub fn config(mut self, config: UniConfig) -> Self {
852 self.config = config;
853 self
854 }
855
856 pub fn cache_size(mut self, bytes: usize) -> Self {
858 self.config.cache_size = bytes;
859 self
860 }
861
862 pub fn parallelism(mut self, n: usize) -> Self {
864 self.config.parallelism = n;
865 self
866 }
867
868 pub async fn build(self) -> Result<Uni> {
870 let uri = self.uri.clone();
871 let is_remote_uri = uri.contains("://");
872 let is_hybrid = self.hybrid_remote_url.is_some();
873
874 if is_hybrid && is_remote_uri {
875 return Err(UniError::Internal(anyhow::anyhow!(
876 "Hybrid mode requires a local path as primary URI, found: {}",
877 uri
878 )));
879 }
880
881 let (storage_uri, data_store, local_store_opt) = if is_hybrid {
882 let remote_url = self.hybrid_remote_url.as_ref().unwrap();
883
884 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
886 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
887 } else {
888 let url = url::Url::parse(remote_url).map_err(|e| {
889 UniError::Io(std::io::Error::new(
890 std::io::ErrorKind::InvalidInput,
891 e.to_string(),
892 ))
893 })?;
894 let (os, _path) =
895 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
896 Arc::from(os)
897 };
898
899 let path = PathBuf::from(&uri);
901 if path.exists() {
902 if self.fail_if_exists {
903 return Err(UniError::Internal(anyhow::anyhow!(
904 "Database already exists at {}",
905 uri
906 )));
907 }
908 } else {
909 if !self.create_if_missing {
910 return Err(UniError::NotFound { path: path.clone() });
911 }
912 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
913 }
914
915 let local_store = Arc::new(
916 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
917 );
918
919 (
922 remote_url.clone(),
923 remote_store,
924 Some(local_store as Arc<dyn ObjectStore>),
925 )
926 } else if is_remote_uri {
927 let remote_store: Arc<dyn ObjectStore> = if let Some(cloud_cfg) = &self.cloud_config {
929 build_cloud_store(cloud_cfg).map_err(UniError::Internal)?
930 } else {
931 let url = url::Url::parse(&uri).map_err(|e| {
932 UniError::Io(std::io::Error::new(
933 std::io::ErrorKind::InvalidInput,
934 e.to_string(),
935 ))
936 })?;
937 let (os, _path) =
938 object_store::parse_url(&url).map_err(|e| UniError::Internal(e.into()))?;
939 Arc::from(os)
940 };
941
942 (uri.clone(), remote_store, None)
943 } else {
944 let path = PathBuf::from(&uri);
946 let storage_path = path.join("storage");
947
948 if path.exists() {
949 if self.fail_if_exists {
950 return Err(UniError::Internal(anyhow::anyhow!(
951 "Database already exists at {}",
952 uri
953 )));
954 }
955 } else {
956 if !self.create_if_missing {
957 return Err(UniError::NotFound { path: path.clone() });
958 }
959 std::fs::create_dir_all(&path).map_err(UniError::Io)?;
960 }
961
962 if !storage_path.exists() {
964 std::fs::create_dir_all(&storage_path).map_err(UniError::Io)?;
965 }
966
967 let store = Arc::new(
968 LocalFileSystem::new_with_prefix(&path).map_err(|e| UniError::Io(e.into()))?,
969 );
970 (
971 storage_path.to_string_lossy().to_string(),
972 store.clone() as Arc<dyn ObjectStore>,
973 Some(store as Arc<dyn ObjectStore>),
974 )
975 };
976
977 let schema_obj_path = object_store::path::Path::from("catalog/schema.json");
979 let legacy_schema_obj_path = object_store::path::Path::from("schema.json");
981
982 let has_catalog_schema = match data_store.get(&schema_obj_path).await {
986 Ok(_) => true,
987 Err(object_store::Error::NotFound { .. }) => false,
988 Err(e) => return Err(UniError::Internal(e.into())),
989 };
990 if !has_catalog_schema {
991 match data_store.get(&legacy_schema_obj_path).await {
992 Ok(result) => {
993 let bytes = result
994 .bytes()
995 .await
996 .map_err(|e| UniError::Internal(e.into()))?;
997 data_store
998 .put(&schema_obj_path, bytes.into())
999 .await
1000 .map_err(|e| UniError::Internal(e.into()))?;
1001 info!(
1002 legacy = %legacy_schema_obj_path,
1003 target = %schema_obj_path,
1004 "Migrated legacy schema path to catalog path"
1005 );
1006 }
1007 Err(object_store::Error::NotFound { .. }) => {}
1008 Err(e) => return Err(UniError::Internal(e.into())),
1009 }
1010 }
1011
1012 let schema_manager = Arc::new(
1015 SchemaManager::load_from_store(data_store.clone(), &schema_obj_path)
1016 .await
1017 .map_err(UniError::Internal)?,
1018 );
1019
1020 let lancedb_storage_options = self
1021 .cloud_config
1022 .as_ref()
1023 .map(Self::cloud_config_to_lancedb_storage_options);
1024
1025 let storage = if is_hybrid || is_remote_uri {
1026 StorageManager::new_with_store_and_storage_options(
1029 &storage_uri,
1030 data_store.clone(),
1031 schema_manager.clone(),
1032 self.config.clone(),
1033 lancedb_storage_options.clone(),
1034 )
1035 .await
1036 .map_err(UniError::Internal)?
1037 } else {
1038 StorageManager::new_with_config(
1040 &storage_uri,
1041 schema_manager.clone(),
1042 self.config.clone(),
1043 )
1044 .await
1045 .map_err(UniError::Internal)?
1046 };
1047
1048 let storage = Arc::new(storage);
1049
1050 let shutdown_handle = Arc::new(ShutdownHandle::new(Duration::from_secs(30)));
1052
1053 let compaction_handle = storage
1055 .clone()
1056 .start_background_compaction(shutdown_handle.subscribe());
1057 shutdown_handle.track_task(compaction_handle);
1058
1059 let prop_cache_capacity = self.config.cache_size / 1024;
1061
1062 let prop_manager = Arc::new(PropertyManager::new(
1063 storage.clone(),
1064 schema_manager.clone(),
1065 prop_cache_capacity,
1066 ));
1067
1068 let id_store = local_store_opt
1070 .clone()
1071 .unwrap_or_else(|| data_store.clone());
1072 let wal_store = local_store_opt
1073 .clone()
1074 .unwrap_or_else(|| data_store.clone());
1075
1076 let latest_snapshot = storage
1079 .snapshot_manager()
1080 .load_latest_snapshot()
1081 .await
1082 .map_err(UniError::Internal)?;
1083
1084 let (start_version, wal_high_water_mark) = if let Some(ref snapshot) = latest_snapshot {
1085 (
1086 snapshot.version_high_water_mark + 1,
1087 snapshot.wal_high_water_mark,
1088 )
1089 } else {
1090 let has_manifests = storage
1092 .snapshot_manager()
1093 .has_any_manifests()
1094 .await
1095 .unwrap_or(false);
1096
1097 let wal_check =
1098 WriteAheadLog::new(wal_store.clone(), object_store::path::Path::from("wal"));
1099 let has_wal = wal_check.has_segments().await.unwrap_or(false);
1100
1101 if has_manifests {
1102 let snapshot_ids = storage
1104 .snapshot_manager()
1105 .list_snapshots()
1106 .await
1107 .map_err(UniError::Internal)?;
1108 if let Some(last_id) = snapshot_ids.last() {
1109 let manifest = storage
1110 .snapshot_manager()
1111 .load_snapshot(last_id)
1112 .await
1113 .map_err(UniError::Internal)?;
1114 tracing::warn!(
1115 "Latest snapshot pointer missing but found manifest '{}'. \
1116 Recovering version {}.",
1117 last_id,
1118 manifest.version_high_water_mark
1119 );
1120 (
1121 manifest.version_high_water_mark + 1,
1122 manifest.wal_high_water_mark,
1123 )
1124 } else {
1125 return Err(UniError::Internal(anyhow::anyhow!(
1126 "Snapshot manifests directory exists but contains no valid manifests. \
1127 Possible data corruption."
1128 )));
1129 }
1130 } else if has_wal {
1131 return Err(UniError::Internal(anyhow::anyhow!(
1133 "Database has WAL segments but no snapshot manifest. \
1134 Cannot safely determine version counter -- starting at 0 would cause \
1135 version conflicts and data corruption. \
1136 Restore the snapshot manifest or delete WAL to start fresh."
1137 )));
1138 } else {
1139 (0, 0)
1141 }
1142 };
1143
1144 let allocator = Arc::new(
1145 IdAllocator::new(
1146 id_store,
1147 object_store::path::Path::from("id_allocator.json"),
1148 1000,
1149 )
1150 .await
1151 .map_err(UniError::Internal)?,
1152 );
1153
1154 let wal = if !self.config.wal_enabled {
1155 None
1157 } else if is_remote_uri && !is_hybrid {
1158 Some(Arc::new(WriteAheadLog::new(
1160 wal_store,
1161 object_store::path::Path::from("wal"),
1162 )))
1163 } else if is_hybrid || !is_remote_uri {
1164 Some(Arc::new(WriteAheadLog::new(
1167 wal_store,
1168 object_store::path::Path::from("wal"),
1169 )))
1170 } else {
1171 None
1172 };
1173
1174 let writer = Arc::new(RwLock::new(
1175 Writer::new_with_config(
1176 storage.clone(),
1177 schema_manager.clone(),
1178 start_version,
1179 self.config.clone(),
1180 wal,
1181 Some(allocator),
1182 )
1183 .await
1184 .map_err(UniError::Internal)?,
1185 ));
1186
1187 let required_embed_aliases: std::collections::BTreeSet<String> = schema_manager
1188 .schema()
1189 .indexes
1190 .iter()
1191 .filter_map(|idx| {
1192 if let uni_common::core::schema::IndexDefinition::Vector(cfg) = idx {
1193 cfg.embedding_config.as_ref().map(|emb| emb.alias.clone())
1194 } else {
1195 None
1196 }
1197 })
1198 .collect();
1199
1200 if !required_embed_aliases.is_empty() && self.xervo_catalog.is_none() {
1201 return Err(UniError::Internal(anyhow::anyhow!(
1202 "Uni-Xervo catalog is required because schema has vector indexes with embedding aliases"
1203 )));
1204 }
1205
1206 let xervo_runtime = if let Some(catalog) = self.xervo_catalog {
1207 for alias in &required_embed_aliases {
1208 let spec = catalog.iter().find(|s| &s.alias == alias).ok_or_else(|| {
1209 UniError::Internal(anyhow::anyhow!(
1210 "Missing Uni-Xervo alias '{}' referenced by vector index embedding config",
1211 alias
1212 ))
1213 })?;
1214 if spec.task != ModelTask::Embed {
1215 return Err(UniError::Internal(anyhow::anyhow!(
1216 "Uni-Xervo alias '{}' must be an embed task",
1217 alias
1218 )));
1219 }
1220 }
1221
1222 let mut runtime_builder = ModelRuntime::builder().catalog(catalog);
1223 #[cfg(feature = "provider-candle")]
1224 {
1225 runtime_builder = runtime_builder
1226 .register_provider(uni_xervo::provider::LocalCandleProvider::new());
1227 }
1228 #[cfg(feature = "provider-fastembed")]
1229 {
1230 runtime_builder = runtime_builder
1231 .register_provider(uni_xervo::provider::LocalFastEmbedProvider::new());
1232 }
1233 #[cfg(feature = "provider-openai")]
1234 {
1235 runtime_builder = runtime_builder
1236 .register_provider(uni_xervo::provider::RemoteOpenAIProvider::new());
1237 }
1238 #[cfg(feature = "provider-gemini")]
1239 {
1240 runtime_builder = runtime_builder
1241 .register_provider(uni_xervo::provider::RemoteGeminiProvider::new());
1242 }
1243 #[cfg(feature = "provider-vertexai")]
1244 {
1245 runtime_builder = runtime_builder
1246 .register_provider(uni_xervo::provider::RemoteVertexAIProvider::new());
1247 }
1248 #[cfg(feature = "provider-mistral")]
1249 {
1250 runtime_builder = runtime_builder
1251 .register_provider(uni_xervo::provider::RemoteMistralProvider::new());
1252 }
1253 #[cfg(feature = "provider-anthropic")]
1254 {
1255 runtime_builder = runtime_builder
1256 .register_provider(uni_xervo::provider::RemoteAnthropicProvider::new());
1257 }
1258 #[cfg(feature = "provider-voyageai")]
1259 {
1260 runtime_builder = runtime_builder
1261 .register_provider(uni_xervo::provider::RemoteVoyageAIProvider::new());
1262 }
1263 #[cfg(feature = "provider-cohere")]
1264 {
1265 runtime_builder = runtime_builder
1266 .register_provider(uni_xervo::provider::RemoteCohereProvider::new());
1267 }
1268 #[cfg(feature = "provider-azure-openai")]
1269 {
1270 runtime_builder = runtime_builder
1271 .register_provider(uni_xervo::provider::RemoteAzureOpenAIProvider::new());
1272 }
1273 #[cfg(feature = "provider-mistralrs")]
1274 {
1275 runtime_builder = runtime_builder
1276 .register_provider(uni_xervo::provider::LocalMistralRsProvider::new());
1277 }
1278
1279 Some(
1280 runtime_builder
1281 .build()
1282 .await
1283 .map_err(|e| UniError::Internal(anyhow::anyhow!(e.to_string())))?,
1284 )
1285 } else {
1286 None
1287 };
1288
1289 if let Some(ref runtime) = xervo_runtime {
1290 let mut writer_guard = writer.write().await;
1291 writer_guard.set_xervo_runtime(runtime.clone());
1292 }
1293
1294 {
1297 let w = writer.read().await;
1298 let replayed = w
1299 .replay_wal(wal_high_water_mark)
1300 .await
1301 .map_err(UniError::Internal)?;
1302 if replayed > 0 {
1303 info!("WAL recovery: replayed {} mutations", replayed);
1304 }
1305 }
1306
1307 if let Some(interval) = self.config.auto_flush_interval {
1309 let writer_clone = writer.clone();
1310 let mut shutdown_rx = shutdown_handle.subscribe();
1311
1312 let handle = tokio::spawn(async move {
1313 let mut ticker = tokio::time::interval(interval);
1314 loop {
1315 tokio::select! {
1316 _ = ticker.tick() => {
1317 let mut w = writer_clone.write().await;
1318 if let Err(e) = w.check_flush().await {
1319 tracing::warn!("Background flush check failed: {}", e);
1320 }
1321 }
1322 _ = shutdown_rx.recv() => {
1323 tracing::info!("Auto-flush shutting down, performing final flush");
1324 let mut w = writer_clone.write().await;
1325 let _ = w.flush_to_l1(None).await;
1326 break;
1327 }
1328 }
1329 }
1330 });
1331
1332 shutdown_handle.track_task(handle);
1333 }
1334
1335 Ok(Uni {
1336 storage,
1337 schema: schema_manager,
1338 properties: prop_manager,
1339 writer: Some(writer),
1340 xervo_runtime,
1341 config: self.config,
1342 procedure_registry: Arc::new(uni_query::ProcedureRegistry::new()),
1343 shutdown_handle,
1344 })
1345 }
1346
1347 pub fn build_sync(self) -> Result<Uni> {
1349 let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
1350 rt.block_on(self.build())
1351 }
1352
1353 fn cloud_config_to_lancedb_storage_options(
1354 config: &CloudStorageConfig,
1355 ) -> std::collections::HashMap<String, String> {
1356 let mut opts = std::collections::HashMap::new();
1357
1358 match config {
1359 CloudStorageConfig::S3 {
1360 bucket,
1361 region,
1362 endpoint,
1363 access_key_id,
1364 secret_access_key,
1365 session_token,
1366 virtual_hosted_style,
1367 } => {
1368 opts.insert("bucket".to_string(), bucket.clone());
1369 opts.insert(
1370 "virtual_hosted_style_request".to_string(),
1371 virtual_hosted_style.to_string(),
1372 );
1373
1374 if let Some(r) = region {
1375 opts.insert("region".to_string(), r.clone());
1376 }
1377 if let Some(ep) = endpoint {
1378 opts.insert("endpoint".to_string(), ep.clone());
1379 if ep.starts_with("http://") {
1380 opts.insert("allow_http".to_string(), "true".to_string());
1381 }
1382 }
1383 if let Some(v) = access_key_id {
1384 opts.insert("access_key_id".to_string(), v.clone());
1385 }
1386 if let Some(v) = secret_access_key {
1387 opts.insert("secret_access_key".to_string(), v.clone());
1388 }
1389 if let Some(v) = session_token {
1390 opts.insert("session_token".to_string(), v.clone());
1391 }
1392 }
1393 CloudStorageConfig::Gcs {
1394 bucket,
1395 service_account_path,
1396 service_account_key,
1397 } => {
1398 opts.insert("bucket".to_string(), bucket.clone());
1399 if let Some(v) = service_account_path {
1400 opts.insert("service_account".to_string(), v.clone());
1401 opts.insert("application_credentials".to_string(), v.clone());
1402 }
1403 if let Some(v) = service_account_key {
1404 opts.insert("service_account_key".to_string(), v.clone());
1405 }
1406 }
1407 CloudStorageConfig::Azure {
1408 container,
1409 account,
1410 access_key,
1411 sas_token,
1412 } => {
1413 opts.insert("account_name".to_string(), account.clone());
1414 opts.insert("container_name".to_string(), container.clone());
1415 if let Some(v) = access_key {
1416 opts.insert("access_key".to_string(), v.clone());
1417 }
1418 if let Some(v) = sas_token {
1419 opts.insert("sas_token".to_string(), v.clone());
1420 }
1421 }
1422 }
1423
1424 opts
1425 }
1426}