Skip to main content

slatedb/
ops.rs

1use bytes::Bytes;
2use std::ops::RangeBounds;
3use uuid::Uuid;
4
5use crate::batch::WriteBatch;
6use crate::bytes_range::BytesRange;
7use crate::config::{
8    FlushOptions, MergeOptions, PutOptions, ReadOptions, ScanOptions, WriteOptions,
9};
10use crate::db::WriteHandle;
11use crate::db_cache_manager::CacheTarget;
12use crate::db_state::SsTableId;
13use crate::db_status::DbStatus;
14use crate::manifest::VersionedManifest;
15use crate::transaction_manager::IsolationLevel;
16use crate::types::KeyValue;
17use crate::DbIterator;
18
19/// Trait for read-only database operations.
20///
21/// This trait defines the interface for reading data from SlateDB,
22/// and can be implemented by `Db`, `DbReader` and `DbSnapshot`
23/// to provide a unified interface for read-only operations.
24#[async_trait::async_trait]
25pub trait DbReadOps {
26    /// Get a value from the database with default read options.
27    ///
28    /// The `Bytes` object returned contains a slice of an entire
29    /// 4 KiB block. The block will be held in memory as long as the
30    /// caller holds a reference to the `Bytes` object. Consider
31    /// copying the data if you need to hold it for a long time.
32    ///
33    /// ## Arguments
34    /// - `key`: the key to get
35    ///
36    /// ## Returns
37    /// - `Result<Option<Bytes>, Error>`:
38    ///     - `Some(Bytes)`: the value if it exists
39    ///     - `None`: if the value does not exist
40    ///
41    /// ## Errors
42    /// - `Error`: if there was an error getting the value
43    async fn get<K: AsRef<[u8]> + Send>(&self, key: K) -> Result<Option<Bytes>, crate::Error> {
44        self.get_with_options(key, &ReadOptions::default()).await
45    }
46
47    /// Get a value from the database with custom read options.
48    ///
49    /// The `Bytes` object returned contains a slice of an entire
50    /// 4 KiB block. The block will be held in memory as long as the
51    /// caller holds a reference to the `Bytes` object. Consider
52    /// copying the data if you need to hold it for a long time.
53    ///
54    /// ## Arguments
55    /// - `key`: the key to get
56    /// - `options`: the read options to use
57    ///
58    /// ## Returns
59    /// - `Result<Option<Bytes>, Error>`:
60    ///   - `Some(Bytes)`: the value if it exists
61    ///   - `None`: if the value does not exist
62    ///
63    /// ## Errors
64    /// - `Error`: if there was an error getting the value
65    async fn get_with_options<K: AsRef<[u8]> + Send>(
66        &self,
67        key: K,
68        options: &ReadOptions,
69    ) -> Result<Option<Bytes>, crate::Error>;
70
71    /// Get a key-value pair from the database with default read options.
72    ///
73    /// Returns the key along with its value and metadata (sequence number,
74    /// creation timestamp, expiration timestamp). Unlike [`get`](Self::get),
75    /// which returns only the value bytes, this method returns a [`KeyValue`]
76    /// that includes row metadata.
77    ///
78    /// ## Arguments
79    /// - `key`: the key to look up
80    ///
81    /// ## Returns
82    /// - `Ok(Some(KeyValue))`: if the key exists and is not deleted/expired
83    /// - `Ok(None)`: if the key does not exist or is deleted/expired
84    ///
85    /// ## Errors
86    /// - `Error`: if there was an error reading from the database
87    async fn get_key_value<K: AsRef<[u8]> + Send>(
88        &self,
89        key: K,
90    ) -> Result<Option<KeyValue>, crate::Error> {
91        self.get_key_value_with_options(key, &ReadOptions::default())
92            .await
93    }
94
95    /// Get a key-value pair from the database with custom read options.
96    ///
97    /// Returns the key along with its value and metadata (sequence number,
98    /// creation timestamp, expiration timestamp). Unlike
99    /// [`get_with_options`](Self::get_with_options), which returns only the
100    /// value bytes, this method returns a [`KeyValue`] that includes row
101    /// metadata.
102    ///
103    /// ## Arguments
104    /// - `key`: the key to look up
105    /// - `options`: the read options to use
106    ///
107    /// ## Returns
108    /// - `Ok(Some(KeyValue))`: if the key exists and is not deleted/expired
109    /// - `Ok(None)`: if the key does not exist or is deleted/expired
110    ///
111    /// ## Errors
112    /// - `Error`: if there was an error reading from the database
113    async fn get_key_value_with_options<K: AsRef<[u8]> + Send>(
114        &self,
115        key: K,
116        options: &ReadOptions,
117    ) -> Result<Option<KeyValue>, crate::Error>;
118
119    /// Scan a range of keys using the default scan options.
120    ///
121    /// returns a `DbIterator`
122    ///
123    /// ## Arguments
124    /// - `range`: the range of keys to scan
125    ///
126    /// ## Errors
127    /// - `Error`: if there was an error scanning the range of keys
128    ///
129    /// ## Returns
130    /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
131    async fn scan<K, T>(&self, range: T) -> Result<DbIterator, crate::Error>
132    where
133        K: AsRef<[u8]> + Send,
134        T: RangeBounds<K> + Send,
135    {
136        self.scan_with_options(range, &ScanOptions::default()).await
137    }
138
139    /// Scan a range of keys with the provided options.
140    ///
141    /// returns a `DbIterator`
142    ///
143    /// ## Arguments
144    /// - `range`: the range of keys to scan
145    /// - `options`: the scan options to use
146    ///
147    /// ## Errors
148    /// - `Error`: if there was an error scanning the range of keys
149    ///
150    /// ## Returns
151    /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
152    async fn scan_with_options<K, T>(
153        &self,
154        range: T,
155        options: &ScanOptions,
156    ) -> Result<DbIterator, crate::Error>
157    where
158        K: AsRef<[u8]> + Send,
159        T: RangeBounds<K> + Send;
160
161    /// Scan all keys that share the provided prefix using the default scan options.
162    ///
163    /// ## Arguments
164    /// - `prefix`: the key prefix to scan
165    ///
166    /// ## Returns
167    /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
168    async fn scan_prefix<P>(&self, prefix: P) -> Result<DbIterator, crate::Error>
169    where
170        P: AsRef<[u8]> + Send,
171    {
172        self.scan_prefix_with_options(prefix, &ScanOptions::default())
173            .await
174    }
175
176    /// Scan all keys that share the provided prefix with custom options.
177    ///
178    /// ## Arguments
179    /// - `prefix`: the key prefix to scan
180    /// - `options`: the scan options to use
181    ///
182    /// ## Returns
183    /// - `Result<DbIterator, Error>`: An iterator with the results of the scan
184    async fn scan_prefix_with_options<P>(
185        &self,
186        prefix: P,
187        options: &ScanOptions,
188    ) -> Result<DbIterator, crate::Error>
189    where
190        P: AsRef<[u8]> + Send,
191    {
192        let range = BytesRange::from_prefix(prefix.as_ref());
193        self.scan_with_options(range, options).await
194    }
195}
196
197/// Trait for write-side database operations.
198///
199/// This trait defines the asynchronous write API exposed by [`Db`](crate::Db),
200/// allowing consumers to write generic code or test doubles over the writer
201/// surface without depending on the concrete `Db` type.
202#[async_trait::async_trait]
203pub trait DbWriteOps {
204    /// The transaction type returned by [`Self::begin`]. Stub
205    /// implementations supply their own [`DbTransactionOps`] type, while
206    /// the real `Db` returns a `DbTransaction`.
207    type Transaction: DbTransactionOps + Send;
208
209    /// Write a value into the database with default `PutOptions` and
210    /// `WriteOptions`.
211    ///
212    /// ## Arguments
213    /// - `key`: the key to write
214    /// - `value`: the value to write
215    ///
216    /// ## Errors
217    /// - `Error`: if there was an error writing the value.
218    async fn put<K, V>(&self, key: K, value: V) -> Result<WriteHandle, crate::Error>
219    where
220        K: AsRef<[u8]> + Send,
221        V: AsRef<[u8]> + Send,
222    {
223        self.put_with_options(key, value, &PutOptions::default(), &WriteOptions::default())
224            .await
225    }
226
227    /// Write a value into the database with custom `PutOptions` and
228    /// `WriteOptions`.
229    ///
230    /// ## Arguments
231    /// - `key`: the key to write
232    /// - `value`: the value to write
233    /// - `put_opts`: the put options to use
234    /// - `write_opts`: the write options to use
235    ///
236    /// ## Errors
237    /// - `Error`: if there was an error writing the value.
238    async fn put_with_options<K, V>(
239        &self,
240        key: K,
241        value: V,
242        put_opts: &PutOptions,
243        write_opts: &WriteOptions,
244    ) -> Result<WriteHandle, crate::Error>
245    where
246        K: AsRef<[u8]> + Send,
247        V: AsRef<[u8]> + Send;
248
249    /// Delete a key from the database with default `WriteOptions`.
250    ///
251    /// ## Arguments
252    /// - `key`: the key to delete
253    ///
254    /// ## Errors
255    /// - `Error`: if there was an error deleting the key.
256    async fn delete<K: AsRef<[u8]> + Send>(&self, key: K) -> Result<WriteHandle, crate::Error> {
257        self.delete_with_options(key, &WriteOptions::default())
258            .await
259    }
260
261    /// Delete a key from the database with custom `WriteOptions`.
262    ///
263    /// ## Arguments
264    /// - `key`: the key to delete
265    /// - `options`: the write options to use
266    ///
267    /// ## Errors
268    /// - `Error`: if there was an error deleting the key.
269    async fn delete_with_options<K: AsRef<[u8]> + Send>(
270        &self,
271        key: K,
272        options: &WriteOptions,
273    ) -> Result<WriteHandle, crate::Error>;
274
275    /// Merge a value into the database with default `MergeOptions` and
276    /// `WriteOptions`.
277    ///
278    /// Merge operations allow applications to bypass the traditional
279    /// read/modify/write cycle by expressing partial updates using an
280    /// associative operator. The merge operator must be configured when
281    /// opening the database.
282    ///
283    /// ## Arguments
284    /// - `key`: the key to merge into
285    /// - `value`: the merge operand to apply
286    ///
287    /// ## Errors
288    /// - `Error`: if there was an error merging the value, or if no merge
289    ///   operator is configured.
290    async fn merge<K, V>(&self, key: K, value: V) -> Result<WriteHandle, crate::Error>
291    where
292        K: AsRef<[u8]> + Send,
293        V: AsRef<[u8]> + Send,
294    {
295        self.merge_with_options(
296            key,
297            value,
298            &MergeOptions::default(),
299            &WriteOptions::default(),
300        )
301        .await
302    }
303
304    /// Merge a value into the database with custom `MergeOptions` and
305    /// `WriteOptions`.
306    ///
307    /// ## Arguments
308    /// - `key`: the key to merge into
309    /// - `value`: the merge operand to apply
310    /// - `merge_opts`: the merge options to use
311    /// - `write_opts`: the write options to use
312    ///
313    /// ## Errors
314    /// - `Error`: if there was an error merging the value, or if no merge
315    ///   operator is configured.
316    async fn merge_with_options<K, V>(
317        &self,
318        key: K,
319        value: V,
320        merge_opts: &MergeOptions,
321        write_opts: &WriteOptions,
322    ) -> Result<WriteHandle, crate::Error>
323    where
324        K: AsRef<[u8]> + Send,
325        V: AsRef<[u8]> + Send;
326
327    /// Write a batch of put/delete operations atomically to the database.
328    ///
329    /// ## Arguments
330    /// - `batch`: the batch of operations to write
331    ///
332    /// ## Errors
333    /// - `Error`: if there was an error writing the batch.
334    async fn write(&self, batch: WriteBatch) -> Result<WriteHandle, crate::Error> {
335        self.write_with_options(batch, &WriteOptions::default())
336            .await
337    }
338
339    /// Write a batch of put/delete operations atomically to the database with
340    /// custom `WriteOptions`.
341    ///
342    /// ## Arguments
343    /// - `batch`: the batch of operations to write
344    /// - `options`: the write options to use
345    ///
346    /// ## Errors
347    /// - `Error`: if there was an error writing the batch.
348    async fn write_with_options(
349        &self,
350        batch: WriteBatch,
351        options: &WriteOptions,
352    ) -> Result<WriteHandle, crate::Error>;
353
354    /// Flush in-memory writes to disk. This function blocks until the
355    /// in-memory data has been durably written to object storage.
356    ///
357    /// ## Errors
358    /// - `Error`: if there was an error flushing the database.
359    async fn flush(&self) -> Result<(), crate::Error>;
360
361    /// Flush in-memory writes to disk with custom options.
362    ///
363    /// An error will be returned if `options.flush_type` is `FlushType::Wal`
364    /// and the WAL is disabled.
365    ///
366    /// ## Arguments
367    /// - `options`: the flush options
368    ///
369    /// ## Errors
370    /// - `Error`: if there was an error flushing the database.
371    async fn flush_with_options(&self, options: FlushOptions) -> Result<(), crate::Error>;
372
373    /// Begin a new transaction with the specified isolation level.
374    ///
375    /// ## Arguments
376    /// - `isolation_level`: the isolation level for the transaction
377    ///
378    /// ## Returns
379    /// - `Result<Self::Transaction, Error>`: the transaction handle
380    async fn begin(
381        &self,
382        isolation_level: IsolationLevel,
383    ) -> Result<Self::Transaction, crate::Error>;
384}
385
386/// Trait for transactional database operations.
387///
388/// This trait defines the synchronous write API and lifecycle operations
389/// exposed by [`DbTransaction`](crate::DbTransaction), and extends
390/// [`DbReadOps`] so consumers can write generic code or test doubles over
391/// the full transaction surface without depending on the concrete
392/// `DbTransaction` type.
393#[async_trait::async_trait]
394pub trait DbTransactionOps: DbReadOps {
395    /// Put a key-value pair into the transaction with default `PutOptions`.
396    /// The write is buffered in the transaction's write batch until commit.
397    fn put<K, V>(&self, key: K, value: V) -> Result<(), crate::Error>
398    where
399        K: AsRef<[u8]>,
400        V: AsRef<[u8]>,
401    {
402        self.put_with_options(key, value, &PutOptions::default())
403    }
404
405    /// Put a key-value pair into the transaction with custom `PutOptions`.
406    /// The write is buffered in the transaction's write batch until commit.
407    fn put_with_options<K, V>(
408        &self,
409        key: K,
410        value: V,
411        options: &PutOptions,
412    ) -> Result<(), crate::Error>
413    where
414        K: AsRef<[u8]>,
415        V: AsRef<[u8]>;
416
417    /// Delete a key from the transaction. The delete is buffered in the
418    /// transaction's write batch until commit.
419    fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), crate::Error>;
420
421    /// Merge a key-value pair into the transaction with default
422    /// `MergeOptions`.
423    ///
424    /// ## Errors
425    /// - `Error`: if no merge operator is configured for the database.
426    fn merge<K, V>(&self, key: K, value: V) -> Result<(), crate::Error>
427    where
428        K: AsRef<[u8]>,
429        V: AsRef<[u8]>,
430    {
431        self.merge_with_options(key, value, &MergeOptions::default())
432    }
433
434    /// Merge a key-value pair into the transaction with custom `MergeOptions`.
435    ///
436    /// ## Errors
437    /// - `Error`: if no merge operator is configured for the database.
438    fn merge_with_options<K, V>(
439        &self,
440        key: K,
441        value: V,
442        options: &MergeOptions,
443    ) -> Result<(), crate::Error>
444    where
445        K: AsRef<[u8]>,
446        V: AsRef<[u8]>;
447
448    /// Mark keys as read for conflict detection.
449    ///
450    /// When keys are marked as read, the transaction will detect conflicts
451    /// if another transaction modifies any of those keys after this
452    /// transaction started, regardless of the isolation level.
453    fn mark_read<K, I>(&self, keys: I) -> Result<(), crate::Error>
454    where
455        K: AsRef<[u8]>,
456        I: IntoIterator<Item = K>;
457
458    /// Mark written keys as untracked for conflict detection.
459    ///
460    /// Keys marked with this method are still written atomically with the
461    /// rest of the transaction, but are excluded from transaction conflict
462    /// detection on commit for both this transaction and other transactions.
463    fn unmark_write<K, I>(&self, keys: I) -> Result<(), crate::Error>
464    where
465        K: AsRef<[u8]>,
466        I: IntoIterator<Item = K>;
467
468    /// Get the sequence number this transaction was started at.
469    fn seqnum(&self) -> u64;
470
471    /// Get the unique transaction ID assigned by the transaction manager.
472    fn id(&self) -> Uuid;
473
474    /// Commit the transaction with default `WriteOptions`.
475    ///
476    /// ## Returns
477    /// - `Ok(Some(WriteHandle))` if the commit is successful and there are
478    ///   writes in the batch.
479    /// - `Ok(None)` if the commit is successful but the write batch is empty.
480    ///
481    /// ## Errors
482    /// - `Error`: if the commit operation fails (I/O errors or conflict
483    ///   detection).
484    async fn commit(self) -> Result<Option<WriteHandle>, crate::Error>
485    where
486        Self: Sized + Send,
487    {
488        self.commit_with_options(&WriteOptions::default()).await
489    }
490
491    /// Commit the transaction with custom `WriteOptions`.
492    async fn commit_with_options(
493        self,
494        options: &WriteOptions,
495    ) -> Result<Option<WriteHandle>, crate::Error>
496    where
497        Self: Sized + Send;
498
499    /// Rollback the transaction by discarding all buffered operations.
500    fn rollback(self)
501    where
502        Self: Sized;
503}
504
505/// Trait for database metadata operations.
506///
507/// This trait provides access to database status and manifest information,
508/// implemented by [`Db`](crate::Db) and [`DbReader`](crate::DbReader) to
509/// provide a unified interface for metadata access.
510///
511/// The trait is object-safe, allowing for dynamic dispatch when needed.
512pub trait DbMetadataOps {
513    /// Get the current manifest state.
514    ///
515    /// Returns the current manifest snapshot known to this handle, paired
516    /// with its manifest version ID.
517    fn manifest(&self) -> VersionedManifest;
518
519    /// Subscribe to database state changes.
520    ///
521    /// Returns a [`tokio::sync::watch::Receiver<DbStatus>`] that always
522    /// reflects the latest database status. The status includes the latest durable
523    /// sequence number and the current manifest snapshot observed by this
524    /// handle. For [`Db`](crate::Db) is is the current in-memory snapshot and
525    /// for [`DbReader`](crate::DbReader) it is the latest manifest polled from object storage.
526    /// For example, you can wait for a specific sequence number to
527    /// become durable:
528    ///
529    /// ```ignore
530    /// let seq = 42; // sequence number from a write operation
531    /// let mut rx = db.subscribe();
532    /// rx.wait_for(|s| s.durable_seq >= seq).await.expect("db dropped");
533    /// ```
534    ///
535    /// # Deadlock risk
536    ///
537    /// The returned receiver holds a read lock on the current value while
538    /// borrowed (via [`borrow`](tokio::sync::watch::Receiver::borrow),
539    /// [`borrow_and_update`](tokio::sync::watch::Receiver::borrow_and_update),
540    /// or the guard returned by [`wait_for`](tokio::sync::watch::Receiver::wait_for)).
541    /// The database must acquire a write lock to publish new status updates.
542    /// Holding the read guard for an extended period will block all database status
543    /// updates and may cause a deadlock. See the [deadlock warning in
544    /// `Receiver::borrow`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html#method.borrow)
545    /// for details. Always clone or copy the data you need:
546    ///
547    /// ```ignore
548    /// // Good: clone the status and release the lock immediately.
549    /// let status = rx.borrow().clone();
550    /// some_async_fn(status.durable_seq).await;
551    /// some_other_async_fn(status.current_manifest.clone()).await;
552    ///
553    /// // Good: copy the durable seq and release the lock immediately.
554    /// let durable_seq = rx.borrow().durable_seq; // uses Copy trait
555    /// some_async_fn(durable_seq).await;
556    ///
557    /// // Bad: holding the status across an await blocks all senders.
558    /// let status = rx.borrow();
559    /// some_async_fn(status.durable_seq).await; // deadlock!
560    /// ```
561    fn subscribe(&self) -> tokio::sync::watch::Receiver<DbStatus>;
562
563    /// Returns the latest database status.
564    ///
565    /// This is a snapshot of the current state and will not update automatically.
566    /// Use [`subscribe`](DbMetadataOps::subscribe) to receive real-time updates.
567    fn status(&self) -> DbStatus;
568}
569
570/// Trait for block-cache warming and eviction operations.
571#[async_trait::async_trait]
572pub trait DbCacheManagerOps {
573    /// Warms selected cache content for one SST.
574    ///
575    /// Callers fan out over SSTs themselves (for example with
576    /// `FuturesUnordered`) to get the concurrency they want. Per-target
577    /// outcomes are reflected in cache-manager metrics, not the return value.
578    ///
579    /// Returns `Err` on the first failing target. If no block cache is
580    /// configured, or if the SST is not reachable from the current manifest,
581    /// the call is a no-op that returns `Ok(())`.
582    async fn warm_sst(
583        &self,
584        sst_id: SsTableId,
585        targets: &[CacheTarget],
586    ) -> Result<(), crate::Error>;
587
588    /// Best-effort eviction of block-cache entries for one SST.
589    ///
590    /// If no block cache is configured, logs a warning and returns `Ok(())`.
591    /// Does not check whether the SST is still live in the current manifest —
592    /// callers own that policy.
593    async fn evict_cached_sst(&self, sst_id: SsTableId) -> Result<(), crate::Error>;
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks: these functions are never called. They only need
    // to type-check, which proves each trait can be used as a trait object
    // (i.e. remains object-safe / dyn-compatible).

    // `DbMetadataOps` must stay object-safe.
    fn _assert_object_safe(_: &dyn DbMetadataOps) {}

    // `DbCacheManagerOps` must stay object-safe.
    fn _assert_cache_manager_ops_object_safe(_: &dyn DbCacheManagerOps) {}
}