Skip to main content

vector/
flusher.rs

1//! Flusher implementation for VectorDb.
2//!
3//! This module contains the `VectorDbFlusher` which applies accumulated
4//! RecordOps to storage atomically and updates the shared snapshot for queries.
5
6use std::ops::Range;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use common::coordinator::Flusher;
11use common::storage::{Storage, StorageSnapshot};
12
13use crate::delta::{VectorDbImmutableDelta, VectorDbWriteDelta};
14
15/// Flusher implementation for VectorDb.
16///
17/// Applies the accumulated RecordOps to storage atomically and updates
18/// the shared snapshot for queries.
19pub struct VectorDbFlusher {
20    /// Storage backend.
21    pub storage: Arc<dyn Storage>,
22}
23
24#[async_trait]
25impl Flusher<VectorDbWriteDelta> for VectorDbFlusher {
26    async fn flush_delta(
27        &self,
28        frozen: VectorDbImmutableDelta,
29        _epoch_range: &Range<u64>,
30    ) -> Result<Arc<dyn StorageSnapshot>, String> {
31        // Apply all ops atomically
32        self.storage
33            .apply(frozen.ops.clone())
34            .await
35            .map_err(|e| e.to_string())?;
36
37        // Get new snapshot
38        let snapshot = self.storage.snapshot().await.map_err(|e| e.to_string())?;
39
40        // StorageSnapshot extends StorageRead, so we can return it directly
41        Ok(snapshot)
42    }
43
44    async fn flush_storage(&self) -> std::result::Result<(), String> {
45        self.storage.flush().await.map_err(|e| e.to_string())
46    }
47}
48
49#[cfg(test)]
50mod tests {
51    use super::*;
52    use common::coordinator::Flusher;
53    use common::storage::RecordOp;
54    use common::storage::in_memory::{FailingStorage, InMemoryStorage};
55
56    fn create_failing_storage() -> Arc<FailingStorage> {
57        let inner: Arc<dyn Storage> = Arc::new(InMemoryStorage::new());
58        FailingStorage::wrap(inner)
59    }
60
61    fn create_non_empty_frozen() -> VectorDbImmutableDelta {
62        VectorDbImmutableDelta {
63            ops: vec![RecordOp::Put(
64                common::Record::new(
65                    bytes::Bytes::from("test-key"),
66                    bytes::Bytes::from("test-value"),
67                )
68                .into(),
69            )],
70        }
71    }
72
73    #[tokio::test]
74    async fn should_propagate_apply_error() {
75        // given
76        let storage = create_failing_storage();
77        let flusher = VectorDbFlusher {
78            storage: storage.clone(),
79        };
80        storage.fail_apply(common::StorageError::Storage("test apply error".into()));
81
82        // when
83        let result = flusher
84            .flush_delta(create_non_empty_frozen(), &(1..2))
85            .await;
86
87        // then
88        let err = result.err().expect("expected apply error");
89        assert!(
90            err.contains("test apply error"),
91            "expected test apply error message, got: {err}"
92        );
93    }
94
95    #[tokio::test]
96    async fn should_propagate_snapshot_error_after_apply() {
97        // given
98        let storage = create_failing_storage();
99        let flusher = VectorDbFlusher {
100            storage: storage.clone(),
101        };
102        // Apply succeeds, but snapshot after apply fails
103        storage.fail_snapshot(common::StorageError::Storage("test snapshot error".into()));
104
105        // when
106        let result = flusher
107            .flush_delta(create_non_empty_frozen(), &(1..2))
108            .await;
109
110        // then
111        let err = result.err().expect("expected snapshot error");
112        assert!(
113            err.contains("test snapshot error"),
114            "expected test snapshot error message, got: {err}"
115        );
116    }
117
118    #[tokio::test]
119    async fn should_propagate_flush_storage_error() {
120        // given
121        let storage = create_failing_storage();
122        let flusher = VectorDbFlusher {
123            storage: storage.clone(),
124        };
125        storage.fail_flush(common::StorageError::Storage("test flush error".into()));
126
127        // when
128        let result = flusher.flush_storage().await;
129
130        // then
131        assert!(result.is_err());
132        assert!(
133            result.unwrap_err().contains("test flush error"),
134            "expected test flush error message"
135        );
136    }
137}