slatedb/ops.rs
1use bytes::Bytes;
2use std::ops::RangeBounds;
3use uuid::Uuid;
4
5use crate::batch::WriteBatch;
6use crate::bytes_range::BytesRange;
7use crate::config::{
8 FlushOptions, MergeOptions, PutOptions, ReadOptions, ScanOptions, WriteOptions,
9};
10use crate::db::WriteHandle;
11use crate::db_cache_manager::CacheTarget;
12use crate::db_state::SsTableId;
13use crate::db_status::DbStatus;
14use crate::manifest::VersionedManifest;
15use crate::transaction_manager::IsolationLevel;
16use crate::types::KeyValue;
17use crate::DbIterator;
18
19/// Trait for read-only database operations.
20///
21/// This trait defines the interface for reading data from SlateDB,
22/// and can be implemented by `Db`, `DbReader` and `DbSnapshot`
23/// to provide a unified interface for read-only operations.
24#[async_trait::async_trait]
25pub trait DbReadOps {
26 /// Get a value from the database with default read options.
27 ///
28 /// The `Bytes` object returned contains a slice of an entire
29 /// 4 KiB block. The block will be held in memory as long as the
30 /// caller holds a reference to the `Bytes` object. Consider
31 /// copying the data if you need to hold it for a long time.
32 ///
33 /// ## Arguments
34 /// - `key`: the key to get
35 ///
36 /// ## Returns
37 /// - `Result<Option<Bytes>, Error>`:
38 /// - `Some(Bytes)`: the value if it exists
39 /// - `None`: if the value does not exist
40 ///
41 /// ## Errors
42 /// - `Error`: if there was an error getting the value
43 async fn get<K: AsRef<[u8]> + Send>(&self, key: K) -> Result<Option<Bytes>, crate::Error> {
44 self.get_with_options(key, &ReadOptions::default()).await
45 }
46
47 /// Get a value from the database with custom read options.
48 ///
49 /// The `Bytes` object returned contains a slice of an entire
50 /// 4 KiB block. The block will be held in memory as long as the
51 /// caller holds a reference to the `Bytes` object. Consider
52 /// copying the data if you need to hold it for a long time.
53 ///
54 /// ## Arguments
55 /// - `key`: the key to get
56 /// - `options`: the read options to use
57 ///
58 /// ## Returns
59 /// - `Result<Option<Bytes>, Error>`:
60 /// - `Some(Bytes)`: the value if it exists
61 /// - `None`: if the value does not exist
62 ///
63 /// ## Errors
64 /// - `Error`: if there was an error getting the value
65 async fn get_with_options<K: AsRef<[u8]> + Send>(
66 &self,
67 key: K,
68 options: &ReadOptions,
69 ) -> Result<Option<Bytes>, crate::Error>;
70
71 /// Get a key-value pair from the database with default read options.
72 ///
73 /// Returns the key along with its value and metadata (sequence number,
74 /// creation timestamp, expiration timestamp). Unlike [`get`](Self::get),
75 /// which returns only the value bytes, this method returns a [`KeyValue`]
76 /// that includes row metadata.
77 ///
78 /// ## Arguments
79 /// - `key`: the key to look up
80 ///
81 /// ## Returns
82 /// - `Ok(Some(KeyValue))`: if the key exists and is not deleted/expired
83 /// - `Ok(None)`: if the key does not exist or is deleted/expired
84 ///
85 /// ## Errors
86 /// - `Error`: if there was an error reading from the database
87 async fn get_key_value<K: AsRef<[u8]> + Send>(
88 &self,
89 key: K,
90 ) -> Result<Option<KeyValue>, crate::Error> {
91 self.get_key_value_with_options(key, &ReadOptions::default())
92 .await
93 }
94
95 /// Get a key-value pair from the database with custom read options.
96 ///
97 /// Returns the key along with its value and metadata (sequence number,
98 /// creation timestamp, expiration timestamp). Unlike
99 /// [`get_with_options`](Self::get_with_options), which returns only the
100 /// value bytes, this method returns a [`KeyValue`] that includes row
101 /// metadata.
102 ///
103 /// ## Arguments
104 /// - `key`: the key to look up
105 /// - `options`: the read options to use
106 ///
107 /// ## Returns
108 /// - `Ok(Some(KeyValue))`: if the key exists and is not deleted/expired
109 /// - `Ok(None)`: if the key does not exist or is deleted/expired
110 ///
111 /// ## Errors
112 /// - `Error`: if there was an error reading from the database
113 async fn get_key_value_with_options<K: AsRef<[u8]> + Send>(
114 &self,
115 key: K,
116 options: &ReadOptions,
117 ) -> Result<Option<KeyValue>, crate::Error>;
118
119 /// Scan a range of keys using the default scan options.
120 ///
121 /// returns a `DbIterator`
122 ///
123 /// ## Arguments
124 /// - `range`: the range of keys to scan
125 ///
126 /// ## Errors
127 /// - `Error`: if there was an error scanning the range of keys
128 ///
129 /// ## Returns
130 /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
131 async fn scan<K, T>(&self, range: T) -> Result<DbIterator, crate::Error>
132 where
133 K: AsRef<[u8]> + Send,
134 T: RangeBounds<K> + Send,
135 {
136 self.scan_with_options(range, &ScanOptions::default()).await
137 }
138
139 /// Scan a range of keys with the provided options.
140 ///
141 /// returns a `DbIterator`
142 ///
143 /// ## Arguments
144 /// - `range`: the range of keys to scan
145 /// - `options`: the scan options to use
146 ///
147 /// ## Errors
148 /// - `Error`: if there was an error scanning the range of keys
149 ///
150 /// ## Returns
151 /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
152 async fn scan_with_options<K, T>(
153 &self,
154 range: T,
155 options: &ScanOptions,
156 ) -> Result<DbIterator, crate::Error>
157 where
158 K: AsRef<[u8]> + Send,
159 T: RangeBounds<K> + Send;
160
161 /// Scan all keys that share the provided prefix using the default scan options.
162 ///
163 /// ## Arguments
164 /// - `prefix`: the key prefix to scan
165 ///
166 /// ## Returns
167 /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
168 async fn scan_prefix<P>(&self, prefix: P) -> Result<DbIterator, crate::Error>
169 where
170 P: AsRef<[u8]> + Send,
171 {
172 self.scan_prefix_with_options(prefix, &ScanOptions::default())
173 .await
174 }
175
176 /// Scan all keys that share the provided prefix with custom options.
177 ///
178 /// ## Arguments
179 /// - `prefix`: the key prefix to scan
180 /// - `options`: the scan options to use
181 ///
182 /// ## Returns
183 /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
184 async fn scan_prefix_with_options<P>(
185 &self,
186 prefix: P,
187 options: &ScanOptions,
188 ) -> Result<DbIterator, crate::Error>
189 where
190 P: AsRef<[u8]> + Send,
191 {
192 let range = BytesRange::from_prefix(prefix.as_ref());
193 self.scan_with_options(range, options).await
194 }
195}
196
197/// Trait for write-side database operations.
198///
199/// This trait defines the asynchronous write API exposed by [`Db`](crate::Db),
200/// allowing consumers to write generic code or test doubles over the writer
201/// surface without depending on the concrete `Db` type.
202#[async_trait::async_trait]
203pub trait DbWriteOps {
204 /// The transaction type returned by [`Self::begin`]. Stub
205 /// implementations supply their own [`DbTransactionOps`] type, while
206 /// the real `Db` returns a `DbTransaction`.
207 type Transaction: DbTransactionOps + Send;
208
209 /// Write a value into the database with default `PutOptions` and
210 /// `WriteOptions`.
211 ///
212 /// ## Arguments
213 /// - `key`: the key to write
214 /// - `value`: the value to write
215 ///
216 /// ## Errors
217 /// - `Error`: if there was an error writing the value.
218 async fn put<K, V>(&self, key: K, value: V) -> Result<WriteHandle, crate::Error>
219 where
220 K: AsRef<[u8]> + Send,
221 V: AsRef<[u8]> + Send,
222 {
223 self.put_with_options(key, value, &PutOptions::default(), &WriteOptions::default())
224 .await
225 }
226
227 /// Write a value into the database with custom `PutOptions` and
228 /// `WriteOptions`.
229 ///
230 /// ## Arguments
231 /// - `key`: the key to write
232 /// - `value`: the value to write
233 /// - `put_opts`: the put options to use
234 /// - `write_opts`: the write options to use
235 ///
236 /// ## Errors
237 /// - `Error`: if there was an error writing the value.
238 async fn put_with_options<K, V>(
239 &self,
240 key: K,
241 value: V,
242 put_opts: &PutOptions,
243 write_opts: &WriteOptions,
244 ) -> Result<WriteHandle, crate::Error>
245 where
246 K: AsRef<[u8]> + Send,
247 V: AsRef<[u8]> + Send;
248
249 /// Delete a key from the database with default `WriteOptions`.
250 ///
251 /// ## Arguments
252 /// - `key`: the key to delete
253 ///
254 /// ## Errors
255 /// - `Error`: if there was an error deleting the key.
256 async fn delete<K: AsRef<[u8]> + Send>(&self, key: K) -> Result<WriteHandle, crate::Error> {
257 self.delete_with_options(key, &WriteOptions::default())
258 .await
259 }
260
261 /// Delete a key from the database with custom `WriteOptions`.
262 ///
263 /// ## Arguments
264 /// - `key`: the key to delete
265 /// - `options`: the write options to use
266 ///
267 /// ## Errors
268 /// - `Error`: if there was an error deleting the key.
269 async fn delete_with_options<K: AsRef<[u8]> + Send>(
270 &self,
271 key: K,
272 options: &WriteOptions,
273 ) -> Result<WriteHandle, crate::Error>;
274
275 /// Merge a value into the database with default `MergeOptions` and
276 /// `WriteOptions`.
277 ///
278 /// Merge operations allow applications to bypass the traditional
279 /// read/modify/write cycle by expressing partial updates using an
280 /// associative operator. The merge operator must be configured when
281 /// opening the database.
282 ///
283 /// ## Arguments
284 /// - `key`: the key to merge into
285 /// - `value`: the merge operand to apply
286 ///
287 /// ## Errors
288 /// - `Error`: if there was an error merging the value, or if no merge
289 /// operator is configured.
290 async fn merge<K, V>(&self, key: K, value: V) -> Result<WriteHandle, crate::Error>
291 where
292 K: AsRef<[u8]> + Send,
293 V: AsRef<[u8]> + Send,
294 {
295 self.merge_with_options(
296 key,
297 value,
298 &MergeOptions::default(),
299 &WriteOptions::default(),
300 )
301 .await
302 }
303
304 /// Merge a value into the database with custom `MergeOptions` and
305 /// `WriteOptions`.
306 ///
307 /// ## Arguments
308 /// - `key`: the key to merge into
309 /// - `value`: the merge operand to apply
310 /// - `merge_opts`: the merge options to use
311 /// - `write_opts`: the write options to use
312 ///
313 /// ## Errors
314 /// - `Error`: if there was an error merging the value, or if no merge
315 /// operator is configured.
316 async fn merge_with_options<K, V>(
317 &self,
318 key: K,
319 value: V,
320 merge_opts: &MergeOptions,
321 write_opts: &WriteOptions,
322 ) -> Result<WriteHandle, crate::Error>
323 where
324 K: AsRef<[u8]> + Send,
325 V: AsRef<[u8]> + Send;
326
327 /// Write a batch of put/delete operations atomically to the database.
328 ///
329 /// ## Arguments
330 /// - `batch`: the batch of operations to write
331 ///
332 /// ## Errors
333 /// - `Error`: if there was an error writing the batch.
334 async fn write(&self, batch: WriteBatch) -> Result<WriteHandle, crate::Error> {
335 self.write_with_options(batch, &WriteOptions::default())
336 .await
337 }
338
339 /// Write a batch of put/delete operations atomically to the database with
340 /// custom `WriteOptions`.
341 ///
342 /// ## Arguments
343 /// - `batch`: the batch of operations to write
344 /// - `options`: the write options to use
345 ///
346 /// ## Errors
347 /// - `Error`: if there was an error writing the batch.
348 async fn write_with_options(
349 &self,
350 batch: WriteBatch,
351 options: &WriteOptions,
352 ) -> Result<WriteHandle, crate::Error>;
353
354 /// Flush in-memory writes to disk. This function blocks until the
355 /// in-memory data has been durably written to object storage.
356 ///
357 /// ## Errors
358 /// - `Error`: if there was an error flushing the database.
359 async fn flush(&self) -> Result<(), crate::Error>;
360
361 /// Flush in-memory writes to disk with custom options.
362 ///
363 /// An error will be returned if `options.flush_type` is `FlushType::Wal`
364 /// and the WAL is disabled.
365 ///
366 /// ## Arguments
367 /// - `options`: the flush options
368 ///
369 /// ## Errors
370 /// - `Error`: if there was an error flushing the database.
371 async fn flush_with_options(&self, options: FlushOptions) -> Result<(), crate::Error>;
372
373 /// Begin a new transaction with the specified isolation level.
374 ///
375 /// ## Arguments
376 /// - `isolation_level`: the isolation level for the transaction
377 ///
378 /// ## Returns
379 /// - `Result<Self::Transaction, Error>`: the transaction handle
380 async fn begin(
381 &self,
382 isolation_level: IsolationLevel,
383 ) -> Result<Self::Transaction, crate::Error>;
384}
385
386/// Trait for transactional database operations.
387///
388/// This trait defines the synchronous write API and lifecycle operations
389/// exposed by [`DbTransaction`](crate::DbTransaction), and extends
390/// [`DbReadOps`] so consumers can write generic code or test doubles over
391/// the full transaction surface without depending on the concrete
392/// `DbTransaction` type.
393#[async_trait::async_trait]
394pub trait DbTransactionOps: DbReadOps {
395 /// Put a key-value pair into the transaction with default `PutOptions`.
396 /// The write is buffered in the transaction's write batch until commit.
397 fn put<K, V>(&self, key: K, value: V) -> Result<(), crate::Error>
398 where
399 K: AsRef<[u8]>,
400 V: AsRef<[u8]>,
401 {
402 self.put_with_options(key, value, &PutOptions::default())
403 }
404
405 /// Put a key-value pair into the transaction with custom `PutOptions`.
406 /// The write is buffered in the transaction's write batch until commit.
407 fn put_with_options<K, V>(
408 &self,
409 key: K,
410 value: V,
411 options: &PutOptions,
412 ) -> Result<(), crate::Error>
413 where
414 K: AsRef<[u8]>,
415 V: AsRef<[u8]>;
416
417 /// Delete a key from the transaction. The delete is buffered in the
418 /// transaction's write batch until commit.
419 fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), crate::Error>;
420
421 /// Merge a key-value pair into the transaction with default
422 /// `MergeOptions`.
423 ///
424 /// ## Errors
425 /// - `Error`: if no merge operator is configured for the database.
426 fn merge<K, V>(&self, key: K, value: V) -> Result<(), crate::Error>
427 where
428 K: AsRef<[u8]>,
429 V: AsRef<[u8]>,
430 {
431 self.merge_with_options(key, value, &MergeOptions::default())
432 }
433
434 /// Merge a key-value pair into the transaction with custom `MergeOptions`.
435 ///
436 /// ## Errors
437 /// - `Error`: if no merge operator is configured for the database.
438 fn merge_with_options<K, V>(
439 &self,
440 key: K,
441 value: V,
442 options: &MergeOptions,
443 ) -> Result<(), crate::Error>
444 where
445 K: AsRef<[u8]>,
446 V: AsRef<[u8]>;
447
448 /// Mark keys as read for conflict detection.
449 ///
450 /// When keys are marked as read, the transaction will detect conflicts
451 /// if another transaction modifies any of those keys after this
452 /// transaction started, regardless of the isolation level.
453 fn mark_read<K, I>(&self, keys: I) -> Result<(), crate::Error>
454 where
455 K: AsRef<[u8]>,
456 I: IntoIterator<Item = K>;
457
458 /// Mark written keys as untracked for conflict detection.
459 ///
460 /// Keys marked with this method are still written atomically with the
461 /// rest of the transaction, but are excluded from transaction conflict
462 /// detection on commit for both this transaction and other transactions.
463 fn unmark_write<K, I>(&self, keys: I) -> Result<(), crate::Error>
464 where
465 K: AsRef<[u8]>,
466 I: IntoIterator<Item = K>;
467
468 /// Get the sequence number this transaction was started at.
469 fn seqnum(&self) -> u64;
470
471 /// Get the unique transaction ID assigned by the transaction manager.
472 fn id(&self) -> Uuid;
473
474 /// Commit the transaction with default `WriteOptions`.
475 ///
476 /// ## Returns
477 /// - `Ok(Some(WriteHandle))` if the commit is successful and there are
478 /// writes in the batch.
479 /// - `Ok(None)` if the commit is successful but the write batch is empty.
480 ///
481 /// ## Errors
482 /// - `Error`: if the commit operation fails (I/O errors or conflict
483 /// detection).
484 async fn commit(self) -> Result<Option<WriteHandle>, crate::Error>
485 where
486 Self: Sized + Send,
487 {
488 self.commit_with_options(&WriteOptions::default()).await
489 }
490
491 /// Commit the transaction with custom `WriteOptions`.
492 async fn commit_with_options(
493 self,
494 options: &WriteOptions,
495 ) -> Result<Option<WriteHandle>, crate::Error>
496 where
497 Self: Sized + Send;
498
499 /// Rollback the transaction by discarding all buffered operations.
500 fn rollback(self)
501 where
502 Self: Sized;
503}
504
505/// Trait for database metadata operations.
506///
507/// This trait provides access to database status and manifest information,
508/// implemented by [`Db`](crate::Db) and [`DbReader`](crate::DbReader) to
509/// provide a unified interface for metadata access.
510///
511/// The trait is object-safe, allowing for dynamic dispatch when needed.
512pub trait DbMetadataOps {
513 /// Get the current manifest state.
514 ///
515 /// Returns the current manifest snapshot known to this handle, paired
516 /// with its manifest version ID.
517 fn manifest(&self) -> VersionedManifest;
518
519 /// Subscribe to database state changes.
520 ///
521 /// Returns a [`tokio::sync::watch::Receiver<DbStatus>`] that always
522 /// reflects the latest database status. The status includes the latest durable
523 /// sequence number and the current manifest snapshot observed by this
524 /// handle. For [`Db`](crate::Db) is is the current in-memory snapshot and
525 /// for [`DbReader`](crate::DbReader) it is the latest manifest polled from object storage.
526 /// For example, you can wait for a specific sequence number to
527 /// become durable:
528 ///
529 /// ```ignore
530 /// let seq = 42; // sequence number from a write operation
531 /// let mut rx = db.subscribe();
532 /// rx.wait_for(|s| s.durable_seq >= seq).await.expect("db dropped");
533 /// ```
534 ///
535 /// # Deadlock risk
536 ///
537 /// The returned receiver holds a read lock on the current value while
538 /// borrowed (via [`borrow`](tokio::sync::watch::Receiver::borrow),
539 /// [`borrow_and_update`](tokio::sync::watch::Receiver::borrow_and_update),
540 /// or the guard returned by [`wait_for`](tokio::sync::watch::Receiver::wait_for)).
541 /// The database must acquire a write lock to publish new status updates.
542 /// Holding the read guard for an extended period will block all database status
543 /// updates and may cause a deadlock. See the [deadlock warning in
544 /// `Receiver::borrow`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html#method.borrow)
545 /// for details. Always clone or copy the data you need:
546 ///
547 /// ```ignore
548 /// // Good: clone the status and release the lock immediately.
549 /// let status = rx.borrow().clone();
550 /// some_async_fn(status.durable_seq).await;
551 /// some_other_async_fn(status.current_manifest.clone()).await;
552 ///
553 /// // Good: copy the durable seq and release the lock immediately.
554 /// let durable_seq = rx.borrow().durable_seq; // uses Copy trait
555 /// some_async_fn(durable_seq).await;
556 ///
557 /// // Bad: holding the status across an await blocks all senders.
558 /// let status = rx.borrow();
559 /// some_async_fn(status.durable_seq).await; // deadlock!
560 /// ```
561 fn subscribe(&self) -> tokio::sync::watch::Receiver<DbStatus>;
562
563 /// Returns the latest database status.
564 ///
565 /// This is a snapshot of the current state and will not update automatically.
566 /// Use [`subscribe`](DbMetadataOps::subscribe) to receive real-time updates.
567 fn status(&self) -> DbStatus;
568}
569
570/// Trait for block-cache warming and eviction operations.
571#[async_trait::async_trait]
572pub trait DbCacheManagerOps {
573 /// Warms selected cache content for one SST.
574 ///
575 /// Callers fan out over SSTs themselves (for example with
576 /// `FuturesUnordered`) to get the concurrency they want. Per-target
577 /// outcomes are reflected in cache-manager metrics, not the return value.
578 ///
579 /// Returns `Err` on the first failing target. If no block cache is
580 /// configured, or if the SST is not reachable from the current manifest,
581 /// the call is a no-op that returns `Ok(())`.
582 async fn warm_sst(
583 &self,
584 sst_id: SsTableId,
585 targets: &[CacheTarget],
586 ) -> Result<(), crate::Error>;
587
588 /// Best-effort eviction of block-cache entries for one SST.
589 ///
590 /// If no block cache is configured, logs a warning and returns `Ok(())`.
591 /// Does not check whether the SST is still live in the current manifest —
592 /// callers own that policy.
593 async fn evict_cached_sst(&self, sst_id: SsTableId) -> Result<(), crate::Error>;
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks: `DbMetadataOps` and `DbCacheManagerOps` must stay
    // object-safe (usable as `dyn Trait`). These functions are never called;
    // the module only needs to type-check for the assertion to hold.
    fn _assert_object_safe(_: &dyn DbMetadataOps) {}
    fn _assert_cache_manager_ops_object_safe(_: &dyn DbCacheManagerOps) {}
}