Skip to main content

briefcase_core/storage/
buffered.rs

1use crate::models::{DecisionSnapshot, Snapshot};
2use crate::storage::{FlushResult, SnapshotQuery, StorageBackend, StorageError};
3use async_trait::async_trait;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7#[allow(clippy::large_enum_variant)]
8enum Command {
9    SaveSnapshot(Snapshot, mpsc::Sender<Result<String, StorageError>>),
10    SaveDecision(DecisionSnapshot, mpsc::Sender<Result<String, StorageError>>),
11}
12
13pub struct BufferedBackend {
14    inner: Arc<dyn StorageBackend>,
15    sender: mpsc::Sender<Command>,
16}
17
18impl BufferedBackend {
19    pub fn new(inner: Arc<dyn StorageBackend>, buffer_size: usize) -> Self {
20        let (tx, mut rx) = mpsc::channel(buffer_size);
21        let inner_clone = inner.clone();
22
23        // Spawn background worker
24        tokio::spawn(async move {
25            while let Some(cmd) = rx.recv().await {
26                match cmd {
27                    Command::SaveSnapshot(snap, reply) => {
28                        let res = inner_clone.save(&snap).await;
29                        let _ = reply.send(res).await;
30                    }
31                    Command::SaveDecision(dec, reply) => {
32                        let res = inner_clone.save_decision(&dec).await;
33                        let _ = reply.send(res).await;
34                    }
35                }
36            }
37        });
38
39        Self { inner, sender: tx }
40    }
41}
42
43#[async_trait]
44impl StorageBackend for BufferedBackend {
45    async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError> {
46        let (tx, mut rx) = mpsc::channel(1);
47        self.sender
48            .send(Command::SaveSnapshot(snapshot.clone(), tx))
49            .await
50            .map_err(|e| StorageError::IoError(e.to_string()))?;
51
52        // Return immediately or wait? For "performance" mode, we return a virtual ID
53        // but for this implementation we'll wait to ensure reliability in standard mode
54        rx.recv()
55            .await
56            .unwrap_or(Err(StorageError::ConnectionError("Worker died".into())))
57    }
58
59    async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError> {
60        let (tx, mut rx) = mpsc::channel(1);
61        self.sender
62            .send(Command::SaveDecision(decision.clone(), tx))
63            .await
64            .map_err(|e| StorageError::IoError(e.to_string()))?;
65
66        rx.recv()
67            .await
68            .unwrap_or(Err(StorageError::ConnectionError("Worker died".into())))
69    }
70
71    async fn load(&self, id: &str) -> Result<Snapshot, StorageError> {
72        self.inner.load(id).await
73    }
74    async fn load_decision(&self, id: &str) -> Result<DecisionSnapshot, StorageError> {
75        self.inner.load_decision(id).await
76    }
77    async fn query(&self, q: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError> {
78        self.inner.query(q).await
79    }
80    async fn delete(&self, id: &str) -> Result<bool, StorageError> {
81        self.inner.delete(id).await
82    }
83    async fn flush(&self) -> Result<FlushResult, StorageError> {
84        self.inner.flush().await
85    }
86    async fn health_check(&self) -> Result<bool, StorageError> {
87        self.inner.health_check().await
88    }
89}