1use bytes::Bytes;
2use s2_common::record::{SeqNum, StreamPosition, Timestamp};
3use slatedb::{
4 DbTransaction,
5 config::{DurabilityLevel, ReadOptions, ScanOptions},
6};
7
8use super::Backend;
9use crate::backend::{error::StorageError, kv, stream_id::StreamId};
10
11impl Backend {
12 pub async fn db_status(&self) -> Result<(), slatedb::Error> {
15 let _ = self.db.get(b"ping").await?;
16 Ok(())
17 }
18
19 pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
20 &self,
21 key: K,
22 deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
23 ) -> Result<Option<V>, StorageError> {
24 static READ_OPTS: ReadOptions = ReadOptions {
25 durability_filter: DurabilityLevel::Remote,
26 dirty: false,
27 cache_blocks: true,
28 };
29 let value = self
30 .db
31 .get_with_options(key, &READ_OPTS)
32 .await?
33 .map(deser)
34 .transpose()?;
35 Ok(value)
36 }
37
38 pub(super) async fn resolve_timestamp(
39 &self,
40 stream_id: StreamId,
41 timestamp: Timestamp,
42 ) -> Result<Option<StreamPosition>, StorageError> {
43 let start_key = kv::stream_record_timestamp::ser_key(stream_id, timestamp, SeqNum::MIN);
44 let end_key = kv::stream_record_timestamp::ser_key(stream_id, Timestamp::MAX, SeqNum::MAX);
45 static SCAN_OPTS: ScanOptions = ScanOptions {
46 durability_filter: DurabilityLevel::Remote,
47 dirty: false,
48 read_ahead_bytes: 1,
49 cache_blocks: false,
50 max_fetch_tasks: 1,
51 };
52 let mut it = self
53 .db
54 .scan_with_options(start_key..end_key, &SCAN_OPTS)
55 .await?;
56 Ok(match it.next().await? {
57 Some(kv) => {
58 let (deser_stream_id, deser_timestamp, deser_seq_num) =
59 kv::stream_record_timestamp::deser_key(kv.key)?;
60 assert_eq!(deser_stream_id, stream_id);
61 assert!(deser_timestamp >= timestamp);
62 kv::stream_record_timestamp::deser_value(kv.value)?;
63 Some(StreamPosition {
64 seq_num: deser_seq_num,
65 timestamp: deser_timestamp,
66 })
67 }
68 None => None,
69 })
70 }
71}
72
73pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
74 txn: &DbTransaction,
75 key: K,
76 deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
77) -> Result<Option<V>, StorageError> {
78 static READ_OPTS: ReadOptions = ReadOptions {
79 durability_filter: DurabilityLevel::Memory,
80 dirty: false,
81 cache_blocks: true,
82 };
83 let value = txn
84 .get_with_options(key, &READ_OPTS)
85 .await?
86 .map(deser)
87 .transpose()?;
88 Ok(value)
89}
90
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    /// A timestamp lookup must only see index entries of the requested stream,
    /// even when another stream holds an entry with a later timestamp.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let store = Arc::new(InMemory::new());
        let db = Db::builder("/test", store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // One index entry per stream: A at timestamp 1000, B at timestamp 2000.
        for (stream, timestamp) in [(stream_a, 1000), (stream_b, 2000)] {
            backend
                .db
                .put(
                    kv::stream_record_timestamp::ser_key(stream, timestamp, 0),
                    kv::stream_record_timestamp::ser_value(),
                )
                .await
                .unwrap();
        }

        // A lookup before A's only entry resolves to that entry.
        let found = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        let expected = StreamPosition {
            seq_num: 0,
            timestamp: 1000,
        };
        assert_eq!(found, Some(expected));

        // A lookup past A's only entry must not pick up B's entry at 2000.
        let missing = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(missing, None);
    }
}
140}