1use std::time::Duration;
2
3use futures::Stream;
4use s2_common::{
5 caps,
6 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
7 record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
8 types::{
9 basin::BasinName,
10 stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11 },
12};
13use slatedb::config::{DurabilityLevel, ScanOptions};
14use tokio::sync::broadcast;
15
16use super::Backend;
17use crate::backend::{
18 error::{
19 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
20 },
21 kv,
22 stream_id::StreamId,
23};
24
25impl Backend {
26 async fn read_start_seq_num(
27 &self,
28 stream_id: StreamId,
29 start: ReadStart,
30 end: ReadEnd,
31 tail: StreamPosition,
32 ) -> Result<SeqNum, ReadError> {
33 let mut read_pos = match start.from {
34 s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
35 s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
36 ReadPosition::Timestamp(timestamp)
37 }
38 s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
39 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
40 }
41 };
42 if match read_pos {
43 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
44 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
45 } {
46 if start.clamp {
47 read_pos = ReadPosition::SeqNum(tail.seq_num);
48 } else {
49 return Err(UnwrittenError(tail).into());
50 }
51 }
52 if let ReadPosition::SeqNum(start_seq_num) = read_pos
53 && start_seq_num == tail.seq_num
54 && !end.may_follow()
55 {
56 return Err(UnwrittenError(tail).into());
57 }
58 Ok(match read_pos {
59 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
60 ReadPosition::Timestamp(start_timestamp) => {
61 self.resolve_timestamp(stream_id, start_timestamp)
62 .await?
63 .unwrap_or(tail)
64 .seq_num
65 }
66 })
67 }
68
69 pub async fn check_tail(
70 &self,
71 basin: BasinName,
72 stream: StreamName,
73 ) -> Result<StreamPosition, CheckTailError> {
74 let client = self
75 .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
76 config.create_stream_on_read
77 })
78 .await?;
79 let tail = client.check_tail().await?;
80 Ok(tail)
81 }
82
83 pub async fn read(
84 &self,
85 basin: BasinName,
86 stream: StreamName,
87 start: ReadStart,
88 end: ReadEnd,
89 ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
90 let client = self
91 .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
92 config.create_stream_on_read
93 })
94 .await?;
95 let stream_id = client.stream_id();
96 let tail = client.check_tail().await?;
97 let mut state = ReadSessionState {
98 start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
99 limit: EvaluatedReadLimit::Remaining(end.limit),
100 until: end.until,
101 tail,
102 };
103 let db = self.db.clone();
104 let session = async_stream::try_stream! {
105 'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
106 if state.start_seq_num < state.tail.seq_num {
107 let start_key = kv::stream_record_data::ser_key(
108 stream_id,
109 StreamPosition {
110 seq_num: state.start_seq_num,
111 timestamp: 0,
112 },
113 );
114 let end_key = kv::stream_record_data::ser_key(
115 stream_id,
116 StreamPosition {
117 seq_num: state.tail.seq_num,
118 timestamp: 0,
119 },
120 );
121 static SCAN_OPTS: ScanOptions = ScanOptions {
122 durability_filter: DurabilityLevel::Remote,
123 dirty: false,
124 read_ahead_bytes: 1024 * 1024,
125 cache_blocks: true,
126 max_fetch_tasks: 8,
127 };
128 let mut it = db
129 .scan_with_options(start_key..end_key, &SCAN_OPTS)
130 .await?;
131
132 let mut records = Metered::with_capacity(
133 limit.count()
134 .unwrap_or(usize::MAX)
135 .min(caps::RECORD_BATCH_MAX.count),
136 );
137
138 while let EvaluatedReadLimit::Remaining(limit) = state.limit {
139 let Some(kv) = it.next().await? else {
140 break;
141 };
142 let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
143 assert_eq!(deser_stream_id, stream_id);
144
145 let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);
146
147 if end.until.deny(pos.timestamp)
148 || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
149 if records.is_empty() {
150 break 'session;
151 } else {
152 break;
153 }
154 }
155
156 if records.len() == caps::RECORD_BATCH_MAX.count
157 || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
158 {
159 let new_records_buf = Metered::with_capacity(
160 limit.count()
161 .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
162 .min(caps::RECORD_BATCH_MAX.count),
163 );
164 yield state.on_batch(ReadBatch {
165 records: std::mem::replace(&mut records, new_records_buf),
166 tail: None,
167 });
168 }
169
170 records.push(record);
171 }
172
173 if !records.is_empty() {
174 yield state.on_batch(ReadBatch {
175 records,
176 tail: None,
177 });
178 }
179 } else {
180 assert_eq!(state.start_seq_num, state.tail.seq_num);
181 if !end.may_follow() {
182 break;
183 }
184 match client.follow(state.start_seq_num).await? {
185 Ok(mut follow_rx) => {
186 yield ReadSessionOutput::Heartbeat(state.tail);
187 while let EvaluatedReadLimit::Remaining(limit) = state.limit {
188 tokio::select! {
189 biased;
190 msg = follow_rx.recv() => {
191 match msg {
192 Ok(mut records) => {
193 let count = records.len();
194 let tail = super::streamer::next_pos(&records);
195 let allowed_count = count_allowed_records(limit, end.until, &records);
196 if allowed_count > 0 {
197 yield state.on_batch(ReadBatch {
198 records: records.drain(..allowed_count).collect(),
199 tail: Some(tail),
200 });
201 }
202 if allowed_count < count {
203 break 'session;
204 }
205 }
206 Err(broadcast::error::RecvError::Lagged(_)) => {
207 continue 'session;
209 }
210 Err(broadcast::error::RecvError::Closed) => {
211 break;
212 }
213 }
214 }
215 _ = new_heartbeat_sleep() => {
216 yield ReadSessionOutput::Heartbeat(state.tail);
217 }
218 _ = wait_sleep(end.wait) => {
219 break 'session;
220 }
221 }
222 }
223 Err(StreamerMissingInActionError)?;
224 }
225 Err(tail) => {
226 assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
227 state.tail = tail;
228 }
229 }
230 }
231 }
232 };
233 Ok(session)
234 }
235
236 pub(super) async fn resolve_timestamp(
237 &self,
238 stream_id: StreamId,
239 timestamp: Timestamp,
240 ) -> Result<Option<StreamPosition>, StorageError> {
241 let start_key = kv::stream_record_timestamp::ser_key(
242 stream_id,
243 StreamPosition {
244 seq_num: SeqNum::MIN,
245 timestamp,
246 },
247 );
248 let end_key = kv::stream_record_timestamp::ser_key(
249 stream_id,
250 StreamPosition {
251 seq_num: SeqNum::MAX,
252 timestamp: Timestamp::MAX,
253 },
254 );
255 static SCAN_OPTS: ScanOptions = ScanOptions {
256 durability_filter: DurabilityLevel::Remote,
257 dirty: false,
258 read_ahead_bytes: 1,
259 cache_blocks: false,
260 max_fetch_tasks: 1,
261 };
262 let mut it = self
263 .db
264 .scan_with_options(start_key..end_key, &SCAN_OPTS)
265 .await?;
266 Ok(match it.next().await? {
267 Some(kv) => {
268 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
269 assert_eq!(deser_stream_id, stream_id);
270 assert!(pos.timestamp >= timestamp);
271 kv::stream_record_timestamp::deser_value(kv.value)?;
272 Some(StreamPosition {
273 seq_num: pos.seq_num,
274 timestamp: pos.timestamp,
275 })
276 }
277 None => None,
278 })
279 }
280}
281
/// Mutable cursor and accounting state for one in-flight read session.
struct ReadSessionState {
    // Next sequence number to read (inclusive); advanced past each yielded batch.
    start_seq_num: u64,
    // Remaining count/byte budget; becomes non-`Remaining` once exhausted.
    limit: EvaluatedReadLimit,
    // Timestamp-based stop condition for the session.
    until: ReadUntil,
    // Last known tail of the stream; updated as follow batches report it.
    tail: StreamPosition,
}
288
289impl ReadSessionState {
290 fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
291 if let Some(tail) = batch.tail {
292 self.tail = tail;
293 }
294 let last_record = batch.records.last().expect("non-empty");
295 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
296 panic!("batch after exhausted limit");
297 };
298 let count = batch.records.len();
299 let bytes = batch.records.metered_size();
300 assert!(limit.allow(count, bytes));
301 assert!(self.until.allow(last_record.position.timestamp));
302 self.start_seq_num = last_record.position.seq_num + 1;
303 self.limit = limit.remaining(count, bytes);
304 ReadSessionOutput::Batch(batch)
305 }
306}
307
308fn count_allowed_records(
309 limit: ReadLimit,
310 until: ReadUntil,
311 records: &[Metered<SequencedRecord>],
312) -> usize {
313 let mut acc_size = 0;
314 let mut acc_count = 0;
315 for record in records {
316 if limit.deny(acc_count + 1, acc_size + record.metered_size())
317 || until.deny(record.position.timestamp)
318 {
319 break;
320 }
321 acc_count += 1;
322 acc_size += record.metered_size();
323 }
324 acc_count
325}
326
327fn new_heartbeat_sleep() -> tokio::time::Sleep {
328 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
329}
330
331async fn wait_sleep(wait: Option<Duration>) {
332 match wait {
333 Some(wait) => tokio::time::sleep(wait).await,
334 None => {
335 std::future::pending::<()>().await;
336 }
337 }
338}
339
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    // `resolve_timestamp` must only consider index entries belonging to the
    // queried stream: a matching timestamp in a *different* stream must not
    // leak into the result.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // stream_a has a single record at timestamp 1000 ...
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        // ... and stream_b has one at timestamp 2000.
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // Querying at-or-after 500 in stream_a finds its record at 1000.
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Querying at-or-after 1500 in stream_a must be None: stream_b's
        // record at 2000 lives in a different stream and must not match.
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }
}
401}