1use bytes::Bytes;
2use s2_common::record::{SeqNum, StreamPosition, Timestamp};
3use slatedb::{
4 DbTransaction,
5 config::{DurabilityLevel, ReadOptions, ScanOptions},
6};
7
8use super::Backend;
9use crate::backend::{error::StorageError, kv, stream_id::StreamId};
10
11impl Backend {
12 pub async fn db_status(&self) -> Result<(), slatedb::Error> {
15 let _ = self.db.get(b"ping").await?;
16 Ok(())
17 }
18
19 pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
20 &self,
21 key: K,
22 deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
23 ) -> Result<Option<V>, StorageError> {
24 static READ_OPTS: ReadOptions = ReadOptions {
25 durability_filter: DurabilityLevel::Remote,
26 dirty: false,
27 cache_blocks: true,
28 };
29 let value = self
30 .db
31 .get_with_options(key, &READ_OPTS)
32 .await?
33 .map(deser)
34 .transpose()?;
35 Ok(value)
36 }
37
38 pub(super) async fn resolve_timestamp(
39 &self,
40 stream_id: StreamId,
41 timestamp: Timestamp,
42 ) -> Result<Option<StreamPosition>, StorageError> {
43 let start_key = kv::stream_record_timestamp::ser_key(
44 stream_id,
45 StreamPosition {
46 seq_num: SeqNum::MIN,
47 timestamp,
48 },
49 );
50 let end_key = kv::stream_record_timestamp::ser_key(
51 stream_id,
52 StreamPosition {
53 seq_num: SeqNum::MAX,
54 timestamp: Timestamp::MAX,
55 },
56 );
57 static SCAN_OPTS: ScanOptions = ScanOptions {
58 durability_filter: DurabilityLevel::Remote,
59 dirty: false,
60 read_ahead_bytes: 1,
61 cache_blocks: false,
62 max_fetch_tasks: 1,
63 };
64 let mut it = self
65 .db
66 .scan_with_options(start_key..end_key, &SCAN_OPTS)
67 .await?;
68 Ok(match it.next().await? {
69 Some(kv) => {
70 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
71 assert_eq!(deser_stream_id, stream_id);
72 assert!(pos.timestamp >= timestamp);
73 kv::stream_record_timestamp::deser_value(kv.value)?;
74 Some(StreamPosition {
75 seq_num: pos.seq_num,
76 timestamp: pos.timestamp,
77 })
78 }
79 None => None,
80 })
81 }
82}
83
84pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
85 txn: &DbTransaction,
86 key: K,
87 deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
88) -> Result<Option<V>, StorageError> {
89 static READ_OPTS: ReadOptions = ReadOptions {
90 durability_filter: DurabilityLevel::Memory,
91 dirty: false,
92 cache_blocks: true,
93 };
94 let value = txn
95 .get_with_options(key, &READ_OPTS)
96 .await?
97 .map(deser)
98 .transpose()?;
99 Ok(value)
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    /// Writes one timestamp-index entry for `stream_id` at `pos` straight into the database.
    async fn put_timestamp_entry(backend: &Backend, stream_id: StreamId, pos: StreamPosition) {
        let key = kv::stream_record_timestamp::ser_key(stream_id, pos);
        let value = kv::stream_record_timestamp::ser_value();
        backend.db.put(key, value).await.unwrap();
    }

    /// A lookup for one stream must never return an entry that belongs to another stream.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        put_timestamp_entry(
            &backend,
            stream_a,
            StreamPosition {
                seq_num: 0,
                timestamp: 1000,
            },
        )
        .await;
        put_timestamp_entry(
            &backend,
            stream_b,
            StreamPosition {
                seq_num: 0,
                timestamp: 2000,
            },
        )
        .await;

        // A timestamp before stream_a's only entry resolves to that entry.
        let found = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            found,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // A timestamp after stream_a's only entry must not pick up stream_b's entry.
        let found = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(found, None);
    }
}