Skip to main content

s2_lite/backend/
store.rs

1use bytes::Bytes;
2use s2_common::record::{SeqNum, StreamPosition, Timestamp};
3use slatedb::{
4    DbTransaction,
5    config::{DurabilityLevel, ReadOptions, ScanOptions},
6};
7
8use super::Backend;
9use crate::backend::{error::StorageError, kv, stream_id::StreamId};
10
11impl Backend {
12    // TODO: switch to `self.db.status()` once slatedb releases with
13    // https://github.com/slatedb/slatedb/pull/1234
14    pub async fn db_status(&self) -> Result<(), slatedb::Error> {
15        let _ = self.db.get(b"ping").await?;
16        Ok(())
17    }
18
19    pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
20        &self,
21        key: K,
22        deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
23    ) -> Result<Option<V>, StorageError> {
24        static READ_OPTS: ReadOptions = ReadOptions {
25            durability_filter: DurabilityLevel::Remote,
26            dirty: false,
27            cache_blocks: true,
28        };
29        let value = self
30            .db
31            .get_with_options(key, &READ_OPTS)
32            .await?
33            .map(deser)
34            .transpose()?;
35        Ok(value)
36    }
37
38    pub(super) async fn resolve_timestamp(
39        &self,
40        stream_id: StreamId,
41        timestamp: Timestamp,
42    ) -> Result<Option<StreamPosition>, StorageError> {
43        let start_key = kv::stream_record_timestamp::ser_key(
44            stream_id,
45            StreamPosition {
46                seq_num: SeqNum::MIN,
47                timestamp,
48            },
49        );
50        let end_key = kv::stream_record_timestamp::ser_key(
51            stream_id,
52            StreamPosition {
53                seq_num: SeqNum::MAX,
54                timestamp: Timestamp::MAX,
55            },
56        );
57        static SCAN_OPTS: ScanOptions = ScanOptions {
58            durability_filter: DurabilityLevel::Remote,
59            dirty: false,
60            read_ahead_bytes: 1,
61            cache_blocks: false,
62            max_fetch_tasks: 1,
63        };
64        let mut it = self
65            .db
66            .scan_with_options(start_key..end_key, &SCAN_OPTS)
67            .await?;
68        Ok(match it.next().await? {
69            Some(kv) => {
70                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
71                assert_eq!(deser_stream_id, stream_id);
72                assert!(pos.timestamp >= timestamp);
73                kv::stream_record_timestamp::deser_value(kv.value)?;
74                Some(StreamPosition {
75                    seq_num: pos.seq_num,
76                    timestamp: pos.timestamp,
77                })
78            }
79            None => None,
80        })
81    }
82}
83
84pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
85    txn: &DbTransaction,
86    key: K,
87    deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
88) -> Result<Option<V>, StorageError> {
89    static READ_OPTS: ReadOptions = ReadOptions {
90        durability_filter: DurabilityLevel::Memory,
91        dirty: false,
92        cache_blocks: true,
93    };
94    let value = txn
95        .get_with_options(key, &READ_OPTS)
96        .await?
97        .map(deser)
98        .transpose()?;
99    Ok(value)
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    /// A timestamp lookup on one stream must never return an index entry
    /// that belongs to a different stream.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // One index entry per stream, both at seq_num 0:
        // stream_a at timestamp 1000, stream_b at timestamp 2000.
        for (stream_id, timestamp) in [(stream_a, 1000), (stream_b, 2000)] {
            let key = kv::stream_record_timestamp::ser_key(
                stream_id,
                StreamPosition {
                    seq_num: 0,
                    timestamp,
                },
            );
            backend
                .db
                .put(key, kv::stream_record_timestamp::ser_value())
                .await
                .unwrap();
        }

        // Should find record in stream_a
        let found = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        let expected = Some(StreamPosition {
            seq_num: 0,
            timestamp: 1000,
        });
        assert_eq!(found, expected);

        // Should return None, not find stream_b's record
        let past_last = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(past_last, None);
    }
}