yeti_types/backend/traits.rs
1//! Storage backend traits: `KvBackend`, `SnapshotTransaction`, and the
2//! `WriteOp` batch operation type.
3
4use async_trait::async_trait;
5
6use crate::error::{Result, YetiError};
7
8// ============================================================================
9// WriteOp
10// ============================================================================
11
12/// A single write operation in a batch.
13///
14/// Serializable so it can be embedded in [`crate::backend::log::LogEntry`]
15/// and replicated over the wire alongside HLC + originator metadata.
16#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
17pub enum WriteOp {
18 /// Insert or update a key-value pair
19 Put {
20 /// Key bytes
21 key: Vec<u8>,
22 /// Value bytes
23 value: Vec<u8>,
24 },
25 /// Delete a key
26 Delete {
27 /// Key bytes to delete
28 key: Vec<u8>,
29 },
30}
31
32impl WriteOp {
33 /// The key this op targets (regardless of variant).
34 #[must_use]
35 pub fn key(&self) -> &[u8] {
36 match self {
37 Self::Put { key, .. } | Self::Delete { key } => key,
38 }
39 }
40
41 /// True if this is a [`WriteOp::Delete`].
42 #[must_use]
43 pub const fn is_delete(&self) -> bool {
44 matches!(self, Self::Delete { .. })
45 }
46}
47
48// ============================================================================
49// KvBackend trait
50// ============================================================================
51
52/// Drive a backend future to completion from a synchronous context, working on
53/// both runtime flavors. On a multi-threaded runtime we use `block_in_place` so
54/// the worker parks-and-migrates (other tasks keep running); on a current-thread
55/// runtime (tests, the wasm benches) `block_in_place` would panic, so we fall
56/// back to the `futures` executor.
57///
58/// This is only the *fallback* path for the `*_sync` trait methods — hot
59/// backends (`RocksDB`, in-memory) override those with genuinely synchronous
60/// implementations and never reach this helper. It must therefore never be
61/// applied to a future that can only make progress on the very runtime we're
62/// blocking (e.g. a full bounded-channel send); the only in-tree user is the
63/// read passthrough, which is channel-free.
64fn block_on_backend<F: std::future::Future>(fut: F) -> F::Output {
65 // Native: prefer block_in_place on a multi-threaded runtime (parks +
66 // migrates the worker); fall back to the futures executor on current-thread
67 // (where block_in_place panics).
68 #[cfg(not(target_arch = "wasm32"))]
69 {
70 match tokio::runtime::Handle::try_current() {
71 Ok(h)
72 if matches!(
73 h.runtime_flavor(),
74 tokio::runtime::RuntimeFlavor::MultiThread
75 ) =>
76 {
77 tokio::task::block_in_place(|| h.block_on(fut))
78 },
79 _ => futures::executor::block_on(fut),
80 }
81 }
82 // wasm32 guest: no tokio threading (block_in_place doesn't exist), and the
83 // guest never drives a host backend through this path anyway.
84 #[cfg(target_arch = "wasm32")]
85 {
86 futures::executor::block_on(fut)
87 }
88}
89
90/// Trait that all storage backends must implement.
91#[async_trait]
92pub trait KvBackend: Send + Sync {
93 /// Put a key-value pair.
94 async fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
95
96 /// Get a value by key.
97 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
98
99 /// Synchronous get — backends whose reads complete inline (e.g. `RocksDB`
100 /// via `with_cf_inline`, in-memory `HashMap`) should override this to skip
101 /// the `async_trait` `Box::pin` overhead. The default falls back to the
102 /// async variant via `block_on_backend` so any backend is correct without
103 /// an override; hot backends should override for zero-cost dispatch.
104 ///
105 /// # Errors
106 /// Propagates any storage error from the underlying backend.
107 fn get_sync(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
108 block_on_backend(self.get(key))
109 }
110
111 /// Synchronous put — same rationale as `get_sync`.
112 ///
113 /// # Errors
114 /// Propagates any storage error from the underlying backend.
115 fn put_sync(&self, key: &[u8], value: &[u8]) -> Result<()> {
116 block_on_backend(self.put(key, value))
117 }
118
119 /// Synchronous delete — same rationale as `get_sync`.
120 ///
121 /// # Errors
122 /// Propagates any storage error from the underlying backend.
123 fn delete_sync(&self, key: &[u8]) -> Result<()> {
124 block_on_backend(self.delete(key))
125 }
126
127 /// Check if a key exists without loading the full value.
128 async fn exists(&self, key: &[u8]) -> Result<bool> {
129 Ok(self.get(key).await?.is_some())
130 }
131
132 /// Get multiple values by keys (batch get).
133 async fn get_batch(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
134 let mut results = Vec::with_capacity(keys.len());
135 for key in keys {
136 results.push(self.get(key).await?);
137 }
138 Ok(results)
139 }
140
141 /// Delete a key.
142 async fn delete(&self, key: &[u8]) -> Result<()>;
143
144 /// Scan keys with a given prefix, returning all matching key-value pairs.
145 async fn scan_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
146
147 /// Scan keys with a given prefix, returning only keys.
148 async fn scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
149 let results = self.scan_prefix(prefix).await?;
150 Ok(results.into_iter().map(|(k, _)| k).collect())
151 }
152
153 /// Count keys with a given prefix.
154 async fn count_prefix(&self, prefix: &[u8]) -> Result<usize>;
155
156 /// Batch write operations (atomic).
157 async fn write_batch(&self, ops: Vec<WriteOp>) -> Result<()>;
158
159 /// Synchronous `write_batch` — same rationale as `get_sync`. Backends
160 /// whose small-batch commit is inline (`RocksDB`, in-memory) should override
161 /// to skip the `async_trait` `Box::pin`. The default falls back to the
162 /// async variant via `block_in_place`.
163 ///
164 /// # Errors
165 /// Propagates any storage error from the underlying backend.
166 fn write_batch_sync(&self, ops: Vec<WriteOp>) -> Result<()> {
167 block_on_backend(self.write_batch(ops))
168 }
169
170 /// Flush any pending writes to disk.
171 async fn flush(&self) -> Result<()>;
172
173 /// Scan keys with a given prefix in reverse order.
174 async fn scan_prefix_reverse(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
175 let mut results = self.scan_prefix(prefix).await?;
176 results.reverse();
177 Ok(results)
178 }
179
180 /// Scan a key range [start, end) returning all key-value pairs.
181 async fn scan_range(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
182 let all = self.scan_prefix(&[]).await?;
183 Ok(all
184 .into_iter()
185 .filter(|(k, _)| k.as_slice() >= start && k.as_slice() < end)
186 .collect())
187 }
188
189 /// Get storage statistics as a JSON object.
190 fn storage_stats(&self) -> Option<serde_json::Value> {
191 None
192 }
193
194 /// Begin a snapshot-isolated transaction.
195 ///
196 /// Returns a transaction handle with read-your-writes consistency and
197 /// optimistic conflict detection on commit. Not all backends support this.
198 async fn begin_transaction(&self) -> Result<Box<dyn SnapshotTransaction>> {
199 Err(YetiError::Internal(
200 "Snapshot transactions not supported by this backend".into(),
201 ))
202 }
203
204 /// Submit a raw JSON record for eventual processing (WAL consumer path).
205 ///
206 /// Strong consistency backends return an error (caller falls back to sync write).
207 async fn submit_record(&self, _primary_key: &str, _json: &[u8], _delete: bool) -> Result<()> {
208 Err(YetiError::Internal(
209 "submit_record not supported by this backend".into(),
210 ))
211 }
212
213 /// Delete all keys in this backend (truncate table).
214 async fn truncate(&self) -> Result<()> {
215 let keys = self.scan_keys(&[]).await?;
216 if keys.is_empty() {
217 return Ok(());
218 }
219 let ops: Vec<WriteOp> = keys
220 .into_iter()
221 .map(|k| WriteOp::Delete { key: k })
222 .collect();
223 self.write_batch(ops).await
224 }
225
226 /// Compare-and-swap put (durable work queue).
227 ///
228 /// Writes `new` for `key` only if the current on-disk bytes match
229 /// `expected` (`None` meaning "the key must not exist"). Returns
230 /// `YetiError::Cas { reason: CasReason::Mismatch }` when the expected
231 /// value does not match.
232 ///
233 /// Default implementation is a non-atomic read+write — usable but racy
234 /// under concurrent load. Real backends override this with a `RocksDB`
235 /// `OptimisticTransactionDB` round-trip (see `yeti-store`).
236 async fn put_if(&self, key: &[u8], expected: Option<&[u8]>, new: &[u8]) -> Result<()> {
237 let current = self.get(key).await?;
238 let matches = match (expected, current.as_deref()) {
239 (None, None) => true,
240 (Some(e), Some(c)) => e == c,
241 _ => false,
242 };
243 if !matches {
244 return Err(YetiError::Cas {
245 reason: crate::error::CasReason::Mismatch,
246 });
247 }
248 self.put(key, new).await
249 }
250}
251
252// ============================================================================
253// SnapshotTransaction
254// ============================================================================
255
256/// A snapshot-isolated transaction handle.
257///
258/// Provides read-your-writes consistency within the transaction scope.
259/// On commit, optimistic conflict detection ensures serializability.
260#[async_trait]
261pub trait SnapshotTransaction: Send + Sync {
262 /// Read a value within the transaction's snapshot.
263 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
264 /// Write a value (visible to subsequent reads within this transaction).
265 async fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
266 /// Delete a key within the transaction.
267 async fn delete(&self, key: &[u8]) -> Result<()>;
268 /// Commit the transaction (optimistic conflict detection).
269 async fn commit(self: Box<Self>) -> Result<()>;
270 /// Rollback the transaction, discarding all changes.
271 async fn rollback(self: Box<Self>) -> Result<()>;
272}