1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
/// Process-wide counter whose successive values are appended to temporary
/// file/object names so that concurrent writers in this process never pick
/// the same temporary path.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Returns the maximum number of storage operations to run concurrently.
///
/// The value is read from the `DAKERA_S3_CONCURRENT_OPS` environment variable.
/// The default of 16 is used when the variable is unset, is not a valid
/// `usize`, or is `0`.
///
/// Zero is rejected because the result is passed to `buffer_unordered`, where
/// a limit of 0 never starts any operation and the batch would stall.
fn s3_concurrent_ops() -> usize {
    const DEFAULT_CONCURRENT_OPS: usize = 16;

    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&limit| limit > 0)
        .unwrap_or(DEFAULT_CONCURRENT_OPS)
}
28
/// Per-namespace bookkeeping record, serialized as JSON to
/// `namespaces/<namespace>/meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimension of the namespace; `None` until the first upsert fixes it.
    dimension: Option<usize>,
    /// Running count of stored vectors, adjusted by upsert and delete.
    vector_count: usize,
    /// Creation time, in seconds since the Unix epoch.
    created_at: u64,
    /// Time of the last metadata update, in seconds since the Unix epoch.
    updated_at: u64,
}
37
/// On-disk/object representation of a vector, serialized as JSON.
///
/// Only `id`, `values` and `metadata` are persisted; the TTL-related fields of
/// `Vector` have no counterpart here.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    /// Vector identifier; also used to derive the object path.
    id: String,
    /// The embedding values.
    values: Vec<f32>,
    /// Optional free-form JSON metadata attached to the vector.
    metadata: Option<serde_json::Value>,
}
45
46impl From<Vector> for StoredVector {
47 fn from(v: Vector) -> Self {
48 Self {
49 id: v.id,
50 values: v.values,
51 metadata: v.metadata,
52 }
53 }
54}
55
56impl From<StoredVector> for Vector {
57 fn from(v: StoredVector) -> Self {
58 Self {
59 id: v.id,
60 values: v.values,
61 metadata: v.metadata,
62 ttl_seconds: None,
63 expires_at: None,
64 }
65 }
66}
67
/// Selects and configures the backend that `ObjectStorage` talks to.
///
/// Optional fields are only passed to the backend builder when they are `Some`.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-process memory backend (the default).
    #[default]
    Memory,
    /// Local filesystem backend rooted at `root`.
    Filesystem { root: String },
    /// S3 backend.
    S3 {
        /// Bucket name.
        bucket: String,
        /// Optional region.
        region: Option<String>,
        /// Optional endpoint override.
        endpoint: Option<String>,
        /// Optional access key id.
        access_key_id: Option<String>,
        /// Optional secret access key.
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage backend.
    Azure {
        /// Blob container name.
        container: String,
        /// Storage account name.
        account_name: String,
        /// Optional account key.
        account_key: Option<String>,
        /// Optional SAS token.
        sas_token: Option<String>,
        /// Optional endpoint override.
        endpoint: Option<String>,
    },
    /// Google Cloud Storage backend.
    Gcs {
        /// Bucket name.
        bucket: String,
        /// Optional path to a credential file.
        credential_path: Option<String>,
        /// Optional endpoint override.
        endpoint: Option<String>,
    },
}
99
/// Vector and index storage backed by an OpenDAL `Operator`.
#[derive(Clone)]
pub struct ObjectStorage {
    /// Operator used for all reads, writes, listings and deletes.
    operator: Operator,
    /// Root directory of the filesystem backend; `Some` only when the storage
    /// was built from `ObjectStorageConfig::Filesystem`. Enables the direct
    /// filesystem write path in `upsert`.
    fs_root: Option<std::path::PathBuf>,
    /// Whether writes go to a temporary path first and are then renamed into
    /// place. Set only for the filesystem backend.
    needs_tmp_rename: bool,
}
122
123impl ObjectStorage {
124 pub fn new(config: ObjectStorageConfig) -> Result<Self> {
126 let operator = Self::build_operator(&config)?;
127 let fs_root = match &config {
128 ObjectStorageConfig::Filesystem { root } => Some(std::path::PathBuf::from(root)),
129 _ => None,
130 };
131 let needs_tmp_rename = Self::needs_tmp_rename_for(&config);
132 Ok(Self {
133 operator,
134 fs_root,
135 needs_tmp_rename,
136 })
137 }
138
139 fn needs_tmp_rename_for(config: &ObjectStorageConfig) -> bool {
146 matches!(config, ObjectStorageConfig::Filesystem { .. })
147 }
148
149 fn is_fs_safe_segment(s: &str) -> bool {
153 !s.is_empty()
154 && s != "."
155 && s != ".."
156 && !s.contains('/')
157 && !s.contains('\\')
158 && !s.contains('\0')
159 }
160
161 async fn write_vectors_fs(
175 &self,
176 root: &std::path::Path,
177 namespace: &NamespaceId,
178 vectors: Vec<Vector>,
179 ) -> Result<usize> {
180 let vectors_dir = root.join(format!("namespaces/{}/vectors", namespace));
181 tokio::fs::create_dir_all(&vectors_dir)
182 .await
183 .map_err(|e| DakeraError::Storage(e.to_string()))?;
184
185 let results: Vec<Result<bool>> = stream::iter(vectors)
186 .map(|vector| {
187 let dir = vectors_dir.clone();
188 async move {
189 let id = vector.id.clone();
190 let stored: StoredVector = vector.into();
191 let data = serde_json::to_vec(&stored)
192 .map_err(|e| DakeraError::Storage(e.to_string()))?;
193 let final_path = dir.join(format!("{}.json", id));
194 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
195 let tmp_path = dir.join(format!(".{}.tmp.{}", id, seq));
197 tokio::task::spawn_blocking(move || -> Result<bool> {
198 use std::io::Write;
199 let is_new = !final_path.exists();
202 let mut f = std::fs::File::create(&tmp_path)
203 .map_err(|e| DakeraError::Storage(e.to_string()))?;
204 f.write_all(&data)
205 .map_err(|e| DakeraError::Storage(e.to_string()))?;
206 f.sync_all()
209 .map_err(|e| DakeraError::Storage(e.to_string()))?;
210 drop(f);
211 if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
212 let _ = std::fs::remove_file(&tmp_path);
213 return Err(DakeraError::Storage(e.to_string()));
214 }
215 Ok(is_new)
216 })
217 .await
218 .map_err(|e| DakeraError::Storage(e.to_string()))?
219 }
220 })
221 .buffer_unordered(s3_concurrent_ops())
222 .collect()
223 .await;
224
225 let mut new_inserts = 0usize;
226 for r in results {
227 if r? {
228 new_inserts += 1;
229 }
230 }
231 Ok(new_inserts)
232 }
233
234 pub fn memory() -> Result<Self> {
236 Self::new(ObjectStorageConfig::Memory)
237 }
238
239 pub fn filesystem(root: impl Into<String>) -> Result<Self> {
241 Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
242 }
243
244 pub fn s3(bucket: impl Into<String>) -> Result<Self> {
246 Self::new(ObjectStorageConfig::S3 {
247 bucket: bucket.into(),
248 region: None,
249 endpoint: None,
250 access_key_id: None,
251 secret_access_key: None,
252 })
253 }
254
255 pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
257 Self::new(ObjectStorageConfig::Azure {
258 container: container.into(),
259 account_name: account_name.into(),
260 account_key: None,
261 sas_token: None,
262 endpoint: None,
263 })
264 }
265
266 pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
268 Self::new(ObjectStorageConfig::Gcs {
269 bucket: bucket.into(),
270 credential_path: None,
271 endpoint: None,
272 })
273 }
274
275 pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
276 match config {
277 ObjectStorageConfig::Memory => {
278 let builder = services::Memory::default();
279 Operator::new(builder)
280 .map(|op| op.finish())
281 .map_err(|e| DakeraError::Storage(e.to_string()))
282 }
283 ObjectStorageConfig::Filesystem { root } => {
284 let builder = services::Fs::default().root(root);
285 Operator::new(builder)
286 .map(|op| op.layer(Self::retry_layer()).finish())
287 .map_err(|e| DakeraError::Storage(e.to_string()))
288 }
289 ObjectStorageConfig::S3 {
290 bucket,
291 region,
292 endpoint,
293 access_key_id,
294 secret_access_key,
295 } => {
296 let mut builder = services::S3::default().bucket(bucket);
297
298 if let Some(region) = region {
299 builder = builder.region(region);
300 }
301 if let Some(endpoint) = endpoint {
302 builder = builder.endpoint(endpoint);
303 }
304 if let Some(key) = access_key_id {
305 builder = builder.access_key_id(key);
306 }
307 if let Some(secret) = secret_access_key {
308 builder = builder.secret_access_key(secret);
309 }
310
311 Operator::new(builder)
312 .map(|op| op.layer(Self::retry_layer()).finish())
313 .map_err(|e| DakeraError::Storage(e.to_string()))
314 }
315 ObjectStorageConfig::Azure {
316 container,
317 account_name,
318 account_key,
319 sas_token,
320 endpoint,
321 } => {
322 let mut builder = services::Azblob::default()
323 .container(container)
324 .account_name(account_name);
325
326 if let Some(key) = account_key {
327 builder = builder.account_key(key);
328 }
329 if let Some(token) = sas_token {
330 builder = builder.sas_token(token);
331 }
332 if let Some(endpoint) = endpoint {
333 builder = builder.endpoint(endpoint);
334 }
335
336 Operator::new(builder)
337 .map(|op| op.layer(Self::retry_layer()).finish())
338 .map_err(|e| DakeraError::Storage(e.to_string()))
339 }
340 ObjectStorageConfig::Gcs {
341 bucket,
342 credential_path,
343 endpoint,
344 } => {
345 let mut builder = services::Gcs::default().bucket(bucket);
346
347 if let Some(cred_path) = credential_path {
348 builder = builder.credential_path(cred_path);
349 }
350 if let Some(endpoint) = endpoint {
351 builder = builder.endpoint(endpoint);
352 }
353
354 Operator::new(builder)
355 .map(|op| op.layer(Self::retry_layer()).finish())
356 .map_err(|e| DakeraError::Storage(e.to_string()))
357 }
358 }
359 }
360
361 fn retry_layer() -> RetryLayer {
362 let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
366 .ok()
367 .and_then(|v| v.parse().ok())
368 .unwrap_or(3);
369 let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
370 .ok()
371 .and_then(|v| v.parse().ok())
372 .unwrap_or(500);
373 let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
374 .ok()
375 .and_then(|v| v.parse().ok())
376 .unwrap_or(60);
377
378 tracing::info!(
379 max_times,
380 min_delay_ms,
381 max_delay_secs,
382 "S3 retry layer configured"
383 );
384
385 RetryLayer::new()
386 .with_max_times(max_times)
387 .with_min_delay(Duration::from_millis(min_delay_ms))
388 .with_max_delay(Duration::from_secs(max_delay_secs))
389 .with_jitter()
390 .with_factor(2.0)
391 }
392
393 fn vector_path(namespace: &str, vector_id: &str) -> String {
395 format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
396 }
397
398 fn namespace_meta_path(namespace: &str) -> String {
400 format!("namespaces/{}/meta.json", namespace)
401 }
402
403 fn namespace_vectors_prefix(namespace: &str) -> String {
405 format!("namespaces/{}/vectors/", namespace)
406 }
407
408 async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
410 let path = Self::namespace_meta_path(namespace);
411 match self.operator.read(&path).await {
412 Ok(data) => {
413 let bytes = data.to_vec();
414 if bytes.is_empty() {
415 tracing::warn!(
416 namespace = %namespace,
417 path = %path,
418 "Empty namespace metadata file detected, treating as missing"
419 );
420 return Ok(None);
421 }
422 match serde_json::from_slice(&bytes) {
423 Ok(meta) => Ok(Some(meta)),
424 Err(e) => {
425 tracing::warn!(
426 namespace = %namespace,
427 path = %path,
428 error = %e,
429 bytes_len = bytes.len(),
430 "Corrupted namespace metadata, treating as missing and will be recreated"
431 );
432 Ok(None)
433 }
434 }
435 }
436 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
437 Err(e) => Err(DakeraError::Storage(e.to_string())),
438 }
439 }
440
441 async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
445 let path = Self::namespace_meta_path(namespace);
446 let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
447 self.write_atomic(&path, data).await
448 }
449
450 async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
465 if !self.needs_tmp_rename {
466 return self
468 .operator
469 .write(path, data)
470 .await
471 .map(|_| ())
472 .map_err(|e| DakeraError::Storage(e.to_string()));
473 }
474 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
475 let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
476 let rename_result = async {
477 self.operator
478 .write(&tmp_path, data.clone())
479 .await
480 .map_err(|e| DakeraError::Storage(e.to_string()))?;
481 self.operator
482 .rename(&tmp_path, path)
483 .await
484 .map_err(|e| DakeraError::Storage(e.to_string()))?;
485 Ok::<(), DakeraError>(())
486 }
487 .await;
488 if let Err(e) = rename_result {
489 tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
491 let _ = self.operator.delete(&tmp_path).await;
492 self.operator
493 .write(path, data)
494 .await
495 .map_err(|e| DakeraError::Storage(e.to_string()))?;
496 }
497 Ok(())
498 }
499
500 fn now() -> u64 {
502 std::time::SystemTime::now()
503 .duration_since(std::time::UNIX_EPOCH)
504 .unwrap_or_default()
505 .as_secs()
506 }
507}
508
509#[async_trait]
510impl VectorStorage for ObjectStorage {
511 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
512 if vectors.is_empty() {
513 return Ok(0);
514 }
515
516 let mut meta = self
518 .read_namespace_meta(namespace)
519 .await?
520 .unwrap_or_else(|| NamespaceMetadata {
521 dimension: None,
522 vector_count: 0,
523 created_at: Self::now(),
524 updated_at: Self::now(),
525 });
526
527 let first_dim = vectors[0].values.len();
529 if let Some(dim) = meta.dimension {
530 for v in &vectors {
531 if v.values.len() != dim {
532 return Err(DakeraError::DimensionMismatch {
533 expected: dim,
534 actual: v.values.len(),
535 });
536 }
537 }
538 } else {
539 meta.dimension = Some(first_dim);
540 }
541
542 let total = vectors.len();
560
561 let fs_root = if Self::is_fs_safe_segment(namespace)
571 && vectors.iter().all(|v| Self::is_fs_safe_segment(&v.id))
572 {
573 self.fs_root.clone()
574 } else {
575 None
576 };
577
578 let new_inserts = if let Some(root) = fs_root {
579 self.write_vectors_fs(&root, namespace, vectors).await?
581 } else {
582 let op = self.operator.clone();
583 let ns = namespace.clone();
584 let needs_tmp_rename = self.needs_tmp_rename;
587 let results: Vec<Result<bool>> = stream::iter(vectors)
588 .map(|vector| {
589 let op = op.clone();
590 let ns = ns.clone();
591 async move {
592 let path = ObjectStorage::vector_path(&ns, &vector.id);
593 let stored: StoredVector = vector.into();
594 let data = serde_json::to_vec(&stored)
595 .map_err(|e| DakeraError::Storage(e.to_string()))?;
596
597 let exists = op
600 .exists(&path)
601 .await
602 .map_err(|e| DakeraError::Storage(e.to_string()))?;
603
604 if needs_tmp_rename {
605 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
609 let now_secs = std::time::SystemTime::now()
610 .duration_since(std::time::UNIX_EPOCH)
611 .unwrap_or_default()
612 .as_secs();
613 let tmp_path = format!("{}.tmp.{}.{}", path, now_secs, seq);
614 let rename_result = async {
615 op.write(&tmp_path, data.clone())
616 .await
617 .map_err(|e| DakeraError::Storage(e.to_string()))?;
618 op.rename(&tmp_path, &path)
619 .await
620 .map_err(|e| DakeraError::Storage(e.to_string()))?;
621 Ok::<(), DakeraError>(())
622 }
623 .await;
624 if let Err(e) = rename_result {
625 tracing::debug!(
626 path = %path,
627 error = %e,
628 "atomic rename failed, falling back to direct write"
629 );
630 let _ = op.delete(&tmp_path).await;
631 op.write(&path, data)
632 .await
633 .map_err(|e| DakeraError::Storage(e.to_string()))?;
634 }
635 } else {
636 op.write(&path, data)
643 .await
644 .map_err(|e| DakeraError::Storage(e.to_string()))?;
645 }
646
647 Ok::<bool, DakeraError>(!exists)
648 }
649 })
650 .buffer_unordered(s3_concurrent_ops())
651 .collect()
652 .await;
653
654 let mut count = 0usize;
655 for r in results {
656 if r? {
657 count += 1;
658 }
659 }
660 count
661 };
662
663 meta.vector_count += new_inserts;
664 meta.updated_at = Self::now();
665 self.write_namespace_meta(namespace, &meta).await?;
666
667 tracing::debug!(
668 namespace = namespace,
669 upserted = total,
670 "Upserted vectors to object storage"
671 );
672
673 Ok(total)
674 }
675
676 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
677 if ids.is_empty() {
678 return Ok(Vec::new());
679 }
680
681 let now = std::time::SystemTime::now()
682 .duration_since(std::time::UNIX_EPOCH)
683 .unwrap_or_default()
684 .as_secs();
685
686 let read_tasks: Vec<_> = ids
687 .iter()
688 .map(|id| {
689 let operator = self.operator.clone();
690 let path = Self::vector_path(namespace, id);
691 let id = id.clone();
692 async move {
693 match operator.read(&path).await {
694 Ok(data) => {
695 let bytes = data.to_vec();
696 if bytes.is_empty() {
697 tracing::warn!(
698 vector_id = %id,
699 "Empty vector file detected, skipping"
700 );
701 return Ok(None);
702 }
703 match serde_json::from_slice::<StoredVector>(&bytes) {
704 Ok(stored) => {
705 let vector: Vector = stored.into();
706 if !vector.is_expired_at(now) {
707 Ok(Some(vector))
708 } else {
709 Ok(None)
710 }
711 }
712 Err(e) => {
713 tracing::warn!(
714 vector_id = %id,
715 error = %e,
716 bytes_len = bytes.len(),
717 "Corrupted vector file detected, skipping"
718 );
719 Ok(None)
720 }
721 }
722 }
723 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
724 Err(e) => Err(DakeraError::Storage(e.to_string())),
725 }
726 }
727 })
728 .collect();
729
730 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
731 .buffer_unordered(s3_concurrent_ops())
732 .collect()
733 .await;
734
735 let mut vectors = Vec::with_capacity(ids.len());
736 for result in results {
737 if let Some(v) = result? {
738 vectors.push(v);
739 }
740 }
741 Ok(vectors)
742 }
743
744 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
745 let prefix = Self::namespace_vectors_prefix(namespace);
746
747 let entries = self
748 .operator
749 .list(&prefix)
750 .await
751 .map_err(|e| DakeraError::Storage(e.to_string()))?;
752
753 let json_paths: Vec<String> = entries
754 .into_iter()
755 .filter(|e| e.path().ends_with(".json"))
756 .map(|e| e.path().to_string())
757 .collect();
758
759 if json_paths.is_empty() {
760 return Ok(Vec::new());
761 }
762
763 let now = std::time::SystemTime::now()
764 .duration_since(std::time::UNIX_EPOCH)
765 .unwrap_or_default()
766 .as_secs();
767
768 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
769 let operator = self.operator.clone();
770 async move {
771 match operator.read(&path).await {
772 Ok(data) => {
773 let bytes = data.to_vec();
774 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
775 let vector: Vector = stored.into();
776 if !vector.is_expired_at(now) {
777 return Some(vector);
778 }
779 }
780 None
781 }
782 Err(e) => {
783 tracing::warn!(path = %path, error = %e, "Failed to read vector");
784 None
785 }
786 }
787 }
788 }))
789 .buffer_unordered(s3_concurrent_ops())
790 .collect()
791 .await;
792
793 Ok(results.into_iter().flatten().collect())
794 }
795
796 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
797 if ids.is_empty() {
798 return Ok(0);
799 }
800
801 let delete_tasks: Vec<_> = ids
802 .iter()
803 .map(|id| {
804 let operator = self.operator.clone();
805 let path = Self::vector_path(namespace, id);
806 async move {
807 let exists = operator
808 .exists(&path)
809 .await
810 .map_err(|e| DakeraError::Storage(e.to_string()))?;
811 if exists {
812 match operator.delete(&path).await {
813 Ok(_) => Ok(true),
814 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
815 Err(e) => Err(DakeraError::Storage(e.to_string())),
816 }
817 } else {
818 Ok(false)
819 }
820 }
821 })
822 .collect();
823
824 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
825 .buffer_unordered(s3_concurrent_ops())
826 .collect()
827 .await;
828
829 let mut deleted = 0;
830 for result in results {
831 if result? {
832 deleted += 1;
833 }
834 }
835
836 if deleted > 0 {
838 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
839 meta.vector_count = meta.vector_count.saturating_sub(deleted);
840 meta.updated_at = Self::now();
841 self.write_namespace_meta(namespace, &meta).await?;
842 }
843 }
844
845 Ok(deleted)
846 }
847
848 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
849 Ok(self.read_namespace_meta(namespace).await?.is_some())
850 }
851
852 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
853 if self.read_namespace_meta(namespace).await?.is_none() {
854 let meta = NamespaceMetadata {
855 dimension: None,
856 vector_count: 0,
857 created_at: Self::now(),
858 updated_at: Self::now(),
859 };
860 self.write_namespace_meta(namespace, &meta).await?;
861 }
862 Ok(())
863 }
864
865 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
866 Ok(self
867 .read_namespace_meta(namespace)
868 .await?
869 .and_then(|m| m.dimension))
870 }
871
872 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
873 Ok(self
874 .read_namespace_meta(namespace)
875 .await?
876 .map(|m| m.vector_count)
877 .unwrap_or(0))
878 }
879
880 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
881 let entries = self
882 .operator
883 .list("namespaces/")
884 .await
885 .map_err(|e| DakeraError::Storage(e.to_string()))?;
886
887 let mut namespaces = Vec::new();
888 for entry in entries {
889 let path = entry.path();
890 if let Some(ns) = path.strip_prefix("namespaces/") {
892 let ns = ns.trim_end_matches('/');
893 if !ns.is_empty() && !ns.contains('/') {
894 if self.read_namespace_meta(ns).await?.is_some() {
899 namespaces.push(ns.to_string());
900 }
901 }
902 }
903 }
904
905 Ok(namespaces)
906 }
907
908 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
909 if !self.namespace_exists(namespace).await? {
911 return Ok(false);
912 }
913
914 let prefix = format!("namespaces/{}/", namespace);
918 self.operator
919 .delete_with(&prefix)
920 .recursive(true)
921 .await
922 .map_err(|e| DakeraError::Storage(e.to_string()))?;
923
924 tracing::debug!(
925 namespace = namespace,
926 "Deleted namespace from object storage"
927 );
928
929 Ok(true)
930 }
931
    /// Expiry cleanup for one namespace. This implementation removes nothing
    /// and always reports 0 cleaned-up vectors.
    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
        Ok(0)
    }
937
    /// Expiry cleanup across all namespaces. This implementation removes
    /// nothing and always reports 0 cleaned-up vectors.
    async fn cleanup_all_expired(&self) -> Result<usize> {
        Ok(0)
    }
942}
943
944#[async_trait]
945impl IndexStorage for ObjectStorage {
946 async fn save_index(
947 &self,
948 namespace: &NamespaceId,
949 index_type: IndexType,
950 data: Vec<u8>,
951 ) -> Result<()> {
952 let path = format!(
953 "namespaces/{}/indexes/{}.bin",
954 namespace,
955 index_type.as_str()
956 );
957 self.operator
958 .write(&path, data)
959 .await
960 .map_err(|e| DakeraError::Storage(e.to_string()))?;
961
962 tracing::debug!(
963 namespace = namespace,
964 index_type = index_type.as_str(),
965 "Saved index to object storage"
966 );
967 Ok(())
968 }
969
970 async fn load_index(
971 &self,
972 namespace: &NamespaceId,
973 index_type: IndexType,
974 ) -> Result<Option<Vec<u8>>> {
975 let path = format!(
976 "namespaces/{}/indexes/{}.bin",
977 namespace,
978 index_type.as_str()
979 );
980 match self.operator.read(&path).await {
981 Ok(data) => {
982 tracing::debug!(
983 namespace = namespace,
984 index_type = index_type.as_str(),
985 size = data.len(),
986 "Loaded index from object storage"
987 );
988 Ok(Some(data.to_vec()))
989 }
990 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
991 Err(e) => Err(DakeraError::Storage(e.to_string())),
992 }
993 }
994
995 async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
996 let path = format!(
997 "namespaces/{}/indexes/{}.bin",
998 namespace,
999 index_type.as_str()
1000 );
1001 let exists = self
1002 .operator
1003 .exists(&path)
1004 .await
1005 .map_err(|e| DakeraError::Storage(e.to_string()))?;
1006
1007 if exists {
1008 self.operator
1009 .delete(&path)
1010 .await
1011 .map_err(|e| DakeraError::Storage(e.to_string()))?;
1012 tracing::debug!(
1013 namespace = namespace,
1014 index_type = index_type.as_str(),
1015 "Deleted index from object storage"
1016 );
1017 }
1018 Ok(exists)
1019 }
1020
1021 async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
1022 let path = format!(
1023 "namespaces/{}/indexes/{}.bin",
1024 namespace,
1025 index_type.as_str()
1026 );
1027 self.operator
1028 .exists(&path)
1029 .await
1030 .map_err(|e| DakeraError::Storage(e.to_string()))
1031 }
1032
1033 async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
1034 let prefix = format!("namespaces/{}/indexes/", namespace);
1035 let entries = self
1036 .operator
1037 .list(&prefix)
1038 .await
1039 .map_err(|e| DakeraError::Storage(e.to_string()))?;
1040
1041 let mut indexes = Vec::new();
1042 for entry in entries {
1043 let path = entry.path();
1044 if path.ends_with(".bin") {
1045 if let Some(filename) = path.strip_prefix(&prefix) {
1047 let name = filename.trim_end_matches(".bin");
1048 match name {
1049 "hnsw" => indexes.push(IndexType::Hnsw),
1050 "pq" => indexes.push(IndexType::Pq),
1051 "ivf" => indexes.push(IndexType::Ivf),
1052 "spfresh" => indexes.push(IndexType::SpFresh),
1053 "fulltext" => indexes.push(IndexType::FullText),
1054 _ => {} }
1056 }
1057 }
1058 }
1059
1060 Ok(indexes)
1061 }
1062}
1063
/// Builds an OpenDAL operator for `config` without wrapping it in an
/// `ObjectStorage`; delegates to `ObjectStorage::build_operator`.
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
1069
1070#[cfg(test)]
1071mod tests {
1072 use super::*;
1073
    /// End-to-end check of the in-memory backend: namespace creation, upsert,
    /// get, get_all, count and delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // The namespace must exist once it has been ensured.
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Two vectors, one with and one without metadata.
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Fetch a single vector by id.
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Fetch everything in the namespace.
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting one vector removes it and decrements the count.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }
1130
    /// Upserting a vector whose dimension differs from the namespace's
    /// established dimension must fail.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // The first upsert fixes the namespace dimension at 3.
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v2 = vec![Vector {
            id: "v2".to_string(),
            // Only two values: does not match the namespace dimension.
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }
1158
    /// Upserting an existing id overwrites its values and leaves the vector
    /// count unchanged.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Initial insert.
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Same id, new values.
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // The overwrite must not be counted as a second vector.
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }
1193
    /// Exercises the index storage API: exists, save, load, list and delete.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Nothing is stored yet.
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save an HNSW index.
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Only the saved index type is reported as existing.
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Loading returns exactly the saved bytes.
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        // Save a second index type.
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // Both index types are listed.
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // Deleting an existing index reports true and removes it.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Deleting it again reports false.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Loading a deleted index yields None.
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
1268
1269 fn make_vector(id: &str, dim: usize) -> Vector {
1272 Vector {
1273 id: id.to_string(),
1274 values: vec![0.1; dim],
1275 metadata: None,
1276 ttl_seconds: None,
1277 expires_at: None,
1278 }
1279 }
1280
1281 #[tokio::test]
1282 async fn test_upsert_batch_parallel_all_new() {
1283 let storage = ObjectStorage::memory().unwrap();
1284 let ns = "batch_all_new".to_string();
1285 storage.ensure_namespace(&ns).await.unwrap();
1286
1287 let vectors: Vec<Vector> = (0..50).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1289 let count = storage.upsert(&ns, vectors).await.unwrap();
1290
1291 assert_eq!(count, 50);
1292 assert_eq!(storage.count(&ns).await.unwrap(), 50);
1293 }
1294
1295 #[tokio::test]
1296 async fn test_upsert_batch_parallel_idempotent_count() {
1297 let storage = ObjectStorage::memory().unwrap();
1298 let ns = "batch_idempotent".to_string();
1299 storage.ensure_namespace(&ns).await.unwrap();
1300
1301 let vectors: Vec<Vector> = (0..10).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1302 storage.upsert(&ns, vectors.clone()).await.unwrap();
1303
1304 storage.upsert(&ns, vectors).await.unwrap();
1306 assert_eq!(storage.count(&ns).await.unwrap(), 10);
1307 }
1308
1309 #[tokio::test]
1310 async fn test_upsert_batch_parallel_empty() {
1311 let storage = ObjectStorage::memory().unwrap();
1312 let ns = "batch_empty".to_string();
1313 storage.ensure_namespace(&ns).await.unwrap();
1314
1315 let count = storage.upsert(&ns, vec![]).await.unwrap();
1316 assert_eq!(count, 0);
1317 assert_eq!(storage.count(&ns).await.unwrap(), 0);
1318 }
1319
1320 #[tokio::test]
1321 async fn test_upsert_batch_parallel_large_batch() {
1322 let storage = ObjectStorage::memory().unwrap();
1324 let ns = "batch_large".to_string();
1325 storage.ensure_namespace(&ns).await.unwrap();
1326
1327 let vectors: Vec<Vector> = (0..200).map(|i| make_vector(&format!("v{i}"), 8)).collect();
1328 let count = storage.upsert(&ns, vectors).await.unwrap();
1329
1330 assert_eq!(count, 200);
1331 assert_eq!(storage.count(&ns).await.unwrap(), 200);
1332 }
1333
    /// Batches that mix new ids with existing ids must grow the count only by
    /// the number of new ids.
    #[tokio::test]
    async fn test_upsert_count_mixed_insert_update_batch() {
        let storage = ObjectStorage::memory().unwrap();
        let ns = "mixed".to_string();
        storage.ensure_namespace(&ns).await.unwrap();

        // Two new vectors.
        storage
            .upsert(&ns, vec![make_vector("v0", 4), make_vector("v1", 4)])
            .await
            .unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 2);

        // One update (v1) and two inserts (v2, v3).
        storage
            .upsert(
                &ns,
                vec![
                    make_vector("v1", 4),
                    make_vector("v2", 4),
                    make_vector("v3", 4),
                ],
            )
            .await
            .unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 4);

        // Updates only: the count stays the same.
        storage
            .upsert(&ns, vec![make_vector("v0", 4), make_vector("v3", 4)])
            .await
            .unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 4);
    }
1371
    /// Round-trips vectors through the filesystem backend and checks that an
    /// id containing a path traversal does not create a file outside the root.
    #[tokio::test]
    async fn test_upsert_fs_fast_path_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
        let ns = "fsfast".to_string();
        storage.ensure_namespace(&ns).await.unwrap();

        // Three new vectors with path-safe ids.
        storage
            .upsert(
                &ns,
                vec![
                    make_vector("a", 4),
                    make_vector("b", 4),
                    make_vector("c", 4),
                ],
            )
            .await
            .unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 3);

        // The written vectors are readable through the regular get path.
        let got = storage
            .get(&ns, &["a".to_string(), "b".to_string()])
            .await
            .unwrap();
        assert_eq!(got.len(), 2);

        // One update (b) and one insert (d).
        storage
            .upsert(&ns, vec![make_vector("b", 4), make_vector("d", 4)])
            .await
            .unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 4);

        // A batch containing an unsafe id must still succeed, and must not
        // write "escape.json" next to the storage root.
        storage
            .upsert(
                &ns,
                vec![make_vector("safe1", 4), make_vector("../escape", 4)],
            )
            .await
            .unwrap();
        let got = storage.get(&ns, &["safe1".to_string()]).await.unwrap();
        assert_eq!(got.len(), 1);
        assert!(!tmp.path().parent().unwrap().join("escape.json").exists());
    }
1424
1425 #[tokio::test]
1431 async fn test_upsert_incremental_count_growing_namespace_fs() {
1432 let tmp = tempfile::tempdir().unwrap();
1433 let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1434 let ns = "grow".to_string();
1435 storage.ensure_namespace(&ns).await.unwrap();
1436
1437 const BATCHES: usize = 40;
1438 for b in 0..BATCHES {
1439 let new_id = format!("v{}", b + 1);
1442 let upd_id = format!("v{}", b);
1443 storage
1444 .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1445 .await
1446 .unwrap();
1447 assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1449 }
1450 assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1451 }
1452
1453 #[tokio::test]
1456 async fn test_upsert_incremental_count_growing_namespace_opendal() {
1457 let storage = ObjectStorage::memory().unwrap();
1458 let ns = "grow-od".to_string();
1459 storage.ensure_namespace(&ns).await.unwrap();
1460
1461 const BATCHES: usize = 30;
1462 for b in 0..BATCHES {
1463 let new_id = format!("v{}", b + 1);
1464 let upd_id = format!("v{}", b);
1465 storage
1466 .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1467 .await
1468 .unwrap();
1469 assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1470 }
1471 assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1472 }
1473
/// Pins which backends use the tmp-file + rename write strategy: only the
/// filesystem config does; memory, S3, Azure and GCS must not.
#[test]
fn test_write_strategy_selection() {
    // Every configuration that is expected to write objects directly.
    let direct_write_configs = [
        ObjectStorageConfig::Memory,
        ObjectStorageConfig::S3 {
            bucket: String::from("dakera"),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        },
        ObjectStorageConfig::Azure {
            container: String::from("c"),
            account_name: String::from("acct"),
            account_key: None,
            sas_token: None,
            endpoint: None,
        },
        ObjectStorageConfig::Gcs {
            bucket: String::from("dakera"),
            credential_path: None,
            endpoint: None,
        },
    ];
    for cfg in &direct_write_configs {
        assert!(!ObjectStorage::needs_tmp_rename_for(cfg));
    }

    let fs_config = ObjectStorageConfig::Filesystem {
        root: String::from("/tmp/x"),
    };
    assert!(ObjectStorage::needs_tmp_rename_for(&fs_config));

    // The flag stored on constructed instances must agree with the selector.
    let memory_store = ObjectStorage::memory().unwrap();
    assert!(!memory_store.needs_tmp_rename);

    let scratch = tempfile::tempdir().unwrap();
    let fs_store = ObjectStorage::filesystem(scratch.path().to_str().unwrap()).unwrap();
    assert!(fs_store.needs_tmp_rename);
}
1525
1526 #[tokio::test]
1531 async fn test_upsert_direct_write_path_roundtrip() {
1532 let storage = ObjectStorage::memory().unwrap();
1533 assert!(!storage.needs_tmp_rename, "memory must take direct path");
1534 let ns = "direct".to_string();
1535 storage.ensure_namespace(&ns).await.unwrap();
1536
1537 storage
1539 .upsert(&ns, vec![make_vector("a", 4), make_vector("b", 4)])
1540 .await
1541 .unwrap();
1542 assert_eq!(storage.count(&ns).await.unwrap(), 2);
1543
1544 let a_updated = Vector {
1546 id: "a".to_string(),
1547 values: vec![9.0, 9.0, 9.0, 9.0],
1548 metadata: None,
1549 ttl_seconds: None,
1550 expires_at: None,
1551 };
1552 storage.upsert(&ns, vec![a_updated]).await.unwrap();
1553 let got = storage.get(&ns, &["a".to_string()]).await.unwrap();
1554 assert_eq!(got.len(), 1);
1555 assert_eq!(got[0].values, vec![9.0, 9.0, 9.0, 9.0]);
1556 assert_eq!(storage.count(&ns).await.unwrap(), 2);
1558
1559 storage
1561 .upsert(&ns, vec![make_vector("b", 4), make_vector("c", 4)])
1562 .await
1563 .unwrap();
1564 assert_eq!(storage.count(&ns).await.unwrap(), 3);
1565 }
1566
1567 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1578 #[ignore]
1579 async fn profile_upsert_substages() {
1580 use std::time::Instant;
1581 const N: usize = 200;
1582 const DIM: usize = 1024; let conc = s3_concurrent_ops();
1584
1585 let make = |i: usize| -> Vector {
1587 Vector {
1588 id: format!("prof-{i}"),
1589 values: vec![0.0123_f32; DIM],
1590 metadata: Some(serde_json::json!({
1591 "_embedding_kind": "static",
1592 "tags": ["dia_1", "speaker_a", "session_3", "2026-06-03"],
1593 "importance": 0.7,
1594 "session_id": format!("sess-{}", i % 8),
1595 })),
1596 ttl_seconds: None,
1597 expires_at: None,
1598 }
1599 };
1600 let vectors: Vec<Vector> = (0..N).map(make).collect();
1601 let payload_bytes = serde_json::to_vec(&StoredVector::from(vectors[0].clone()))
1602 .unwrap()
1603 .len();
1604
1605 let tmp0 = tempfile::tempdir().unwrap();
1607 let s0 = ObjectStorage::filesystem(tmp0.path().to_str().unwrap()).unwrap();
1608 let ns = "prof".to_string();
1609 s0.ensure_namespace(&ns).await.unwrap();
1610 let t = Instant::now();
1611 let n = s0.upsert(&ns, vectors.clone()).await.unwrap();
1612 let v0_ms = t.elapsed().as_secs_f64() * 1000.0;
1613 assert_eq!(n, N);
1614
1615 async fn timed<F, Fut>(label: &str, n: usize, conc: usize, f: F) -> f64
1617 where
1618 F: Fn(usize, Operator) -> Fut,
1619 Fut: std::future::Future<Output = ()>,
1620 {
1621 let tmp = tempfile::tempdir().unwrap();
1622 let op = ObjectStorage::filesystem(tmp.path().to_str().unwrap())
1623 .unwrap()
1624 .operator;
1625 let t = Instant::now();
1626 stream::iter(0..n)
1627 .map(|i| {
1628 let op = op.clone();
1629 f(i, op)
1630 })
1631 .buffer_unordered(conc)
1632 .collect::<Vec<()>>()
1633 .await;
1634 let ms = t.elapsed().as_secs_f64() * 1000.0;
1635 drop(tmp);
1637 let _ = label;
1638 ms
1639 }
1640
1641 let data: Vec<Vec<u8>> = vectors
1642 .iter()
1643 .map(|v| serde_json::to_vec(&StoredVector::from(v.clone())).unwrap())
1644 .collect();
1645 let path_of = |i: usize| ObjectStorage::vector_path(&ns, &format!("prof-{i}"));
1646
1647 let d = data.clone();
1649 let p_exists = timed("exists", N, conc, move |i, op| {
1650 let path = path_of(i);
1651 let _ = &d;
1652 async move {
1653 let _ = op.exists(&path).await;
1654 }
1655 })
1656 .await;
1657
1658 let d = data.clone();
1660 let p_write_direct = timed("write_direct", N, conc, move |i, op| {
1661 let path = path_of(i);
1662 let bytes = d[i].clone();
1663 async move {
1664 op.write(&path, bytes).await.unwrap();
1665 }
1666 })
1667 .await;
1668
1669 let d = data.clone();
1671 let p_tmp_rename = timed("write_tmp_rename", N, conc, move |i, op| {
1672 let path = path_of(i);
1673 let bytes = d[i].clone();
1674 async move {
1675 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1676 let tmp_path = format!("{path}.tmp.{seq}");
1677 op.write(&tmp_path, bytes).await.unwrap();
1678 op.rename(&tmp_path, &path).await.unwrap();
1679 }
1680 })
1681 .await;
1682
1683 let d = data.clone();
1685 let p_full = timed("exists+tmp+rename", N, conc, move |i, op| {
1686 let path = path_of(i);
1687 let bytes = d[i].clone();
1688 async move {
1689 let _ = op.exists(&path).await;
1690 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1691 let tmp_path = format!("{path}.tmp.{seq}");
1692 op.write(&tmp_path, bytes).await.unwrap();
1693 op.rename(&tmp_path, &path).await.unwrap();
1694 }
1695 })
1696 .await;
1697
1698 let mut sweep = Vec::new();
1701 for &c in &[8usize, 16, 32, 64, 128] {
1702 let d = data.clone();
1703 let ms = timed("sweep", N, c, move |i, op| {
1704 let path = path_of(i);
1705 let bytes = d[i].clone();
1706 async move {
1707 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1708 let tmp_path = format!("{path}.tmp.{seq}");
1709 op.write(&tmp_path, bytes).await.unwrap();
1710 op.rename(&tmp_path, &path).await.unwrap();
1711 }
1712 })
1713 .await;
1714 sweep.push((c, ms));
1715 }
1716
1717 async fn raw_fs(n: usize, conc: usize, bytes: usize, do_sync: bool) -> f64 {
1721 let dir = tempfile::tempdir().unwrap();
1722 let root = dir.path().to_path_buf();
1723 let payload = vec![0u8; bytes];
1724 let t = Instant::now();
1725 stream::iter(0..n)
1726 .map(|i| {
1727 let root = root.clone();
1728 let payload = payload.clone();
1729 async move {
1730 tokio::task::spawn_blocking(move || {
1731 use std::io::Write;
1732 let p = root.join(format!("f{i}.bin"));
1733 let mut f = std::fs::File::create(&p).unwrap();
1734 f.write_all(&payload).unwrap();
1735 if do_sync {
1736 f.sync_all().unwrap();
1737 }
1738 })
1739 .await
1740 .unwrap();
1741 }
1742 })
1743 .buffer_unordered(conc)
1744 .collect::<Vec<()>>()
1745 .await;
1746 let ms = t.elapsed().as_secs_f64() * 1000.0;
1747 drop(dir);
1748 ms
1749 }
1750 let raw_nosync = raw_fs(N, 16, payload_bytes, false).await;
1751 let raw_sync = raw_fs(N, 16, payload_bytes, true).await;
1752
1753 let rate = |ms: f64| N as f64 / (ms / 1000.0);
1754 eprintln!("\n=== DAK-6287 storage_upsert sub-stage profile (fs backend) ===");
1755 eprintln!("N={N} dim={DIM} payload={payload_bytes}B concurrency={conc}");
1756 eprintln!(
1757 "V0 upsert() REAL path : {v0_ms:8.2} ms {:8.1} mem/s",
1758 rate(v0_ms)
1759 );
1760 eprintln!(
1761 "P_full exists+tmp+rename : {p_full:8.2} ms {:8.1} mem/s",
1762 rate(p_full)
1763 );
1764 eprintln!(
1765 "P_exists (stat only) : {p_exists:8.2} ms {:8.1} mem/s",
1766 rate(p_exists)
1767 );
1768 eprintln!(
1769 "P_write_tmp_rename : {p_tmp_rename:8.2} ms {:8.1} mem/s",
1770 rate(p_tmp_rename)
1771 );
1772 eprintln!(
1773 "P_write_direct : {p_write_direct:8.2} ms {:8.1} mem/s",
1774 rate(p_write_direct)
1775 );
1776 eprintln!("--- attribution (of P_full) ---");
1777 eprintln!("exists share : {:5.1}%", 100.0 * p_exists / p_full);
1778 eprintln!("rename overhead: {:5.1}% (tmp+rename {p_tmp_rename:.1} vs direct {p_write_direct:.1})",
1779 100.0 * (p_tmp_rename - p_write_direct).max(0.0) / p_full);
1780 eprintln!("--- concurrency sweep (write tmp+rename) ---");
1781 for (c, ms) in &sweep {
1782 eprintln!("conc={c:4} : {ms:8.2} ms {:8.1} mem/s", rate(*ms));
1783 }
1784 eprintln!("--- fsync isolation (raw std::fs, conc=16, {payload_bytes}B) ---");
1785 eprintln!(
1786 "write no-sync : {raw_nosync:8.2} ms {:8.1} mem/s",
1787 rate(raw_nosync)
1788 );
1789 eprintln!(
1790 "write+sync_all: {raw_sync:8.2} ms {:8.1} mem/s",
1791 rate(raw_sync)
1792 );
1793 eprintln!(
1794 "=> fsync cost factor: {:.1}x",
1795 raw_sync / raw_nosync.max(0.001)
1796 );
1797 }
1798
1799 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1827 #[ignore]
1828 async fn profile_upsert_substages_s3() {
1829 use std::time::Instant;
1830
1831 let endpoint = match std::env::var("DAKERA_PROFILE_S3_ENDPOINT") {
1832 Ok(e) => e,
1833 Err(_) => {
1834 eprintln!(
1835 "SKIP profile_upsert_substages_s3: set DAKERA_PROFILE_S3_ENDPOINT + \
1836 DAKERA_PROFILE_S3_BUCKET (+ AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY) \
1837 to a live MinIO/S3 endpoint to run this profile."
1838 );
1839 return;
1840 }
1841 };
1842 let bucket = std::env::var("DAKERA_PROFILE_S3_BUCKET")
1843 .expect("DAKERA_PROFILE_S3_BUCKET required for S3 profile");
1844 let region = std::env::var("DAKERA_PROFILE_S3_REGION").ok();
1845 let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
1846 let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
1847 let n: usize = std::env::var("DAKERA_PROFILE_N")
1848 .ok()
1849 .and_then(|v| v.parse().ok())
1850 .unwrap_or(100);
1851 const DIM: usize = 1024; let conc = s3_concurrent_ops();
1853
1854 let s3_cfg = ObjectStorageConfig::S3 {
1855 bucket: bucket.clone(),
1856 region: region.clone(),
1857 endpoint: Some(endpoint.clone()),
1858 access_key_id: access_key_id.clone(),
1859 secret_access_key: secret_access_key.clone(),
1860 };
1861
1862 let make = |i: usize| -> Vector {
1864 Vector {
1865 id: format!("prof-{i}"),
1866 values: vec![0.0123_f32; DIM],
1867 metadata: Some(serde_json::json!({
1868 "_embedding_kind": "static",
1869 "tags": ["dia_1", "speaker_a", "session_3", "2026-06-03"],
1870 "importance": 0.7,
1871 "session_id": format!("sess-{}", i % 8),
1872 })),
1873 ttl_seconds: None,
1874 expires_at: None,
1875 }
1876 };
1877 let vectors: Vec<Vector> = (0..n).map(make).collect();
1878 let data: Vec<Vec<u8>> = vectors
1879 .iter()
1880 .map(|v| serde_json::to_vec(&StoredVector::from(v.clone())).unwrap())
1881 .collect();
1882 let payload_bytes = data[0].len();
1883
1884 let s0 = ObjectStorage::new(s3_cfg.clone()).unwrap();
1886 assert!(!s0.needs_tmp_rename, "S3 must take the direct-PUT path");
1887 let ns = "locomo-bench-prof-v0".to_string();
1888 let _ = s0.delete_namespace(&ns).await; s0.ensure_namespace(&ns).await.unwrap();
1890 let t = Instant::now();
1891 let written = s0.upsert(&ns, vectors.clone()).await.unwrap();
1892 let v0_ms = t.elapsed().as_secs_f64() * 1000.0;
1893 assert_eq!(written, n);
1894
1895 let op = ObjectStorage::build_operator(&s3_cfg).unwrap();
1897 async fn timed<F, Fut>(op: &Operator, n: usize, conc: usize, f: F) -> f64
1898 where
1899 F: Fn(usize, Operator) -> Fut,
1900 Fut: std::future::Future<Output = ()>,
1901 {
1902 let t = Instant::now();
1903 stream::iter(0..n)
1904 .map(|i| f(i, op.clone()))
1905 .buffer_unordered(conc)
1906 .collect::<Vec<()>>()
1907 .await;
1908 t.elapsed().as_secs_f64() * 1000.0
1909 }
1910 fn key(phase: &str, i: usize) -> String {
1913 format!("namespaces/locomo-bench-prof-{phase}/vectors/prof-{i}.json")
1914 }
1915
1916 let p_exists = timed(&op, n, conc, |i, op| {
1918 let path = key("exists", i);
1919 async move {
1920 let _ = op.exists(&path).await;
1921 }
1922 })
1923 .await;
1924
1925 let d = data.clone();
1927 let p_put = timed(&op, n, conc, move |i, op| {
1928 let path = key("put", i);
1929 let bytes = d[i].clone();
1930 async move {
1931 op.write(&path, bytes).await.unwrap();
1932 }
1933 })
1934 .await;
1935
1936 let d = data.clone();
1942 let p_waste = timed(&op, n, conc, move |i, op| {
1943 let tmp = format!("{}.tmp", key("waste", i));
1944 let bytes = d[i].clone();
1945 async move {
1946 op.write(&tmp, bytes).await.unwrap();
1947 op.delete(&tmp).await.unwrap();
1948 }
1949 })
1950 .await;
1951
1952 let d = data.clone();
1956 let p_full_old = timed(&op, n, conc, move |i, op| {
1957 let path = key("old", i);
1958 let tmp = format!("{path}.tmp");
1959 let bytes = d[i].clone();
1960 async move {
1961 let _ = op.exists(&path).await;
1962 let rename_ok = async {
1963 op.write(&tmp, bytes.clone()).await?;
1964 op.rename(&tmp, &path).await?;
1965 Ok::<(), opendal::Error>(())
1966 }
1967 .await
1968 .is_ok();
1969 if !rename_ok {
1970 let _ = op.delete(&tmp).await;
1971 op.write(&path, bytes).await.unwrap();
1972 }
1973 }
1974 })
1975 .await;
1976
1977 let d = data.clone();
1979 let p_direct_new = timed(&op, n, conc, move |i, op| {
1980 let path = key("new", i);
1981 let bytes = d[i].clone();
1982 async move {
1983 let _ = op.exists(&path).await;
1984 op.write(&path, bytes).await.unwrap();
1985 }
1986 })
1987 .await;
1988
1989 let mut sweep = Vec::new();
1991 for &c in &[8usize, 16, 32, 64] {
1992 let d = data.clone();
1993 let ms = timed(&op, n, c, move |i, op| {
1994 let path = format!("namespaces/locomo-bench-prof-sweep{c}/vectors/prof-{i}.json");
1995 let bytes = d[i].clone();
1996 async move {
1997 op.write(&path, bytes).await.unwrap();
1998 }
1999 })
2000 .await;
2001 sweep.push((c, ms));
2002 }
2003
2004 let rate = |ms: f64| n as f64 / (ms / 1000.0);
2005 eprintln!("\n=== DAK-6289 storage_upsert sub-stage profile (S3/MinIO backend) ===");
2006 eprintln!("endpoint={endpoint} bucket={bucket} N={n} dim={DIM} payload={payload_bytes}B concurrency={conc}");
2007 eprintln!(
2008 "V0 upsert() REAL (direct PUT): {v0_ms:8.2} ms {:8.1} mem/s",
2009 rate(v0_ms)
2010 );
2011 eprintln!(
2012 "P_full_old exists+PUT+rename : {p_full_old:8.2} ms {:8.1} mem/s <- BEFORE",
2013 rate(p_full_old)
2014 );
2015 eprintln!(
2016 "P_direct_new exists+PUT : {p_direct_new:8.2} ms {:8.1} mem/s <- AFTER",
2017 rate(p_direct_new)
2018 );
2019 eprintln!(
2020 "P_exists (HEAD only) : {p_exists:8.2} ms {:8.1} mem/s",
2021 rate(p_exists)
2022 );
2023 eprintln!(
2024 "P_put (direct PUT only) : {p_put:8.2} ms {:8.1} mem/s",
2025 rate(p_put)
2026 );
2027 eprintln!(
2028 "P_waste (PUT tmp + DELETE) : {p_waste:8.2} ms {:8.1} mem/s (the ops the fix drops)",
2029 rate(p_waste)
2030 );
2031 eprintln!("--- attribution (S3 rename is Unsupported → old path = PUT(tmp)+DELETE(tmp)+PUT(final)) ---");
2032 eprintln!(
2033 "wasted-op share of old body : {:5.1}% ({p_waste:.1}ms of {p_full_old:.1}ms)",
2034 100.0 * p_waste / p_full_old.max(0.001)
2035 );
2036 eprintln!(
2037 "=> direct-PUT speedup (old/new): {:.2}x (end-to-end V0 vs P_full_old: {:.2}x)",
2038 p_full_old / p_direct_new.max(0.001),
2039 p_full_old / v0_ms.max(0.001)
2040 );
2041 eprintln!("--- concurrency sweep (direct PUT) ---");
2042 for (c, ms) in &sweep {
2043 eprintln!("conc={c:4} : {ms:8.2} ms {:8.1} mem/s", rate(*ms));
2044 }
2045
2046 let mut prefixes: Vec<String> = ["v0", "exists", "put", "waste", "old", "new"]
2050 .iter()
2051 .map(|p| format!("namespaces/locomo-bench-prof-{p}/"))
2052 .collect();
2053 for &c in &[8usize, 16, 32, 64] {
2054 prefixes.push(format!("namespaces/locomo-bench-prof-sweep{c}/"));
2055 }
2056 for prefix in prefixes {
2057 let _ = op.delete_with(&prefix).recursive(true).await;
2058 }
2059 }
2060}