1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
/// Process-wide sequence number appended to temporary file/object names so that concurrent
/// atomic writes to the same final path never share a temp path. Only uniqueness matters,
/// which is why callers use `Ordering::Relaxed`.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Maximum number of concurrent storage operations per batch (reads, writes, deletes).
///
/// Read from `DAKERA_S3_CONCURRENT_OPS` on every call; falls back to 16 when the variable
/// is unset, unparsable, or `0`. Zero is rejected because the value feeds
/// `buffer_unordered`/`buffered`: depending on the futures-util version, 0 either stalls the
/// stream or silently disables the limit. Neither is what an operator setting a limit wants.
fn s3_concurrent_ops() -> usize {
    const DEFAULT_CONCURRENT_OPS: usize = 16;
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_CONCURRENT_OPS)
}
28
/// Per-namespace bookkeeping, stored as JSON at `namespaces/<namespace>/meta.json`.
// Field order determines the serialized JSON layout; keep it stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimension; `None` until the first upsert records it.
    dimension: Option<usize>,
    /// Number of stored vectors, maintained incrementally by upsert (new ids) and delete.
    vector_count: usize,
    /// Creation time, seconds since the Unix epoch.
    created_at: u64,
    /// Time of the last metadata write, seconds since the Unix epoch.
    updated_at: u64,
}
37
/// JSON representation of one vector, stored at `namespaces/<namespace>/vectors/<id>.json`.
///
/// NOTE(review): `Vector::ttl_seconds` / `Vector::expires_at` are not part of this format.
/// TTL is therefore dropped on write and always `None` after a read — confirm that TTL is
/// intentionally unsupported by this backend.
// Field order determines the serialized JSON layout; keep it stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    /// Vector id; also used as the file stem of the object key.
    id: String,
    /// Embedding values.
    values: Vec<f32>,
    /// Optional user metadata, stored verbatim.
    metadata: Option<serde_json::Value>,
}
45
46impl From<Vector> for StoredVector {
47 fn from(v: Vector) -> Self {
48 Self {
49 id: v.id,
50 values: v.values,
51 metadata: v.metadata,
52 }
53 }
54}
55
56impl From<StoredVector> for Vector {
57 fn from(v: StoredVector) -> Self {
58 Self {
59 id: v.id,
60 values: v.values,
61 metadata: v.metadata,
62 ttl_seconds: None,
63 expires_at: None,
64 }
65 }
66}
67
/// Backend selection and connection settings for [`ObjectStorage`].
///
/// `Debug` is implemented by hand so that secrets (`secret_access_key`, `account_key`,
/// `sas_token`) are never printed. Only whether they are set is shown.
#[derive(Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-process memory backend (the default).
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 or an S3-compatible service. Optional fields are forwarded to the OpenDAL
    /// builder only when set.
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage. Optional fields are forwarded to the OpenDAL builder only when set.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage. Optional fields are forwarded to the OpenDAL builder only when set.
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}

impl std::fmt::Debug for ObjectStorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Replaces a secret's value with a fixed marker while keeping `Some`/`None` visible.
        fn redact(secret: &Option<String>) -> Option<&'static str> {
            secret.as_ref().map(|_| "<redacted>")
        }

        match self {
            Self::Memory => f.write_str("Memory"),
            Self::Filesystem { root } => f.debug_struct("Filesystem").field("root", root).finish(),
            Self::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => f
                .debug_struct("S3")
                .field("bucket", bucket)
                .field("region", region)
                .field("endpoint", endpoint)
                .field("access_key_id", access_key_id)
                .field("secret_access_key", &redact(secret_access_key))
                .finish(),
            Self::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => f
                .debug_struct("Azure")
                .field("container", container)
                .field("account_name", account_name)
                .field("account_key", &redact(account_key))
                .field("sas_token", &redact(sas_token))
                .field("endpoint", endpoint)
                .finish(),
            Self::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => f
                .debug_struct("Gcs")
                .field("bucket", bucket)
                .field("credential_path", credential_path)
                .field("endpoint", endpoint)
                .finish(),
        }
    }
}
99
100#[derive(Clone)]
102pub struct ObjectStorage {
103 operator: Operator,
104 fs_root: Option<std::path::PathBuf>,
109}
110
111impl ObjectStorage {
112 pub fn new(config: ObjectStorageConfig) -> Result<Self> {
114 let operator = Self::build_operator(&config)?;
115 let fs_root = match &config {
116 ObjectStorageConfig::Filesystem { root } => Some(std::path::PathBuf::from(root)),
117 _ => None,
118 };
119 Ok(Self { operator, fs_root })
120 }
121
122 fn is_fs_safe_segment(s: &str) -> bool {
126 !s.is_empty()
127 && s != "."
128 && s != ".."
129 && !s.contains('/')
130 && !s.contains('\\')
131 && !s.contains('\0')
132 }
133
134 async fn write_vectors_fs(
148 &self,
149 root: &std::path::Path,
150 namespace: &NamespaceId,
151 vectors: Vec<Vector>,
152 ) -> Result<usize> {
153 let vectors_dir = root.join(format!("namespaces/{}/vectors", namespace));
154 tokio::fs::create_dir_all(&vectors_dir)
155 .await
156 .map_err(|e| DakeraError::Storage(e.to_string()))?;
157
158 let results: Vec<Result<bool>> = stream::iter(vectors)
159 .map(|vector| {
160 let dir = vectors_dir.clone();
161 async move {
162 let id = vector.id.clone();
163 let stored: StoredVector = vector.into();
164 let data = serde_json::to_vec(&stored)
165 .map_err(|e| DakeraError::Storage(e.to_string()))?;
166 let final_path = dir.join(format!("{}.json", id));
167 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
168 let tmp_path = dir.join(format!(".{}.tmp.{}", id, seq));
170 tokio::task::spawn_blocking(move || -> Result<bool> {
171 use std::io::Write;
172 let is_new = !final_path.exists();
175 let mut f = std::fs::File::create(&tmp_path)
176 .map_err(|e| DakeraError::Storage(e.to_string()))?;
177 f.write_all(&data)
178 .map_err(|e| DakeraError::Storage(e.to_string()))?;
179 f.sync_all()
182 .map_err(|e| DakeraError::Storage(e.to_string()))?;
183 drop(f);
184 if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
185 let _ = std::fs::remove_file(&tmp_path);
186 return Err(DakeraError::Storage(e.to_string()));
187 }
188 Ok(is_new)
189 })
190 .await
191 .map_err(|e| DakeraError::Storage(e.to_string()))?
192 }
193 })
194 .buffer_unordered(s3_concurrent_ops())
195 .collect()
196 .await;
197
198 let mut new_inserts = 0usize;
199 for r in results {
200 if r? {
201 new_inserts += 1;
202 }
203 }
204 Ok(new_inserts)
205 }
206
207 pub fn memory() -> Result<Self> {
209 Self::new(ObjectStorageConfig::Memory)
210 }
211
212 pub fn filesystem(root: impl Into<String>) -> Result<Self> {
214 Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
215 }
216
217 pub fn s3(bucket: impl Into<String>) -> Result<Self> {
219 Self::new(ObjectStorageConfig::S3 {
220 bucket: bucket.into(),
221 region: None,
222 endpoint: None,
223 access_key_id: None,
224 secret_access_key: None,
225 })
226 }
227
228 pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
230 Self::new(ObjectStorageConfig::Azure {
231 container: container.into(),
232 account_name: account_name.into(),
233 account_key: None,
234 sas_token: None,
235 endpoint: None,
236 })
237 }
238
239 pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
241 Self::new(ObjectStorageConfig::Gcs {
242 bucket: bucket.into(),
243 credential_path: None,
244 endpoint: None,
245 })
246 }
247
248 pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
249 match config {
250 ObjectStorageConfig::Memory => {
251 let builder = services::Memory::default();
252 Operator::new(builder)
253 .map(|op| op.finish())
254 .map_err(|e| DakeraError::Storage(e.to_string()))
255 }
256 ObjectStorageConfig::Filesystem { root } => {
257 let builder = services::Fs::default().root(root);
258 Operator::new(builder)
259 .map(|op| op.layer(Self::retry_layer()).finish())
260 .map_err(|e| DakeraError::Storage(e.to_string()))
261 }
262 ObjectStorageConfig::S3 {
263 bucket,
264 region,
265 endpoint,
266 access_key_id,
267 secret_access_key,
268 } => {
269 let mut builder = services::S3::default().bucket(bucket);
270
271 if let Some(region) = region {
272 builder = builder.region(region);
273 }
274 if let Some(endpoint) = endpoint {
275 builder = builder.endpoint(endpoint);
276 }
277 if let Some(key) = access_key_id {
278 builder = builder.access_key_id(key);
279 }
280 if let Some(secret) = secret_access_key {
281 builder = builder.secret_access_key(secret);
282 }
283
284 Operator::new(builder)
285 .map(|op| op.layer(Self::retry_layer()).finish())
286 .map_err(|e| DakeraError::Storage(e.to_string()))
287 }
288 ObjectStorageConfig::Azure {
289 container,
290 account_name,
291 account_key,
292 sas_token,
293 endpoint,
294 } => {
295 let mut builder = services::Azblob::default()
296 .container(container)
297 .account_name(account_name);
298
299 if let Some(key) = account_key {
300 builder = builder.account_key(key);
301 }
302 if let Some(token) = sas_token {
303 builder = builder.sas_token(token);
304 }
305 if let Some(endpoint) = endpoint {
306 builder = builder.endpoint(endpoint);
307 }
308
309 Operator::new(builder)
310 .map(|op| op.layer(Self::retry_layer()).finish())
311 .map_err(|e| DakeraError::Storage(e.to_string()))
312 }
313 ObjectStorageConfig::Gcs {
314 bucket,
315 credential_path,
316 endpoint,
317 } => {
318 let mut builder = services::Gcs::default().bucket(bucket);
319
320 if let Some(cred_path) = credential_path {
321 builder = builder.credential_path(cred_path);
322 }
323 if let Some(endpoint) = endpoint {
324 builder = builder.endpoint(endpoint);
325 }
326
327 Operator::new(builder)
328 .map(|op| op.layer(Self::retry_layer()).finish())
329 .map_err(|e| DakeraError::Storage(e.to_string()))
330 }
331 }
332 }
333
334 fn retry_layer() -> RetryLayer {
335 let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
339 .ok()
340 .and_then(|v| v.parse().ok())
341 .unwrap_or(3);
342 let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
343 .ok()
344 .and_then(|v| v.parse().ok())
345 .unwrap_or(500);
346 let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
347 .ok()
348 .and_then(|v| v.parse().ok())
349 .unwrap_or(60);
350
351 tracing::info!(
352 max_times,
353 min_delay_ms,
354 max_delay_secs,
355 "S3 retry layer configured"
356 );
357
358 RetryLayer::new()
359 .with_max_times(max_times)
360 .with_min_delay(Duration::from_millis(min_delay_ms))
361 .with_max_delay(Duration::from_secs(max_delay_secs))
362 .with_jitter()
363 .with_factor(2.0)
364 }
365
366 fn vector_path(namespace: &str, vector_id: &str) -> String {
368 format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
369 }
370
371 fn namespace_meta_path(namespace: &str) -> String {
373 format!("namespaces/{}/meta.json", namespace)
374 }
375
376 fn namespace_vectors_prefix(namespace: &str) -> String {
378 format!("namespaces/{}/vectors/", namespace)
379 }
380
381 async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
383 let path = Self::namespace_meta_path(namespace);
384 match self.operator.read(&path).await {
385 Ok(data) => {
386 let bytes = data.to_vec();
387 if bytes.is_empty() {
388 tracing::warn!(
389 namespace = %namespace,
390 path = %path,
391 "Empty namespace metadata file detected, treating as missing"
392 );
393 return Ok(None);
394 }
395 match serde_json::from_slice(&bytes) {
396 Ok(meta) => Ok(Some(meta)),
397 Err(e) => {
398 tracing::warn!(
399 namespace = %namespace,
400 path = %path,
401 error = %e,
402 bytes_len = bytes.len(),
403 "Corrupted namespace metadata, treating as missing and will be recreated"
404 );
405 Ok(None)
406 }
407 }
408 }
409 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
410 Err(e) => Err(DakeraError::Storage(e.to_string())),
411 }
412 }
413
414 async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
418 let path = Self::namespace_meta_path(namespace);
419 let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
420 self.write_atomic(&path, data).await
421 }
422
423 async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
431 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
432 let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
433 let rename_result = async {
434 self.operator
435 .write(&tmp_path, data.clone())
436 .await
437 .map_err(|e| DakeraError::Storage(e.to_string()))?;
438 self.operator
439 .rename(&tmp_path, path)
440 .await
441 .map_err(|e| DakeraError::Storage(e.to_string()))?;
442 Ok::<(), DakeraError>(())
443 }
444 .await;
445 if let Err(e) = rename_result {
446 tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
448 let _ = self.operator.delete(&tmp_path).await;
449 self.operator
450 .write(path, data)
451 .await
452 .map_err(|e| DakeraError::Storage(e.to_string()))?;
453 }
454 Ok(())
455 }
456
457 fn now() -> u64 {
459 std::time::SystemTime::now()
460 .duration_since(std::time::UNIX_EPOCH)
461 .unwrap_or_default()
462 .as_secs()
463 }
464}
465
466#[async_trait]
467impl VectorStorage for ObjectStorage {
468 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
469 if vectors.is_empty() {
470 return Ok(0);
471 }
472
473 let mut meta = self
475 .read_namespace_meta(namespace)
476 .await?
477 .unwrap_or_else(|| NamespaceMetadata {
478 dimension: None,
479 vector_count: 0,
480 created_at: Self::now(),
481 updated_at: Self::now(),
482 });
483
484 let first_dim = vectors[0].values.len();
486 if let Some(dim) = meta.dimension {
487 for v in &vectors {
488 if v.values.len() != dim {
489 return Err(DakeraError::DimensionMismatch {
490 expected: dim,
491 actual: v.values.len(),
492 });
493 }
494 }
495 } else {
496 meta.dimension = Some(first_dim);
497 }
498
499 let total = vectors.len();
517
518 let fs_root = if Self::is_fs_safe_segment(namespace)
528 && vectors.iter().all(|v| Self::is_fs_safe_segment(&v.id))
529 {
530 self.fs_root.clone()
531 } else {
532 None
533 };
534
535 let new_inserts = if let Some(root) = fs_root {
536 self.write_vectors_fs(&root, namespace, vectors).await?
538 } else {
539 let op = self.operator.clone();
540 let ns = namespace.clone();
541 let results: Vec<Result<bool>> = stream::iter(vectors)
542 .map(|vector| {
543 let op = op.clone();
544 let ns = ns.clone();
545 async move {
546 let path = ObjectStorage::vector_path(&ns, &vector.id);
547 let stored: StoredVector = vector.into();
548 let data = serde_json::to_vec(&stored)
549 .map_err(|e| DakeraError::Storage(e.to_string()))?;
550
551 let exists = op
554 .exists(&path)
555 .await
556 .map_err(|e| DakeraError::Storage(e.to_string()))?;
557
558 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
560 let now_secs = std::time::SystemTime::now()
561 .duration_since(std::time::UNIX_EPOCH)
562 .unwrap_or_default()
563 .as_secs();
564 let tmp_path = format!("{}.tmp.{}.{}", path, now_secs, seq);
565 let rename_result = async {
566 op.write(&tmp_path, data.clone())
567 .await
568 .map_err(|e| DakeraError::Storage(e.to_string()))?;
569 op.rename(&tmp_path, &path)
570 .await
571 .map_err(|e| DakeraError::Storage(e.to_string()))?;
572 Ok::<(), DakeraError>(())
573 }
574 .await;
575 if let Err(e) = rename_result {
576 tracing::debug!(
577 path = %path,
578 error = %e,
579 "atomic rename failed, falling back to direct write"
580 );
581 let _ = op.delete(&tmp_path).await;
582 op.write(&path, data)
583 .await
584 .map_err(|e| DakeraError::Storage(e.to_string()))?;
585 }
586
587 Ok::<bool, DakeraError>(!exists)
588 }
589 })
590 .buffer_unordered(s3_concurrent_ops())
591 .collect()
592 .await;
593
594 let mut count = 0usize;
595 for r in results {
596 if r? {
597 count += 1;
598 }
599 }
600 count
601 };
602
603 meta.vector_count += new_inserts;
604 meta.updated_at = Self::now();
605 self.write_namespace_meta(namespace, &meta).await?;
606
607 tracing::debug!(
608 namespace = namespace,
609 upserted = total,
610 "Upserted vectors to object storage"
611 );
612
613 Ok(total)
614 }
615
616 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
617 if ids.is_empty() {
618 return Ok(Vec::new());
619 }
620
621 let now = std::time::SystemTime::now()
622 .duration_since(std::time::UNIX_EPOCH)
623 .unwrap_or_default()
624 .as_secs();
625
626 let read_tasks: Vec<_> = ids
627 .iter()
628 .map(|id| {
629 let operator = self.operator.clone();
630 let path = Self::vector_path(namespace, id);
631 let id = id.clone();
632 async move {
633 match operator.read(&path).await {
634 Ok(data) => {
635 let bytes = data.to_vec();
636 if bytes.is_empty() {
637 tracing::warn!(
638 vector_id = %id,
639 "Empty vector file detected, skipping"
640 );
641 return Ok(None);
642 }
643 match serde_json::from_slice::<StoredVector>(&bytes) {
644 Ok(stored) => {
645 let vector: Vector = stored.into();
646 if !vector.is_expired_at(now) {
647 Ok(Some(vector))
648 } else {
649 Ok(None)
650 }
651 }
652 Err(e) => {
653 tracing::warn!(
654 vector_id = %id,
655 error = %e,
656 bytes_len = bytes.len(),
657 "Corrupted vector file detected, skipping"
658 );
659 Ok(None)
660 }
661 }
662 }
663 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
664 Err(e) => Err(DakeraError::Storage(e.to_string())),
665 }
666 }
667 })
668 .collect();
669
670 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
671 .buffer_unordered(s3_concurrent_ops())
672 .collect()
673 .await;
674
675 let mut vectors = Vec::with_capacity(ids.len());
676 for result in results {
677 if let Some(v) = result? {
678 vectors.push(v);
679 }
680 }
681 Ok(vectors)
682 }
683
684 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
685 let prefix = Self::namespace_vectors_prefix(namespace);
686
687 let entries = self
688 .operator
689 .list(&prefix)
690 .await
691 .map_err(|e| DakeraError::Storage(e.to_string()))?;
692
693 let json_paths: Vec<String> = entries
694 .into_iter()
695 .filter(|e| e.path().ends_with(".json"))
696 .map(|e| e.path().to_string())
697 .collect();
698
699 if json_paths.is_empty() {
700 return Ok(Vec::new());
701 }
702
703 let now = std::time::SystemTime::now()
704 .duration_since(std::time::UNIX_EPOCH)
705 .unwrap_or_default()
706 .as_secs();
707
708 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
709 let operator = self.operator.clone();
710 async move {
711 match operator.read(&path).await {
712 Ok(data) => {
713 let bytes = data.to_vec();
714 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
715 let vector: Vector = stored.into();
716 if !vector.is_expired_at(now) {
717 return Some(vector);
718 }
719 }
720 None
721 }
722 Err(e) => {
723 tracing::warn!(path = %path, error = %e, "Failed to read vector");
724 None
725 }
726 }
727 }
728 }))
729 .buffer_unordered(s3_concurrent_ops())
730 .collect()
731 .await;
732
733 Ok(results.into_iter().flatten().collect())
734 }
735
736 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
737 if ids.is_empty() {
738 return Ok(0);
739 }
740
741 let delete_tasks: Vec<_> = ids
742 .iter()
743 .map(|id| {
744 let operator = self.operator.clone();
745 let path = Self::vector_path(namespace, id);
746 async move {
747 let exists = operator
748 .exists(&path)
749 .await
750 .map_err(|e| DakeraError::Storage(e.to_string()))?;
751 if exists {
752 match operator.delete(&path).await {
753 Ok(_) => Ok(true),
754 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
755 Err(e) => Err(DakeraError::Storage(e.to_string())),
756 }
757 } else {
758 Ok(false)
759 }
760 }
761 })
762 .collect();
763
764 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
765 .buffer_unordered(s3_concurrent_ops())
766 .collect()
767 .await;
768
769 let mut deleted = 0;
770 for result in results {
771 if result? {
772 deleted += 1;
773 }
774 }
775
776 if deleted > 0 {
778 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
779 meta.vector_count = meta.vector_count.saturating_sub(deleted);
780 meta.updated_at = Self::now();
781 self.write_namespace_meta(namespace, &meta).await?;
782 }
783 }
784
785 Ok(deleted)
786 }
787
788 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
789 Ok(self.read_namespace_meta(namespace).await?.is_some())
790 }
791
792 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
793 if self.read_namespace_meta(namespace).await?.is_none() {
794 let meta = NamespaceMetadata {
795 dimension: None,
796 vector_count: 0,
797 created_at: Self::now(),
798 updated_at: Self::now(),
799 };
800 self.write_namespace_meta(namespace, &meta).await?;
801 }
802 Ok(())
803 }
804
805 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
806 Ok(self
807 .read_namespace_meta(namespace)
808 .await?
809 .and_then(|m| m.dimension))
810 }
811
812 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
813 Ok(self
814 .read_namespace_meta(namespace)
815 .await?
816 .map(|m| m.vector_count)
817 .unwrap_or(0))
818 }
819
820 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
821 let entries = self
822 .operator
823 .list("namespaces/")
824 .await
825 .map_err(|e| DakeraError::Storage(e.to_string()))?;
826
827 let mut namespaces = Vec::new();
828 for entry in entries {
829 let path = entry.path();
830 if let Some(ns) = path.strip_prefix("namespaces/") {
832 let ns = ns.trim_end_matches('/');
833 if !ns.is_empty() && !ns.contains('/') {
834 if self.read_namespace_meta(ns).await?.is_some() {
839 namespaces.push(ns.to_string());
840 }
841 }
842 }
843 }
844
845 Ok(namespaces)
846 }
847
848 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
849 if !self.namespace_exists(namespace).await? {
851 return Ok(false);
852 }
853
854 let prefix = format!("namespaces/{}/", namespace);
858 self.operator
859 .delete_with(&prefix)
860 .recursive(true)
861 .await
862 .map_err(|e| DakeraError::Storage(e.to_string()))?;
863
864 tracing::debug!(
865 namespace = namespace,
866 "Deleted namespace from object storage"
867 );
868
869 Ok(true)
870 }
871
872 async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
873 Ok(0)
876 }
877
878 async fn cleanup_all_expired(&self) -> Result<usize> {
879 Ok(0)
881 }
882}
883
884#[async_trait]
885impl IndexStorage for ObjectStorage {
886 async fn save_index(
887 &self,
888 namespace: &NamespaceId,
889 index_type: IndexType,
890 data: Vec<u8>,
891 ) -> Result<()> {
892 let path = format!(
893 "namespaces/{}/indexes/{}.bin",
894 namespace,
895 index_type.as_str()
896 );
897 self.operator
898 .write(&path, data)
899 .await
900 .map_err(|e| DakeraError::Storage(e.to_string()))?;
901
902 tracing::debug!(
903 namespace = namespace,
904 index_type = index_type.as_str(),
905 "Saved index to object storage"
906 );
907 Ok(())
908 }
909
910 async fn load_index(
911 &self,
912 namespace: &NamespaceId,
913 index_type: IndexType,
914 ) -> Result<Option<Vec<u8>>> {
915 let path = format!(
916 "namespaces/{}/indexes/{}.bin",
917 namespace,
918 index_type.as_str()
919 );
920 match self.operator.read(&path).await {
921 Ok(data) => {
922 tracing::debug!(
923 namespace = namespace,
924 index_type = index_type.as_str(),
925 size = data.len(),
926 "Loaded index from object storage"
927 );
928 Ok(Some(data.to_vec()))
929 }
930 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
931 Err(e) => Err(DakeraError::Storage(e.to_string())),
932 }
933 }
934
935 async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
936 let path = format!(
937 "namespaces/{}/indexes/{}.bin",
938 namespace,
939 index_type.as_str()
940 );
941 let exists = self
942 .operator
943 .exists(&path)
944 .await
945 .map_err(|e| DakeraError::Storage(e.to_string()))?;
946
947 if exists {
948 self.operator
949 .delete(&path)
950 .await
951 .map_err(|e| DakeraError::Storage(e.to_string()))?;
952 tracing::debug!(
953 namespace = namespace,
954 index_type = index_type.as_str(),
955 "Deleted index from object storage"
956 );
957 }
958 Ok(exists)
959 }
960
961 async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
962 let path = format!(
963 "namespaces/{}/indexes/{}.bin",
964 namespace,
965 index_type.as_str()
966 );
967 self.operator
968 .exists(&path)
969 .await
970 .map_err(|e| DakeraError::Storage(e.to_string()))
971 }
972
973 async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
974 let prefix = format!("namespaces/{}/indexes/", namespace);
975 let entries = self
976 .operator
977 .list(&prefix)
978 .await
979 .map_err(|e| DakeraError::Storage(e.to_string()))?;
980
981 let mut indexes = Vec::new();
982 for entry in entries {
983 let path = entry.path();
984 if path.ends_with(".bin") {
985 if let Some(filename) = path.strip_prefix(&prefix) {
987 let name = filename.trim_end_matches(".bin");
988 match name {
989 "hnsw" => indexes.push(IndexType::Hnsw),
990 "pq" => indexes.push(IndexType::Pq),
991 "ivf" => indexes.push(IndexType::Ivf),
992 "spfresh" => indexes.push(IndexType::SpFresh),
993 "fulltext" => indexes.push(IndexType::FullText),
994 _ => {} }
996 }
997 }
998 }
999
1000 Ok(indexes)
1001 }
1002}
1003
1004pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
1007 ObjectStorage::build_operator(config)
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012 use super::*;
1013
1014 #[tokio::test]
1015 async fn test_object_storage_memory() {
1016 let storage = ObjectStorage::memory().unwrap();
1017 let namespace = "test".to_string();
1018
1019 storage.ensure_namespace(&namespace).await.unwrap();
1021 assert!(storage.namespace_exists(&namespace).await.unwrap());
1022
1023 let vectors = vec![
1025 Vector {
1026 id: "v1".to_string(),
1027 values: vec![1.0, 2.0, 3.0],
1028 metadata: None,
1029 ttl_seconds: None,
1030 expires_at: None,
1031 },
1032 Vector {
1033 id: "v2".to_string(),
1034 values: vec![4.0, 5.0, 6.0],
1035 metadata: Some(serde_json::json!({"key": "value"})),
1036 ttl_seconds: None,
1037 expires_at: None,
1038 },
1039 ];
1040
1041 let count = storage.upsert(&namespace, vectors).await.unwrap();
1042 assert_eq!(count, 2);
1043
1044 let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
1046 assert_eq!(results.len(), 1);
1047 assert_eq!(results[0].id, "v1");
1048 assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);
1049
1050 let all = storage.get_all(&namespace).await.unwrap();
1052 assert_eq!(all.len(), 2);
1053
1054 assert_eq!(storage.count(&namespace).await.unwrap(), 2);
1056
1057 let deleted = storage
1059 .delete(&namespace, &["v1".to_string()])
1060 .await
1061 .unwrap();
1062 assert_eq!(deleted, 1);
1063 assert!(storage
1064 .get(&namespace, &["v1".to_string()])
1065 .await
1066 .unwrap()
1067 .is_empty());
1068 assert_eq!(storage.count(&namespace).await.unwrap(), 1);
1069 }
1070
1071 #[tokio::test]
1072 async fn test_object_storage_dimension_mismatch() {
1073 let storage = ObjectStorage::memory().unwrap();
1074 let namespace = "test".to_string();
1075 storage.ensure_namespace(&namespace).await.unwrap();
1076
1077 let v1 = vec![Vector {
1079 id: "v1".to_string(),
1080 values: vec![1.0, 2.0, 3.0],
1081 metadata: None,
1082 ttl_seconds: None,
1083 expires_at: None,
1084 }];
1085 storage.upsert(&namespace, v1).await.unwrap();
1086
1087 let v2 = vec![Vector {
1089 id: "v2".to_string(),
1090 values: vec![1.0, 2.0], metadata: None,
1092 ttl_seconds: None,
1093 expires_at: None,
1094 }];
1095 let result = storage.upsert(&namespace, v2).await;
1096 assert!(result.is_err());
1097 }
1098
1099 #[tokio::test]
1100 async fn test_object_storage_upsert() {
1101 let storage = ObjectStorage::memory().unwrap();
1102 let namespace = "test".to_string();
1103 storage.ensure_namespace(&namespace).await.unwrap();
1104
1105 let v1 = vec![Vector {
1107 id: "v1".to_string(),
1108 values: vec![1.0, 2.0],
1109 metadata: None,
1110 ttl_seconds: None,
1111 expires_at: None,
1112 }];
1113 storage.upsert(&namespace, v1).await.unwrap();
1114
1115 let v1_updated = vec![Vector {
1117 id: "v1".to_string(),
1118 values: vec![3.0, 4.0],
1119 metadata: None,
1120 ttl_seconds: None,
1121 expires_at: None,
1122 }];
1123 storage.upsert(&namespace, v1_updated).await.unwrap();
1124
1125 let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
1127 assert_eq!(results.len(), 1);
1128 assert_eq!(results[0].values, vec![3.0, 4.0]);
1129
1130 assert_eq!(storage.count(&namespace).await.unwrap(), 1);
1132 }
1133
1134 #[tokio::test]
1135 async fn test_index_storage() {
1136 let storage = ObjectStorage::memory().unwrap();
1137 let namespace = "test_index".to_string();
1138
1139 assert!(!storage
1141 .index_exists(&namespace, IndexType::Hnsw)
1142 .await
1143 .unwrap());
1144
1145 let index_data = b"fake hnsw index data for testing".to_vec();
1147 storage
1148 .save_index(&namespace, IndexType::Hnsw, index_data.clone())
1149 .await
1150 .unwrap();
1151
1152 assert!(storage
1154 .index_exists(&namespace, IndexType::Hnsw)
1155 .await
1156 .unwrap());
1157 assert!(!storage
1158 .index_exists(&namespace, IndexType::Pq)
1159 .await
1160 .unwrap());
1161
1162 let loaded = storage
1164 .load_index(&namespace, IndexType::Hnsw)
1165 .await
1166 .unwrap();
1167 assert!(loaded.is_some());
1168 assert_eq!(loaded.unwrap(), index_data);
1169
1170 let pq_data = b"fake pq index data".to_vec();
1172 storage
1173 .save_index(&namespace, IndexType::Pq, pq_data)
1174 .await
1175 .unwrap();
1176
1177 let indexes = storage.list_indexes(&namespace).await.unwrap();
1179 assert_eq!(indexes.len(), 2);
1180 assert!(indexes.contains(&IndexType::Hnsw));
1181 assert!(indexes.contains(&IndexType::Pq));
1182
1183 let deleted = storage
1185 .delete_index(&namespace, IndexType::Hnsw)
1186 .await
1187 .unwrap();
1188 assert!(deleted);
1189 assert!(!storage
1190 .index_exists(&namespace, IndexType::Hnsw)
1191 .await
1192 .unwrap());
1193
1194 let deleted = storage
1196 .delete_index(&namespace, IndexType::Hnsw)
1197 .await
1198 .unwrap();
1199 assert!(!deleted);
1200
1201 let loaded = storage
1203 .load_index(&namespace, IndexType::Hnsw)
1204 .await
1205 .unwrap();
1206 assert!(loaded.is_none());
1207 }
1208
1209 fn make_vector(id: &str, dim: usize) -> Vector {
1212 Vector {
1213 id: id.to_string(),
1214 values: vec![0.1; dim],
1215 metadata: None,
1216 ttl_seconds: None,
1217 expires_at: None,
1218 }
1219 }
1220
1221 #[tokio::test]
1222 async fn test_upsert_batch_parallel_all_new() {
1223 let storage = ObjectStorage::memory().unwrap();
1224 let ns = "batch_all_new".to_string();
1225 storage.ensure_namespace(&ns).await.unwrap();
1226
1227 let vectors: Vec<Vector> = (0..50).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1229 let count = storage.upsert(&ns, vectors).await.unwrap();
1230
1231 assert_eq!(count, 50);
1232 assert_eq!(storage.count(&ns).await.unwrap(), 50);
1233 }
1234
1235 #[tokio::test]
1236 async fn test_upsert_batch_parallel_idempotent_count() {
1237 let storage = ObjectStorage::memory().unwrap();
1238 let ns = "batch_idempotent".to_string();
1239 storage.ensure_namespace(&ns).await.unwrap();
1240
1241 let vectors: Vec<Vector> = (0..10).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1242 storage.upsert(&ns, vectors.clone()).await.unwrap();
1243
1244 storage.upsert(&ns, vectors).await.unwrap();
1246 assert_eq!(storage.count(&ns).await.unwrap(), 10);
1247 }
1248
1249 #[tokio::test]
1250 async fn test_upsert_batch_parallel_empty() {
1251 let storage = ObjectStorage::memory().unwrap();
1252 let ns = "batch_empty".to_string();
1253 storage.ensure_namespace(&ns).await.unwrap();
1254
1255 let count = storage.upsert(&ns, vec![]).await.unwrap();
1256 assert_eq!(count, 0);
1257 assert_eq!(storage.count(&ns).await.unwrap(), 0);
1258 }
1259
1260 #[tokio::test]
1261 async fn test_upsert_batch_parallel_large_batch() {
1262 let storage = ObjectStorage::memory().unwrap();
1264 let ns = "batch_large".to_string();
1265 storage.ensure_namespace(&ns).await.unwrap();
1266
1267 let vectors: Vec<Vector> = (0..200).map(|i| make_vector(&format!("v{i}"), 8)).collect();
1268 let count = storage.upsert(&ns, vectors).await.unwrap();
1269
1270 assert_eq!(count, 200);
1271 assert_eq!(storage.count(&ns).await.unwrap(), 200);
1272 }
1273
1274 #[tokio::test]
1278 async fn test_upsert_count_mixed_insert_update_batch() {
1279 let storage = ObjectStorage::memory().unwrap();
1280 let ns = "mixed".to_string();
1281 storage.ensure_namespace(&ns).await.unwrap();
1282
1283 storage
1285 .upsert(&ns, vec![make_vector("v0", 4), make_vector("v1", 4)])
1286 .await
1287 .unwrap();
1288 assert_eq!(storage.count(&ns).await.unwrap(), 2);
1289
1290 storage
1292 .upsert(
1293 &ns,
1294 vec![
1295 make_vector("v1", 4),
1296 make_vector("v2", 4),
1297 make_vector("v3", 4),
1298 ],
1299 )
1300 .await
1301 .unwrap();
1302 assert_eq!(storage.count(&ns).await.unwrap(), 4);
1303
1304 storage
1306 .upsert(&ns, vec![make_vector("v0", 4), make_vector("v3", 4)])
1307 .await
1308 .unwrap();
1309 assert_eq!(storage.count(&ns).await.unwrap(), 4);
1310 }
1311
1312 #[tokio::test]
1316 async fn test_upsert_fs_fast_path_roundtrip() {
1317 let tmp = tempfile::tempdir().unwrap();
1318 let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1319 let ns = "fsfast".to_string();
1320 storage.ensure_namespace(&ns).await.unwrap();
1321
1322 storage
1324 .upsert(
1325 &ns,
1326 vec![
1327 make_vector("a", 4),
1328 make_vector("b", 4),
1329 make_vector("c", 4),
1330 ],
1331 )
1332 .await
1333 .unwrap();
1334 assert_eq!(storage.count(&ns).await.unwrap(), 3);
1335
1336 let got = storage
1338 .get(&ns, &["a".to_string(), "b".to_string()])
1339 .await
1340 .unwrap();
1341 assert_eq!(got.len(), 2);
1342
1343 storage
1345 .upsert(&ns, vec![make_vector("b", 4), make_vector("d", 4)])
1346 .await
1347 .unwrap();
1348 assert_eq!(storage.count(&ns).await.unwrap(), 4);
1349
1350 storage
1353 .upsert(
1354 &ns,
1355 vec![make_vector("safe1", 4), make_vector("../escape", 4)],
1356 )
1357 .await
1358 .unwrap();
1359 let got = storage.get(&ns, &["safe1".to_string()]).await.unwrap();
1360 assert_eq!(got.len(), 1);
1361 assert!(!tmp.path().parent().unwrap().join("escape.json").exists());
1363 }
1364
1365 #[tokio::test]
1371 async fn test_upsert_incremental_count_growing_namespace_fs() {
1372 let tmp = tempfile::tempdir().unwrap();
1373 let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1374 let ns = "grow".to_string();
1375 storage.ensure_namespace(&ns).await.unwrap();
1376
1377 const BATCHES: usize = 40;
1378 for b in 0..BATCHES {
1379 let new_id = format!("v{}", b + 1);
1382 let upd_id = format!("v{}", b);
1383 storage
1384 .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1385 .await
1386 .unwrap();
1387 assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1389 }
1390 assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1391 }
1392
1393 #[tokio::test]
1396 async fn test_upsert_incremental_count_growing_namespace_opendal() {
1397 let storage = ObjectStorage::memory().unwrap();
1398 let ns = "grow-od".to_string();
1399 storage.ensure_namespace(&ns).await.unwrap();
1400
1401 const BATCHES: usize = 30;
1402 for b in 0..BATCHES {
1403 let new_id = format!("v{}", b + 1);
1404 let upd_id = format!("v{}", b);
1405 storage
1406 .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1407 .await
1408 .unwrap();
1409 assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1410 }
1411 assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1412 }
1413
1414 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1425 #[ignore]
1426 async fn profile_upsert_substages() {
1427 use std::time::Instant;
1428 const N: usize = 200;
1429 const DIM: usize = 1024; let conc = s3_concurrent_ops();
1431
1432 let make = |i: usize| -> Vector {
1434 Vector {
1435 id: format!("prof-{i}"),
1436 values: vec![0.0123_f32; DIM],
1437 metadata: Some(serde_json::json!({
1438 "_embedding_kind": "static",
1439 "tags": ["dia_1", "speaker_a", "session_3", "2026-06-03"],
1440 "importance": 0.7,
1441 "session_id": format!("sess-{}", i % 8),
1442 })),
1443 ttl_seconds: None,
1444 expires_at: None,
1445 }
1446 };
1447 let vectors: Vec<Vector> = (0..N).map(make).collect();
1448 let payload_bytes = serde_json::to_vec(&StoredVector::from(vectors[0].clone()))
1449 .unwrap()
1450 .len();
1451
1452 let tmp0 = tempfile::tempdir().unwrap();
1454 let s0 = ObjectStorage::filesystem(tmp0.path().to_str().unwrap()).unwrap();
1455 let ns = "prof".to_string();
1456 s0.ensure_namespace(&ns).await.unwrap();
1457 let t = Instant::now();
1458 let n = s0.upsert(&ns, vectors.clone()).await.unwrap();
1459 let v0_ms = t.elapsed().as_secs_f64() * 1000.0;
1460 assert_eq!(n, N);
1461
1462 async fn timed<F, Fut>(label: &str, n: usize, conc: usize, f: F) -> f64
1464 where
1465 F: Fn(usize, Operator) -> Fut,
1466 Fut: std::future::Future<Output = ()>,
1467 {
1468 let tmp = tempfile::tempdir().unwrap();
1469 let op = ObjectStorage::filesystem(tmp.path().to_str().unwrap())
1470 .unwrap()
1471 .operator;
1472 let t = Instant::now();
1473 stream::iter(0..n)
1474 .map(|i| {
1475 let op = op.clone();
1476 f(i, op)
1477 })
1478 .buffer_unordered(conc)
1479 .collect::<Vec<()>>()
1480 .await;
1481 let ms = t.elapsed().as_secs_f64() * 1000.0;
1482 drop(tmp);
1484 let _ = label;
1485 ms
1486 }
1487
1488 let data: Vec<Vec<u8>> = vectors
1489 .iter()
1490 .map(|v| serde_json::to_vec(&StoredVector::from(v.clone())).unwrap())
1491 .collect();
1492 let path_of = |i: usize| ObjectStorage::vector_path(&ns, &format!("prof-{i}"));
1493
1494 let d = data.clone();
1496 let p_exists = timed("exists", N, conc, move |i, op| {
1497 let path = path_of(i);
1498 let _ = &d;
1499 async move {
1500 let _ = op.exists(&path).await;
1501 }
1502 })
1503 .await;
1504
1505 let d = data.clone();
1507 let p_write_direct = timed("write_direct", N, conc, move |i, op| {
1508 let path = path_of(i);
1509 let bytes = d[i].clone();
1510 async move {
1511 op.write(&path, bytes).await.unwrap();
1512 }
1513 })
1514 .await;
1515
1516 let d = data.clone();
1518 let p_tmp_rename = timed("write_tmp_rename", N, conc, move |i, op| {
1519 let path = path_of(i);
1520 let bytes = d[i].clone();
1521 async move {
1522 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1523 let tmp_path = format!("{path}.tmp.{seq}");
1524 op.write(&tmp_path, bytes).await.unwrap();
1525 op.rename(&tmp_path, &path).await.unwrap();
1526 }
1527 })
1528 .await;
1529
1530 let d = data.clone();
1532 let p_full = timed("exists+tmp+rename", N, conc, move |i, op| {
1533 let path = path_of(i);
1534 let bytes = d[i].clone();
1535 async move {
1536 let _ = op.exists(&path).await;
1537 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1538 let tmp_path = format!("{path}.tmp.{seq}");
1539 op.write(&tmp_path, bytes).await.unwrap();
1540 op.rename(&tmp_path, &path).await.unwrap();
1541 }
1542 })
1543 .await;
1544
1545 let mut sweep = Vec::new();
1548 for &c in &[8usize, 16, 32, 64, 128] {
1549 let d = data.clone();
1550 let ms = timed("sweep", N, c, move |i, op| {
1551 let path = path_of(i);
1552 let bytes = d[i].clone();
1553 async move {
1554 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1555 let tmp_path = format!("{path}.tmp.{seq}");
1556 op.write(&tmp_path, bytes).await.unwrap();
1557 op.rename(&tmp_path, &path).await.unwrap();
1558 }
1559 })
1560 .await;
1561 sweep.push((c, ms));
1562 }
1563
1564 async fn raw_fs(n: usize, conc: usize, bytes: usize, do_sync: bool) -> f64 {
1568 let dir = tempfile::tempdir().unwrap();
1569 let root = dir.path().to_path_buf();
1570 let payload = vec![0u8; bytes];
1571 let t = Instant::now();
1572 stream::iter(0..n)
1573 .map(|i| {
1574 let root = root.clone();
1575 let payload = payload.clone();
1576 async move {
1577 tokio::task::spawn_blocking(move || {
1578 use std::io::Write;
1579 let p = root.join(format!("f{i}.bin"));
1580 let mut f = std::fs::File::create(&p).unwrap();
1581 f.write_all(&payload).unwrap();
1582 if do_sync {
1583 f.sync_all().unwrap();
1584 }
1585 })
1586 .await
1587 .unwrap();
1588 }
1589 })
1590 .buffer_unordered(conc)
1591 .collect::<Vec<()>>()
1592 .await;
1593 let ms = t.elapsed().as_secs_f64() * 1000.0;
1594 drop(dir);
1595 ms
1596 }
1597 let raw_nosync = raw_fs(N, 16, payload_bytes, false).await;
1598 let raw_sync = raw_fs(N, 16, payload_bytes, true).await;
1599
1600 let rate = |ms: f64| N as f64 / (ms / 1000.0);
1601 eprintln!("\n=== DAK-6287 storage_upsert sub-stage profile (fs backend) ===");
1602 eprintln!("N={N} dim={DIM} payload={payload_bytes}B concurrency={conc}");
1603 eprintln!(
1604 "V0 upsert() REAL path : {v0_ms:8.2} ms {:8.1} mem/s",
1605 rate(v0_ms)
1606 );
1607 eprintln!(
1608 "P_full exists+tmp+rename : {p_full:8.2} ms {:8.1} mem/s",
1609 rate(p_full)
1610 );
1611 eprintln!(
1612 "P_exists (stat only) : {p_exists:8.2} ms {:8.1} mem/s",
1613 rate(p_exists)
1614 );
1615 eprintln!(
1616 "P_write_tmp_rename : {p_tmp_rename:8.2} ms {:8.1} mem/s",
1617 rate(p_tmp_rename)
1618 );
1619 eprintln!(
1620 "P_write_direct : {p_write_direct:8.2} ms {:8.1} mem/s",
1621 rate(p_write_direct)
1622 );
1623 eprintln!("--- attribution (of P_full) ---");
1624 eprintln!("exists share : {:5.1}%", 100.0 * p_exists / p_full);
1625 eprintln!("rename overhead: {:5.1}% (tmp+rename {p_tmp_rename:.1} vs direct {p_write_direct:.1})",
1626 100.0 * (p_tmp_rename - p_write_direct).max(0.0) / p_full);
1627 eprintln!("--- concurrency sweep (write tmp+rename) ---");
1628 for (c, ms) in &sweep {
1629 eprintln!("conc={c:4} : {ms:8.2} ms {:8.1} mem/s", rate(*ms));
1630 }
1631 eprintln!("--- fsync isolation (raw std::fs, conc=16, {payload_bytes}B) ---");
1632 eprintln!(
1633 "write no-sync : {raw_nosync:8.2} ms {:8.1} mem/s",
1634 rate(raw_nosync)
1635 );
1636 eprintln!(
1637 "write+sync_all: {raw_sync:8.2} ms {:8.1} mem/s",
1638 rate(raw_sync)
1639 );
1640 eprintln!(
1641 "=> fsync cost factor: {:.1}x",
1642 raw_sync / raw_nosync.max(0.001)
1643 );
1644 }
1645}