1use std::ops::Range;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use common::coordinator::Flusher;
11use common::storage::{Storage, StorageSnapshot};
12
13use crate::delta::{VectorDbImmutableDelta, VectorDbWriteDelta};
14
15pub struct VectorDbFlusher {
20 pub storage: Arc<dyn Storage>,
22}
23
24#[async_trait]
25impl Flusher<VectorDbWriteDelta> for VectorDbFlusher {
26 async fn flush_delta(
27 &self,
28 frozen: VectorDbImmutableDelta,
29 _epoch_range: &Range<u64>,
30 ) -> Result<Arc<dyn StorageSnapshot>, String> {
31 self.storage
33 .apply(frozen.ops.clone())
34 .await
35 .map_err(|e| e.to_string())?;
36
37 let snapshot = self.storage.snapshot().await.map_err(|e| e.to_string())?;
39
40 Ok(snapshot)
42 }
43
44 async fn flush_storage(&self) -> std::result::Result<(), String> {
45 self.storage.flush().await.map_err(|e| e.to_string())
46 }
47}
48
#[cfg(test)]
mod tests {
    use super::*;
    use common::coordinator::Flusher;
    use common::storage::RecordOp;
    use common::storage::in_memory::{FailingStorage, InMemoryStorage};

    /// Builds a `FailingStorage` wrapping a fresh in-memory storage, so each
    /// test can configure which storage call should fail.
    fn create_failing_storage() -> Arc<FailingStorage> {
        let inner: Arc<dyn Storage> = Arc::new(InMemoryStorage::new());
        FailingStorage::wrap(inner)
    }

    /// Builds a flusher that shares the given storage with the test.
    fn create_flusher(storage: &Arc<FailingStorage>) -> VectorDbFlusher {
        VectorDbFlusher {
            // Method-call `clone()` yields `Arc<FailingStorage>`, which then
            // coerces to the field's `Arc<dyn Storage>` type.
            storage: storage.clone(),
        }
    }

    /// Builds a frozen delta holding a single `Put` op.
    fn create_non_empty_frozen() -> VectorDbImmutableDelta {
        VectorDbImmutableDelta {
            ops: vec![RecordOp::Put(
                common::Record::new(
                    bytes::Bytes::from("test-key"),
                    bytes::Bytes::from("test-value"),
                )
                .into(),
            )],
        }
    }

    #[tokio::test]
    async fn should_propagate_apply_error() {
        // given
        let storage = create_failing_storage();
        let flusher = create_flusher(&storage);
        storage.fail_apply(common::StorageError::Storage("test apply error".into()));

        // when
        let result = flusher
            .flush_delta(create_non_empty_frozen(), &(1..2))
            .await;

        // then
        let err = result.err().expect("expected apply error");
        assert!(
            err.contains("test apply error"),
            "expected test apply error message, got: {err}"
        );
    }

    #[tokio::test]
    async fn should_propagate_snapshot_error_after_apply() {
        // given
        let storage = create_failing_storage();
        let flusher = create_flusher(&storage);
        storage.fail_snapshot(common::StorageError::Storage("test snapshot error".into()));

        // when
        let result = flusher
            .flush_delta(create_non_empty_frozen(), &(1..2))
            .await;

        // then
        let err = result.err().expect("expected snapshot error");
        assert!(
            err.contains("test snapshot error"),
            "expected test snapshot error message, got: {err}"
        );
    }

    #[tokio::test]
    async fn should_propagate_flush_storage_error() {
        // given
        let storage = create_failing_storage();
        let flusher = create_flusher(&storage);
        storage.fail_flush(common::StorageError::Storage("test flush error".into()));

        // when
        let result = flusher.flush_storage().await;

        // then: same assertion shape as the other tests, and the failure
        // message reports the error text that was actually returned.
        let err = result.err().expect("expected flush error");
        assert!(
            err.contains("test flush error"),
            "expected test flush error message, got: {err}"
        );
    }
}