Skip to main content

s2_lite/backend/
store.rs

1use bytes::Bytes;
2use s2_common::record::{SeqNum, StreamPosition, Timestamp};
3use slatedb::{
4    DbTransaction,
5    config::{DurabilityLevel, ReadOptions, ScanOptions},
6};
7
8use super::Backend;
9use crate::backend::{error::StorageError, kv, stream_id::StreamId};
10
11impl Backend {
12    // TODO: switch to `self.db.status()` once slatedb releases with
13    // https://github.com/slatedb/slatedb/pull/1234
14    pub async fn db_status(&self) -> Result<(), slatedb::Error> {
15        let _ = self.db.get(b"ping").await?;
16        Ok(())
17    }
18
19    pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
20        &self,
21        key: K,
22        deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
23    ) -> Result<Option<V>, StorageError> {
24        static READ_OPTS: ReadOptions = ReadOptions {
25            durability_filter: DurabilityLevel::Remote,
26            dirty: false,
27            cache_blocks: true,
28        };
29        let value = self
30            .db
31            .get_with_options(key, &READ_OPTS)
32            .await?
33            .map(deser)
34            .transpose()?;
35        Ok(value)
36    }
37
38    pub(super) async fn resolve_timestamp(
39        &self,
40        stream_id: StreamId,
41        timestamp: Timestamp,
42    ) -> Result<Option<StreamPosition>, StorageError> {
43        let start_key = kv::stream_record_timestamp::ser_key(stream_id, timestamp, SeqNum::MIN);
44        let end_key = kv::stream_record_timestamp::ser_key(stream_id, Timestamp::MAX, SeqNum::MAX);
45        static SCAN_OPTS: ScanOptions = ScanOptions {
46            durability_filter: DurabilityLevel::Remote,
47            dirty: false,
48            read_ahead_bytes: 1,
49            cache_blocks: false,
50            max_fetch_tasks: 1,
51        };
52        let mut it = self
53            .db
54            .scan_with_options(start_key..end_key, &SCAN_OPTS)
55            .await?;
56        Ok(match it.next().await? {
57            Some(kv) => {
58                let (deser_stream_id, deser_timestamp, deser_seq_num) =
59                    kv::stream_record_timestamp::deser_key(kv.key)?;
60                assert_eq!(deser_stream_id, stream_id);
61                assert!(deser_timestamp >= timestamp);
62                kv::stream_record_timestamp::deser_value(kv.value)?;
63                Some(StreamPosition {
64                    seq_num: deser_seq_num,
65                    timestamp: deser_timestamp,
66                })
67            }
68            None => None,
69        })
70    }
71}
72
73pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
74    txn: &DbTransaction,
75    key: K,
76    deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
77) -> Result<Option<V>, StorageError> {
78    static READ_OPTS: ReadOptions = ReadOptions {
79        durability_filter: DurabilityLevel::Memory,
80        dirty: false,
81        cache_blocks: true,
82    };
83    let value = txn
84        .get_with_options(key, &READ_OPTS)
85        .await?
86        .map(deser)
87        .transpose()?;
88    Ok(value)
89}
90
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    /// A timestamp lookup on one stream must never return an index entry
    /// that belongs to a different stream.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let store = Arc::new(InMemory::new());
        let db = Db::builder("/test", store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // One index entry per stream: stream_a at t=1000, stream_b at t=2000.
        for (stream, ts) in [(stream_a, 1000), (stream_b, 2000)] {
            let key = kv::stream_record_timestamp::ser_key(stream, ts, 0);
            let value = kv::stream_record_timestamp::ser_value();
            backend.db.put(key, value).await.unwrap();
        }

        // Should find record in stream_a
        let found = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        let expected = Some(StreamPosition {
            seq_num: 0,
            timestamp: 1000,
        });
        assert_eq!(found, expected);

        // Should return None, not find stream_b's record
        let past_last = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(past_last, None);
    }
}
140}