use std::time::Duration;

use futures::Stream;
use s2_common::{
    caps,
    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
    types::{
        basin::BasinName,
        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
    },
};
use slatedb::config::{DurabilityLevel, ScanOptions};
use tokio::sync::broadcast;

use super::Backend;
use crate::backend::{
    error::{
        CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
    },
    kv,
    stream_id::StreamId,
};

impl Backend {
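    /// Resolve the sequence number at which a read should begin, validating
    /// the requested start position against the current tail.
    ///
    /// A start past the tail is clamped to the tail when `start.clamp` is
    /// set, and rejected with [`UnwrittenError`] otherwise. A start exactly
    /// at the tail is only useful when the read may follow future writes.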
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

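    /// Return the current tail position of a stream, auto-creating the
    /// stream first when the basin is configured with
    /// `create_stream_on_read`.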
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

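    /// Open a read session yielding record batches.
    ///
    /// The session alternates between two phases: while the cursor is behind
    /// the tail, records are served by scanning storage; once it catches up,
    /// the session follows the streamer's broadcast channel for live appends
    /// and emits periodic heartbeats. It completes when the read limit is
    /// exhausted, the `until` bound is reached, or the optional `wait`
    /// deadline elapses.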
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up phase: serve stored records in
                    // [start_seq_num, tail.seq_num) from the database.
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    let mut records = Metered::with_capacity(
                        limit
                            .count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        if end.until.deny(pos.timestamp)
                            || limit.deny(
                                records.len() + 1,
                                records.metered_size() + record.metered_size(),
                            )
                        {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size()
                                > caps::RECORD_BATCH_MAX.bytes
                        {
                            // The next record would overflow the batch caps:
                            // flush and continue with a fresh buffer.
                            let new_records_buf = Metered::with_capacity(
                                limit
                                    .count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    } else {
                        // The scan produced nothing (e.g. the records were
                        // deleted); advance the cursor so the session makes
                        // progress instead of re-scanning an empty range.
                        state.start_seq_num = state.tail.seq_num;
                    }
                } else {
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                let allowed_count =
                                                    count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                if allowed_count < count {
                                                    // A record was denied by the limit
                                                    // or `until` bound; the session is done.
                                                    break 'session;
                                                }
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Missed broadcasts; restart the session
                                                // loop to catch up from storage.
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                break;
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                    }
                                    _ = wait_sleep(end.wait) => {
                                        break 'session;
                                    }
                                }
                            }
                            Err(StreamerMissingInActionError)?;
                        }
                        Err(tail) => {
                            // The streamer reports a newer tail; adopt it and
                            // catch up from storage.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }

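    /// Resolve a timestamp to the position of the first record in the stream
    /// whose timestamp is greater than or equal to it, or `None` if the
    /// stream has no such record.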
    pub(super) async fn resolve_timestamp(
        &self,
        stream_id: StreamId,
        timestamp: Timestamp,
    ) -> Result<Option<StreamPosition>, StorageError> {
        let start_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MIN,
                timestamp,
            },
        );
        let end_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MAX,
                timestamp: Timestamp::MAX,
            },
        );
        // Effectively a point lookup: read as little as possible and avoid
        // polluting the block cache.
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self
            .db
            .scan_with_options(start_key..end_key, &SCAN_OPTS)
            .await?;
        Ok(match it.next().await? {
            Some(kv) => {
                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
                assert_eq!(deser_stream_id, stream_id);
                assert!(pos.timestamp >= timestamp);
                kv::stream_record_timestamp::deser_value(kv.value)?;
                Some(pos)
            }
            None => None,
        })
    }
}

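/// Mutable cursor for an in-flight read session: the next sequence number to
/// serve, the remaining read limit, the `until` bound, and the last known
/// tail position.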
struct ReadSessionState {
    start_seq_num: u64,
    limit: EvaluatedReadLimit,
    until: ReadUntil,
    tail: StreamPosition,
}

impl ReadSessionState {
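    /// Account for a batch about to be emitted: adopt any newer tail it
    /// carries, advance the cursor past its last record, and deduct its
    /// count and size from the remaining limit.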
    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
        if let Some(tail) = batch.tail {
            self.tail = tail;
        }
        let last_record = batch.records.last().expect("non-empty");
        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
            panic!("batch after exhausted limit");
        };
        let count = batch.records.len();
        let bytes = batch.records.metered_size();
        assert!(limit.allow(count, bytes));
        assert!(self.until.allow(last_record.position.timestamp));
        self.start_seq_num = last_record.position.seq_num + 1;
        self.limit = limit.remaining(count, bytes);
        ReadSessionOutput::Batch(batch)
    }
}

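/// Count the longest prefix of `records` that the read limit and the `until`
/// bound both allow.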
fn count_allowed_records(
    limit: ReadLimit,
    until: ReadUntil,
    records: &[Metered<SequencedRecord>],
) -> usize {
    let mut acc_size = 0;
    let mut acc_count = 0;
    for record in records {
        if limit.deny(acc_count + 1, acc_size + record.metered_size())
            || until.deny(record.position.timestamp)
        {
            break;
        }
        acc_count += 1;
        acc_size += record.metered_size();
    }
    acc_count
}

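/// Sleep for a random interval between 5 and 15 seconds before the next
/// heartbeat is emitted.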
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
}

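/// Sleep until the session's `wait` deadline elapses, or forever if no
/// deadline was configured.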
async fn wait_sleep(wait: Option<Duration>) {
    match wait {
        Some(wait) => tokio::time::sleep(wait).await,
        None => std::future::pending::<()>().await,
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        types::{
            basin::BasinName,
            config::OptionalStreamConfig,
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};

    use super::*;
    use crate::backend::{kv, stream_id::StreamId};

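    // `resolve_timestamp` must stay within one stream's keyspace: a record
    // belonging to another stream must never satisfy the lookup.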
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // A timestamp before stream A's only record resolves to that record.
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // A later timestamp must not resolve to stream B's record at 2000.
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }

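    // A read over a range whose stored records have been deleted must still
    // terminate instead of spinning forever on an empty scan.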
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                Default::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap();
        let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
        let parts = AppendRecordParts {
            timestamp: None,
            record: metered,
        };
        let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
        let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
        let input = AppendInput {
            records: batch,
            match_seq_num: None,
            fencing_token: None,
        };
        let ack = backend
            .append(basin.clone(), stream.clone(), input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the appended record's data out from under the stream.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend.read(basin, stream, start, end).await.unwrap();
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }
}