1use std::time::Duration;
2
3use futures::Stream;
4use s2_common::{
5 caps,
6 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
7 record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
8 types::{
9 basin::BasinName,
10 stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11 },
12};
13use slatedb::config::{DurabilityLevel, ScanOptions};
14use tokio::sync::broadcast;
15
16use super::Backend;
17use crate::backend::{
18 error::{
19 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
20 },
21 kv,
22 stream_id::StreamId,
23};
24
25impl Backend {
26 async fn read_start_seq_num(
27 &self,
28 stream_id: StreamId,
29 start: ReadStart,
30 end: ReadEnd,
31 tail: StreamPosition,
32 ) -> Result<SeqNum, ReadError> {
33 let mut read_pos = match start.from {
34 s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
35 s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
36 ReadPosition::Timestamp(timestamp)
37 }
38 s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
39 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
40 }
41 };
42 if match read_pos {
43 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
44 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
45 } {
46 if start.clamp {
47 read_pos = ReadPosition::SeqNum(tail.seq_num);
48 } else {
49 return Err(UnwrittenError(tail).into());
50 }
51 }
52 if let ReadPosition::SeqNum(start_seq_num) = read_pos
53 && start_seq_num == tail.seq_num
54 && !end.may_follow()
55 {
56 return Err(UnwrittenError(tail).into());
57 }
58 Ok(match read_pos {
59 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
60 ReadPosition::Timestamp(start_timestamp) => {
61 self.resolve_timestamp(stream_id, start_timestamp)
62 .await?
63 .unwrap_or(tail)
64 .seq_num
65 }
66 })
67 }
68
69 pub async fn check_tail(
70 &self,
71 basin: BasinName,
72 stream: StreamName,
73 ) -> Result<StreamPosition, CheckTailError> {
74 let client = self
75 .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
76 config.create_stream_on_read
77 })
78 .await?;
79 let tail = client.check_tail().await?;
80 Ok(tail)
81 }
82
    /// Open a read session over `stream` in `basin`.
    ///
    /// Resolves the starting sequence number, then returns a stream that
    /// alternates between two phases until the read limit, the `until`
    /// bound, or the optional `wait` deadline ends the session:
    /// - catch-up: scan durable records from the database up to the last
    ///   known tail, emitting them in capped batches;
    /// - follow: once caught up, subscribe to the live streamer and relay
    ///   freshly appended batches plus periodic heartbeats.
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            tail,
        };
        // Cloned handle so the generator below can be 'static.
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            // Each outer iteration is either one catch-up scan or one
            // follow subscription; the loop ends when the limit runs out
            // or a `break 'session` fires.
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up: scan [start_seq_num, tail.seq_num) from storage.
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    // Bulk-scan tuning: large read-ahead and parallel fetches
                    // for a sequential range read.
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    // Buffer sized by the remaining count limit, capped at
                    // the per-batch maximum.
                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    // Inner loop re-reads state.limit each pass, which
                    // shrinks as batches are yielded via on_batch.
                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        // Stop before any record that would cross the
                        // `until` bound or exceed the remaining limit.
                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
                            if records.is_empty() {
                                // Nothing buffered: the session is done.
                                break 'session;
                            } else {
                                // Flush what is buffered, then stop scanning.
                                break;
                            }
                        }

                        // Flush a full batch before buffering this record.
                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        // Final (possibly partial) batch of this scan.
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    } else {
                        // The scan surfaced nothing below the tail (e.g. the
                        // records are gone from storage); jump the cursor to
                        // the tail so the session can finish or follow
                        // instead of re-scanning forever.
                        state.start_seq_num = state.tail.seq_num;
                    }
                } else {
                    // Follow: we are caught up with the last known tail.
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    // `follow` returns Err(tail) when the tail has already
                    // advanced past our position — see the Err arm below.
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    // Prefer draining records over timers.
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                // Trim the live batch to what
                                                // limit/until still allow.
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                if allowed_count < count {
                                                    // A bound was hit mid-batch.
                                                    break 'session;
                                                }
                                                Ok(())
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Missed live records: restart the
                                                // outer loop to re-scan the gap
                                                // from storage.
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                Err(StreamerMissingInActionError)
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        // Keep the session alive during idle periods.
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                        Ok(())
                                    }
                                    _ = wait_sleep(end.wait) => {
                                        // Linger deadline elapsed with no new records.
                                        break 'session;
                                    }
                                }?;
                            }
                        }
                        Err(tail) => {
                            // Tail moved since we checked; adopt it and
                            // catch up via another scan.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }
238
239 pub(super) async fn resolve_timestamp(
240 &self,
241 stream_id: StreamId,
242 timestamp: Timestamp,
243 ) -> Result<Option<StreamPosition>, StorageError> {
244 let start_key = kv::stream_record_timestamp::ser_key(
245 stream_id,
246 StreamPosition {
247 seq_num: SeqNum::MIN,
248 timestamp,
249 },
250 );
251 let end_key = kv::stream_record_timestamp::ser_key(
252 stream_id,
253 StreamPosition {
254 seq_num: SeqNum::MAX,
255 timestamp: Timestamp::MAX,
256 },
257 );
258 static SCAN_OPTS: ScanOptions = ScanOptions {
259 durability_filter: DurabilityLevel::Remote,
260 dirty: false,
261 read_ahead_bytes: 1,
262 cache_blocks: false,
263 max_fetch_tasks: 1,
264 };
265 let mut it = self
266 .db
267 .scan_with_options(start_key..end_key, &SCAN_OPTS)
268 .await?;
269 Ok(match it.next().await? {
270 Some(kv) => {
271 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
272 assert_eq!(deser_stream_id, stream_id);
273 assert!(pos.timestamp >= timestamp);
274 kv::stream_record_timestamp::deser_value(kv.value)?;
275 Some(StreamPosition {
276 seq_num: pos.seq_num,
277 timestamp: pos.timestamp,
278 })
279 }
280 None => None,
281 })
282 }
283}
284
/// Mutable cursor for an in-flight read session.
struct ReadSessionState {
    /// Next sequence number to read; advanced past each emitted batch.
    start_seq_num: u64,
    /// Remaining read limit; the session ends once it is exhausted.
    limit: EvaluatedReadLimit,
    /// Timestamp bound past which no records may be emitted.
    until: ReadUntil,
    /// Most recently observed tail of the stream.
    tail: StreamPosition,
}
291
292impl ReadSessionState {
293 fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
294 if let Some(tail) = batch.tail {
295 self.tail = tail;
296 }
297 let last_record = batch.records.last().expect("non-empty");
298 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
299 panic!("batch after exhausted limit");
300 };
301 let count = batch.records.len();
302 let bytes = batch.records.metered_size();
303 assert!(limit.allow(count, bytes));
304 assert!(self.until.allow(last_record.position.timestamp));
305 self.start_seq_num = last_record.position.seq_num + 1;
306 self.limit = limit.remaining(count, bytes);
307 ReadSessionOutput::Batch(batch)
308 }
309}
310
311fn count_allowed_records(
312 limit: ReadLimit,
313 until: ReadUntil,
314 records: &[Metered<SequencedRecord>],
315) -> usize {
316 let mut acc_size = 0;
317 let mut acc_count = 0;
318 for record in records {
319 if limit.deny(acc_count + 1, acc_size + record.metered_size())
320 || until.deny(record.position.timestamp)
321 {
322 break;
323 }
324 acc_count += 1;
325 acc_size += record.metered_size();
326 }
327 acc_count
328}
329
330fn new_heartbeat_sleep() -> tokio::time::Sleep {
331 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
332}
333
334async fn wait_sleep(wait: Option<Duration>) {
335 match wait {
336 Some(wait) => tokio::time::sleep(wait).await,
337 None => {
338 std::future::pending::<()>().await;
339 }
340 }
341}
342
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        types::{
            basin::BasinName,
            config::OptionalStreamConfig,
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};

    use super::*;
    use crate::backend::{kv, stream_id::StreamId};

    /// `resolve_timestamp` must restrict its scan to the queried stream's
    /// key range: entries of other streams, even with larger timestamps,
    /// must never be returned.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // Seed one timestamp-index entry per stream.
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // 500 <= 1000, so stream A's own entry is found.
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // 1500 is past stream A's only entry; stream B's entry at 2000
        // must not leak into the result.
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }

    /// A read over a range whose records were all deleted from storage
    /// must still terminate instead of re-scanning the empty range forever.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                Default::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Append a single record so the tail advances past zero.
        let record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap();
        let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
        let parts = AppendRecordParts {
            timestamp: None,
            record: metered,
        };
        let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
        let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
        let input = AppendInput {
            records: batch,
            match_seq_num: None,
            fencing_token: None,
        };
        let ack = backend
            .append(basin.clone(), stream.clone(), input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the record's data key out from under the read path, so
        // the catch-up scan will find nothing below the tail.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        // Read the full (now-empty) range with no linger deadline.
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend.read(basin, stream, start, end).await.unwrap();
        // Collecting must finish well within the timeout; a regression
        // here would spin re-scanning the deleted range.
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }
}
493}