Skip to main content

yeti_types/backend/
traits.rs

1//! Storage backend traits: `KvBackend`, `SnapshotTransaction`, and the
2//! `WriteOp` batch operation type.
3
4use async_trait::async_trait;
5
6use crate::error::{Result, YetiError};
7
8// ============================================================================
9// WriteOp
10// ============================================================================
11
/// A single write operation in a batch.
///
/// Serializable so it can be embedded in [`crate::backend::log::LogEntry`]
/// and replicated over the wire alongside HLC + originator metadata.
///
/// Both variants carry an owned key; [`WriteOp::key`] borrows it without the
/// caller having to match on the variant.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum WriteOp {
    /// Insert or update a key-value pair.
    Put {
        /// Raw key bytes.
        key: Vec<u8>,
        /// Raw value bytes to store under `key`.
        value: Vec<u8>,
    },
    /// Delete a key.
    Delete {
        /// Raw bytes of the key to delete.
        key: Vec<u8>,
    },
}
31
32impl WriteOp {
33    /// The key this op targets (regardless of variant).
34    #[must_use]
35    pub fn key(&self) -> &[u8] {
36        match self {
37            Self::Put { key, .. } | Self::Delete { key } => key,
38        }
39    }
40
41    /// True if this is a [`WriteOp::Delete`].
42    #[must_use]
43    pub const fn is_delete(&self) -> bool {
44        matches!(self, Self::Delete { .. })
45    }
46}
47
48// ============================================================================
49// KvBackend trait
50// ============================================================================
51
52/// Drive a backend future to completion from a synchronous context, working on
53/// both runtime flavors. On a multi-threaded runtime we use `block_in_place` so
54/// the worker parks-and-migrates (other tasks keep running); on a current-thread
55/// runtime (tests, the wasm benches) `block_in_place` would panic, so we fall
56/// back to the `futures` executor.
57///
58/// This is only the *fallback* path for the `*_sync` trait methods — hot
59/// backends (`RocksDB`, in-memory) override those with genuinely synchronous
60/// implementations and never reach this helper. It must therefore never be
61/// applied to a future that can only make progress on the very runtime we're
62/// blocking (e.g. a full bounded-channel send); the only in-tree user is the
63/// read passthrough, which is channel-free.
64fn block_on_backend<F: std::future::Future>(fut: F) -> F::Output {
65    // Native: prefer block_in_place on a multi-threaded runtime (parks +
66    // migrates the worker); fall back to the futures executor on current-thread
67    // (where block_in_place panics).
68    #[cfg(not(target_arch = "wasm32"))]
69    {
70        match tokio::runtime::Handle::try_current() {
71            Ok(h)
72                if matches!(
73                    h.runtime_flavor(),
74                    tokio::runtime::RuntimeFlavor::MultiThread
75                ) =>
76            {
77                tokio::task::block_in_place(|| h.block_on(fut))
78            },
79            _ => futures::executor::block_on(fut),
80        }
81    }
82    // wasm32 guest: no tokio threading (block_in_place doesn't exist), and the
83    // guest never drives a host backend through this path anyway.
84    #[cfg(target_arch = "wasm32")]
85    {
86        futures::executor::block_on(fut)
87    }
88}
89
90/// Trait that all storage backends must implement.
91#[async_trait]
92pub trait KvBackend: Send + Sync {
93    /// Put a key-value pair.
94    async fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
95
96    /// Get a value by key.
97    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
98
99    /// Synchronous get — backends whose reads complete inline (e.g. `RocksDB`
100    /// via `with_cf_inline`, in-memory `HashMap`) should override this to skip
101    /// the `async_trait` `Box::pin` overhead. The default falls back to the
102    /// async variant via `block_on_backend` so any backend is correct without
103    /// an override; hot backends should override for zero-cost dispatch.
104    ///
105    /// # Errors
106    /// Propagates any storage error from the underlying backend.
107    fn get_sync(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
108        block_on_backend(self.get(key))
109    }
110
111    /// Synchronous put — same rationale as `get_sync`.
112    ///
113    /// # Errors
114    /// Propagates any storage error from the underlying backend.
115    fn put_sync(&self, key: &[u8], value: &[u8]) -> Result<()> {
116        block_on_backend(self.put(key, value))
117    }
118
119    /// Synchronous delete — same rationale as `get_sync`.
120    ///
121    /// # Errors
122    /// Propagates any storage error from the underlying backend.
123    fn delete_sync(&self, key: &[u8]) -> Result<()> {
124        block_on_backend(self.delete(key))
125    }
126
127    /// Check if a key exists without loading the full value.
128    async fn exists(&self, key: &[u8]) -> Result<bool> {
129        Ok(self.get(key).await?.is_some())
130    }
131
132    /// Get multiple values by keys (batch get).
133    async fn get_batch(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
134        let mut results = Vec::with_capacity(keys.len());
135        for key in keys {
136            results.push(self.get(key).await?);
137        }
138        Ok(results)
139    }
140
141    /// Delete a key.
142    async fn delete(&self, key: &[u8]) -> Result<()>;
143
144    /// Scan keys with a given prefix, returning all matching key-value pairs.
145    async fn scan_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
146
147    /// Scan keys with a given prefix, returning only keys.
148    async fn scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
149        let results = self.scan_prefix(prefix).await?;
150        Ok(results.into_iter().map(|(k, _)| k).collect())
151    }
152
153    /// Count keys with a given prefix.
154    async fn count_prefix(&self, prefix: &[u8]) -> Result<usize>;
155
156    /// Batch write operations (atomic).
157    async fn write_batch(&self, ops: Vec<WriteOp>) -> Result<()>;
158
159    /// Synchronous `write_batch` — same rationale as `get_sync`. Backends
160    /// whose small-batch commit is inline (`RocksDB`, in-memory) should override
161    /// to skip the `async_trait` `Box::pin`. The default falls back to the
162    /// async variant via `block_in_place`.
163    ///
164    /// # Errors
165    /// Propagates any storage error from the underlying backend.
166    fn write_batch_sync(&self, ops: Vec<WriteOp>) -> Result<()> {
167        block_on_backend(self.write_batch(ops))
168    }
169
170    /// Flush any pending writes to disk.
171    async fn flush(&self) -> Result<()>;
172
173    /// Scan keys with a given prefix in reverse order.
174    async fn scan_prefix_reverse(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
175        let mut results = self.scan_prefix(prefix).await?;
176        results.reverse();
177        Ok(results)
178    }
179
180    /// Scan a key range [start, end) returning all key-value pairs.
181    async fn scan_range(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
182        let all = self.scan_prefix(&[]).await?;
183        Ok(all
184            .into_iter()
185            .filter(|(k, _)| k.as_slice() >= start && k.as_slice() < end)
186            .collect())
187    }
188
189    /// Get storage statistics as a JSON object.
190    fn storage_stats(&self) -> Option<serde_json::Value> {
191        None
192    }
193
194    /// Begin a snapshot-isolated transaction.
195    ///
196    /// Returns a transaction handle with read-your-writes consistency and
197    /// optimistic conflict detection on commit. Not all backends support this.
198    async fn begin_transaction(&self) -> Result<Box<dyn SnapshotTransaction>> {
199        Err(YetiError::Internal(
200            "Snapshot transactions not supported by this backend".into(),
201        ))
202    }
203
204    /// Submit a raw JSON record for eventual processing (WAL consumer path).
205    ///
206    /// Strong consistency backends return an error (caller falls back to sync write).
207    async fn submit_record(&self, _primary_key: &str, _json: &[u8], _delete: bool) -> Result<()> {
208        Err(YetiError::Internal(
209            "submit_record not supported by this backend".into(),
210        ))
211    }
212
213    /// Delete all keys in this backend (truncate table).
214    async fn truncate(&self) -> Result<()> {
215        let keys = self.scan_keys(&[]).await?;
216        if keys.is_empty() {
217            return Ok(());
218        }
219        let ops: Vec<WriteOp> = keys
220            .into_iter()
221            .map(|k| WriteOp::Delete { key: k })
222            .collect();
223        self.write_batch(ops).await
224    }
225
226    /// Compare-and-swap put (durable work queue).
227    ///
228    /// Writes `new` for `key` only if the current on-disk bytes match
229    /// `expected` (`None` meaning "the key must not exist"). Returns
230    /// `YetiError::Cas { reason: CasReason::Mismatch }` when the expected
231    /// value does not match.
232    ///
233    /// Default implementation is a non-atomic read+write — usable but racy
234    /// under concurrent load. Real backends override this with a `RocksDB`
235    /// `OptimisticTransactionDB` round-trip (see `yeti-store`).
236    async fn put_if(&self, key: &[u8], expected: Option<&[u8]>, new: &[u8]) -> Result<()> {
237        let current = self.get(key).await?;
238        let matches = match (expected, current.as_deref()) {
239            (None, None) => true,
240            (Some(e), Some(c)) => e == c,
241            _ => false,
242        };
243        if !matches {
244            return Err(YetiError::Cas {
245                reason: crate::error::CasReason::Mismatch,
246            });
247        }
248        self.put(key, new).await
249    }
250}
251
252// ============================================================================
253// SnapshotTransaction
254// ============================================================================
255
/// A snapshot-isolated transaction handle.
///
/// Provides read-your-writes consistency within the transaction scope.
/// On commit, optimistic conflict detection ensures serializability.
///
/// `commit` and `rollback` take `self: Box<Self>`, so either call consumes the
/// handle and it cannot be used afterwards.
#[async_trait]
pub trait SnapshotTransaction: Send + Sync {
    /// Read a value within the transaction's snapshot.
    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Write a value (visible to subsequent reads within this transaction).
    async fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
    /// Delete a key within the transaction.
    async fn delete(&self, key: &[u8]) -> Result<()>;
    /// Commit the transaction (optimistic conflict detection), consuming the
    /// handle.
    async fn commit(self: Box<Self>) -> Result<()>;
    /// Rollback the transaction, discarding all changes and consuming the
    /// handle.
    async fn rollback(self: Box<Self>) -> Result<()>;
}