Skip to main content

s2_lite/backend/
store.rs

1use bytes::Bytes;
2use s2_common::record::{SeqNum, StreamPosition, Timestamp};
3use slatedb::{
4    DbTransaction,
5    config::{DurabilityLevel, ReadOptions, ScanOptions},
6};
7
8use super::Backend;
9use crate::backend::{error::StorageError, kv, stream_id::StreamId};
10
11impl Backend {
12    pub async fn db_ping(&self) -> Result<(), slatedb::Error> {
13        let _ = self.db.get(b"ping").await?;
14        Ok(())
15    }
16
17    pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
18        &self,
19        key: K,
20        deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
21    ) -> Result<Option<V>, StorageError> {
22        static READ_OPTS: ReadOptions = ReadOptions {
23            durability_filter: DurabilityLevel::Remote,
24            dirty: false,
25            cache_blocks: true,
26        };
27        let value = self
28            .db
29            .get_with_options(key, &READ_OPTS)
30            .await?
31            .map(deser)
32            .transpose()?;
33        Ok(value)
34    }
35
36    pub(super) async fn resolve_timestamp(
37        &self,
38        stream_id: StreamId,
39        timestamp: Timestamp,
40    ) -> Result<Option<StreamPosition>, StorageError> {
41        let start_key = kv::stream_record_timestamp::ser_key(stream_id, timestamp, SeqNum::MIN);
42        let end_key = kv::stream_record_timestamp::ser_key(stream_id, Timestamp::MAX, SeqNum::MAX);
43        static SCAN_OPTS: ScanOptions = ScanOptions {
44            durability_filter: DurabilityLevel::Remote,
45            dirty: false,
46            read_ahead_bytes: 1,
47            cache_blocks: false,
48            max_fetch_tasks: 1,
49        };
50        let mut it = self
51            .db
52            .scan_with_options(start_key..end_key, &SCAN_OPTS)
53            .await?;
54        Ok(match it.next().await? {
55            Some(kv) => {
56                let (deser_stream_id, deser_timestamp, deser_seq_num) =
57                    kv::stream_record_timestamp::deser_key(kv.key)?;
58                assert_eq!(deser_stream_id, stream_id);
59                assert!(deser_timestamp >= timestamp);
60                kv::stream_record_timestamp::deser_value(kv.value)?;
61                Some(StreamPosition {
62                    seq_num: deser_seq_num,
63                    timestamp: deser_timestamp,
64                })
65            }
66            None => None,
67        })
68    }
69}
70
71pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
72    txn: &DbTransaction,
73    key: K,
74    deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
75) -> Result<Option<V>, StorageError> {
76    static READ_OPTS: ReadOptions = ReadOptions {
77        durability_filter: DurabilityLevel::Memory,
78        dirty: false,
79        cache_blocks: true,
80    };
81    let value = txn
82        .get_with_options(key, &READ_OPTS)
83        .await?
84        .map(deser)
85        .transpose()?;
86    Ok(value)
87}
88
89#[cfg(test)]
90mod tests {
91    use std::sync::Arc;
92
93    use bytesize::ByteSize;
94    use slatedb::{Db, object_store::memory::InMemory};
95
96    use super::*;
97
98    #[tokio::test]
99    async fn resolve_timestamp_bounded_to_stream() {
100        let object_store = Arc::new(InMemory::new());
101        let db = Db::builder("/test", object_store).build().await.unwrap();
102        let backend = Backend::new(db, ByteSize::mib(10));
103
104        let stream_a: StreamId = [0u8; 32].into();
105        let stream_b: StreamId = [1u8; 32].into();
106
107        backend
108            .db
109            .put(
110                kv::stream_record_timestamp::ser_key(stream_a, 1000, 0),
111                kv::stream_record_timestamp::ser_value(),
112            )
113            .await
114            .unwrap();
115        backend
116            .db
117            .put(
118                kv::stream_record_timestamp::ser_key(stream_b, 2000, 0),
119                kv::stream_record_timestamp::ser_value(),
120            )
121            .await
122            .unwrap();
123
124        // Should find record in stream_a
125        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
126        assert_eq!(
127            result,
128            Some(StreamPosition {
129                seq_num: 0,
130                timestamp: 1000
131            })
132        );
133
134        // Should return None, not find stream_b's record
135        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
136        assert_eq!(result, None);
137    }
138}