briefcase_core/storage/
buffered.rs1use crate::models::{DecisionSnapshot, Snapshot};
2use crate::storage::{FlushResult, SnapshotQuery, StorageBackend, StorageError};
3use async_trait::async_trait;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7#[allow(clippy::large_enum_variant)]
8enum Command {
9 SaveSnapshot(Snapshot, mpsc::Sender<Result<String, StorageError>>),
10 SaveDecision(DecisionSnapshot, mpsc::Sender<Result<String, StorageError>>),
11}
12
13pub struct BufferedBackend {
14 inner: Arc<dyn StorageBackend>,
15 sender: mpsc::Sender<Command>,
16}
17
18impl BufferedBackend {
19 pub fn new(inner: Arc<dyn StorageBackend>, buffer_size: usize) -> Self {
20 let (tx, mut rx) = mpsc::channel(buffer_size);
21 let inner_clone = inner.clone();
22
23 tokio::spawn(async move {
25 while let Some(cmd) = rx.recv().await {
26 match cmd {
27 Command::SaveSnapshot(snap, reply) => {
28 let res = inner_clone.save(&snap).await;
29 let _ = reply.send(res).await;
30 }
31 Command::SaveDecision(dec, reply) => {
32 let res = inner_clone.save_decision(&dec).await;
33 let _ = reply.send(res).await;
34 }
35 }
36 }
37 });
38
39 Self { inner, sender: tx }
40 }
41}
42
43#[async_trait]
44impl StorageBackend for BufferedBackend {
45 async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError> {
46 let (tx, mut rx) = mpsc::channel(1);
47 self.sender
48 .send(Command::SaveSnapshot(snapshot.clone(), tx))
49 .await
50 .map_err(|e| StorageError::IoError(e.to_string()))?;
51
52 rx.recv()
55 .await
56 .unwrap_or(Err(StorageError::ConnectionError("Worker died".into())))
57 }
58
59 async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError> {
60 let (tx, mut rx) = mpsc::channel(1);
61 self.sender
62 .send(Command::SaveDecision(decision.clone(), tx))
63 .await
64 .map_err(|e| StorageError::IoError(e.to_string()))?;
65
66 rx.recv()
67 .await
68 .unwrap_or(Err(StorageError::ConnectionError("Worker died".into())))
69 }
70
71 async fn load(&self, id: &str) -> Result<Snapshot, StorageError> {
72 self.inner.load(id).await
73 }
74 async fn load_decision(&self, id: &str) -> Result<DecisionSnapshot, StorageError> {
75 self.inner.load_decision(id).await
76 }
77 async fn query(&self, q: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError> {
78 self.inner.query(q).await
79 }
80 async fn delete(&self, id: &str) -> Result<bool, StorageError> {
81 self.inner.delete(id).await
82 }
83 async fn flush(&self) -> Result<FlushResult, StorageError> {
84 self.inner.flush().await
85 }
86 async fn health_check(&self) -> Result<bool, StorageError> {
87 self.inner.health_check().await
88 }
89}