1use bytes::Bytes;
2use s2_common::record::{SeqNum, StreamPosition, Timestamp};
3use slatedb::{
4 DbTransaction,
5 config::{DurabilityLevel, ReadOptions, ScanOptions},
6};
7
8use super::Backend;
9use crate::backend::{error::StorageError, kv, stream_id::StreamId};
10
11impl Backend {
12 pub async fn db_ping(&self) -> Result<(), slatedb::Error> {
13 let _ = self.db.get(b"ping").await?;
14 Ok(())
15 }
16
17 pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
18 &self,
19 key: K,
20 deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
21 ) -> Result<Option<V>, StorageError> {
22 static READ_OPTS: ReadOptions = ReadOptions {
23 durability_filter: DurabilityLevel::Remote,
24 dirty: false,
25 cache_blocks: true,
26 };
27 let value = self
28 .db
29 .get_with_options(key, &READ_OPTS)
30 .await?
31 .map(deser)
32 .transpose()?;
33 Ok(value)
34 }
35
36 pub(super) async fn resolve_timestamp(
37 &self,
38 stream_id: StreamId,
39 timestamp: Timestamp,
40 ) -> Result<Option<StreamPosition>, StorageError> {
41 let start_key = kv::stream_record_timestamp::ser_key(stream_id, timestamp, SeqNum::MIN);
42 let end_key = kv::stream_record_timestamp::ser_key(stream_id, Timestamp::MAX, SeqNum::MAX);
43 static SCAN_OPTS: ScanOptions = ScanOptions {
44 durability_filter: DurabilityLevel::Remote,
45 dirty: false,
46 read_ahead_bytes: 1,
47 cache_blocks: false,
48 max_fetch_tasks: 1,
49 };
50 let mut it = self
51 .db
52 .scan_with_options(start_key..end_key, &SCAN_OPTS)
53 .await?;
54 Ok(match it.next().await? {
55 Some(kv) => {
56 let (deser_stream_id, deser_timestamp, deser_seq_num) =
57 kv::stream_record_timestamp::deser_key(kv.key)?;
58 assert_eq!(deser_stream_id, stream_id);
59 assert!(deser_timestamp >= timestamp);
60 kv::stream_record_timestamp::deser_value(kv.value)?;
61 Some(StreamPosition {
62 seq_num: deser_seq_num,
63 timestamp: deser_timestamp,
64 })
65 }
66 None => None,
67 })
68 }
69}
70
71pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
72 txn: &DbTransaction,
73 key: K,
74 deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
75) -> Result<Option<V>, StorageError> {
76 static READ_OPTS: ReadOptions = ReadOptions {
77 durability_filter: DurabilityLevel::Memory,
78 dirty: false,
79 cache_blocks: true,
80 };
81 let value = txn
82 .get_with_options(key, &READ_OPTS)
83 .await?
84 .map(deser)
85 .transpose()?;
86 Ok(value)
87}
88
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    /// Resolving a timestamp for one stream must not return an index entry
    /// that was written for a different stream.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let store = Arc::new(InMemory::new());
        let db = Db::builder("/test", store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // One index entry per stream: A at t=1000, B at t=2000 (both seq 0).
        for (stream, ts) in [(stream_a, 1000), (stream_b, 2000)] {
            backend
                .db
                .put(
                    kv::stream_record_timestamp::ser_key(stream, ts, 0),
                    kv::stream_record_timestamp::ser_value(),
                )
                .await
                .unwrap();
        }

        // Asking before A's only entry resolves to that entry.
        let at_or_after_500 = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        let expected = StreamPosition {
            seq_num: 0,
            timestamp: 1000,
        };
        assert_eq!(at_or_after_500, Some(expected));

        // Asking past A's entry must not fall through to B's entry at t=2000.
        let at_or_after_1500 = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(at_or_after_1500, None);
    }
}