// s2_lite/backend/read.rs
1use std::time::Duration;
2
3use futures::Stream;
4use s2_common::{
5    caps,
6    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
7    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
8    types::{
9        basin::BasinName,
10        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11    },
12};
13use slatedb::config::{DurabilityLevel, ScanOptions};
14use tokio::sync::broadcast;
15
16use super::Backend;
17use crate::backend::{
18    error::{
19        CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
20    },
21    kv,
22    stream_id::StreamId,
23};
24
impl Backend {
    /// Resolves the concrete starting sequence number for a read session.
    ///
    /// `start.from` may name a sequence number, a timestamp, or an offset back
    /// from the current tail. The resolved position is validated against
    /// `tail`:
    /// - a start past the tail either clamps to the tail (`start.clamp`) or
    ///   fails with [`UnwrittenError`];
    /// - a sequence-number start exactly at the tail fails with
    ///   [`UnwrittenError`] when the read may not follow future appends
    ///   (`!end.may_follow()`);
    /// - a timestamp start is resolved through the timestamp index, falling
    ///   back to the tail when no record at/after the timestamp exists.
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                // Saturating: an offset larger than the tail starts at seq_num 0.
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        // Start position lies beyond everything written so far?
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        // Starting exactly at the tail with no permission to wait for new
        // records means there is nothing to serve.
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                // No record at/after the timestamp: start from the tail.
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

    /// Returns the current tail position of `stream` in `basin`,
    /// auto-creating the stream first when the basin config allows it.
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

    /// Opens a read session over `stream`, yielding record batches (and
    /// heartbeats while tailing) until the limit, `until` bound, or wait
    /// deadline from `end` is reached.
    ///
    /// The session alternates between two modes:
    /// - **catch-up**: when the cursor is behind the known tail, records are
    ///   scanned out of the DB in capped batches;
    /// - **follow**: when the cursor is at the tail (and `end.may_follow()`),
    ///   it subscribes to the live broadcast from the streamer; on lag it
    ///   falls back to catch-up via the DB.
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            // The session runs as long as budget remains; `break 'session`
            // anywhere below ends the stream cleanly.
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up mode: scan [cursor, tail) from the DB.
                    // Data keys are ordered by seq_num, so timestamp 0 is the
                    // lower bound within a seq_num.
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    // Bulk-read tuned: only remotely-durable data, 1 MiB
                    // read-ahead, block caching, parallel fetches.
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    // Pre-size the batch to min(remaining count, batch cap).
                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        // Would this record cross the `until` bound or the
                        // count/byte limit? Flush what we have; if nothing is
                        // buffered the session is done.
                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        // Batch is full (by count or bytes): emit it and start
                        // a fresh buffer sized to the remaining budget.
                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    } else {
                        // Scan yielded nothing (e.g. records trimmed/deleted):
                        // jump the cursor to the tail so we don't rescan.
                        state.start_seq_num = state.tail.seq_num;
                    }
                } else {
                    // Follow mode: cursor is at the tail.
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            // Immediately confirm liveness + current tail.
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    // `biased`: prefer delivering records over
                                    // firing heartbeat/deadline timers.
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                // Truncated by limit/until:
                                                // the session is complete.
                                                if allowed_count < count {
                                                    break 'session;
                                                }
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Catch up using DB
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                break;
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                    }
                                    _ = wait_sleep(end.wait) => {
                                        break 'session;
                                    }
                                }
                            }
                            // NOTE(review): this is reached both when the
                            // broadcast channel closed AND when the while-let
                            // exited because `on_batch` exhausted the limit
                            // exactly (allowed_count == count). In the latter
                            // case erroring looks spurious — confirm that
                            // `EvaluatedReadLimit::remaining` cannot report
                            // exhaustion here, or gate this on the Closed path.
                            Err(StreamerMissingInActionError)?;
                        }
                        Err(tail) => {
                            // Streamer reports a newer tail: go catch up.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }

    /// Looks up the first record at or after `timestamp` in the stream's
    /// timestamp index, returning its position, or `None` when no such
    /// record exists in this stream.
    pub(super) async fn resolve_timestamp(
        &self,
        stream_id: StreamId,
        timestamp: Timestamp,
    ) -> Result<Option<StreamPosition>, StorageError> {
        let start_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MIN,
                timestamp,
            },
        );
        // Upper bound stays within this stream_id's keyspace.
        let end_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MAX,
                timestamp: Timestamp::MAX,
            },
        );
        // Point-lookup tuned: we only ever take the first hit, so no
        // read-ahead, no block caching, single fetch task.
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self
            .db
            .scan_with_options(start_key..end_key, &SCAN_OPTS)
            .await?;
        Ok(match it.next().await? {
            Some(kv) => {
                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
                assert_eq!(deser_stream_id, stream_id);
                assert!(pos.timestamp >= timestamp);
                // Value carries no data today; deserialize for validation.
                kv::stream_record_timestamp::deser_value(kv.value)?;
                Some(StreamPosition {
                    seq_num: pos.seq_num,
                    timestamp: pos.timestamp,
                })
            }
            None => None,
        })
    }
}
283
/// Mutable cursor state threaded through a single read session.
struct ReadSessionState {
    // Next sequence number to serve.
    start_seq_num: u64,
    // Remaining count/byte budget, or exhausted.
    limit: EvaluatedReadLimit,
    // Timestamp bound past which records must not be served.
    until: ReadUntil,
    // Last known tail of the stream.
    tail: StreamPosition,
}
290
291impl ReadSessionState {
292    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
293        if let Some(tail) = batch.tail {
294            self.tail = tail;
295        }
296        let last_record = batch.records.last().expect("non-empty");
297        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
298            panic!("batch after exhausted limit");
299        };
300        let count = batch.records.len();
301        let bytes = batch.records.metered_size();
302        assert!(limit.allow(count, bytes));
303        assert!(self.until.allow(last_record.position.timestamp));
304        self.start_seq_num = last_record.position.seq_num + 1;
305        self.limit = limit.remaining(count, bytes);
306        ReadSessionOutput::Batch(batch)
307    }
308}
309
310fn count_allowed_records(
311    limit: ReadLimit,
312    until: ReadUntil,
313    records: &[Metered<SequencedRecord>],
314) -> usize {
315    let mut acc_size = 0;
316    let mut acc_count = 0;
317    for record in records {
318        if limit.deny(acc_count + 1, acc_size + record.metered_size())
319            || until.deny(record.position.timestamp)
320        {
321            break;
322        }
323        acc_count += 1;
324        acc_size += record.metered_size();
325    }
326    acc_count
327}
328
329fn new_heartbeat_sleep() -> tokio::time::Sleep {
330    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
331}
332
333async fn wait_sleep(wait: Option<Duration>) {
334    match wait {
335        Some(wait) => tokio::time::sleep(wait).await,
336        None => {
337            std::future::pending::<()>().await;
338        }
339    }
340}
341
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        types::{
            basin::BasinName,
            config::OptionalStreamConfig,
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};

    use super::*;
    use crate::backend::{kv, stream_id::StreamId};

    // resolve_timestamp must only see index entries belonging to the queried
    // stream: an entry for another stream must never satisfy the lookup.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // Seed one timestamp-index entry per stream, directly in the DB.
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // Should find record in stream_a
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Should return None, not find stream_b's record
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }

    // Regression test: if every record in [start, tail) was deleted from the
    // DB, the read session must advance its cursor to the tail and terminate
    // instead of rescanning the empty range forever.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                Default::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Append a single one-byte record so the tail advances past 0.
        let record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap();
        let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
        let parts = AppendRecordParts {
            timestamp: None,
            record: metered,
        };
        let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
        let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
        let input = AppendInput {
            records: batch,
            match_seq_num: None,
            fencing_token: None,
        };
        let ack = backend
            .append(basin.clone(), stream.clone(), input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the record's data key out from under the read path,
        // simulating trimming/expiry between append and read.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        // wait: None — the session must not block following the tail.
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend.read(basin, stream, start, end).await.unwrap();
        // The timeout guards against the pre-fix behavior of spinning on an
        // empty scan range.
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }
}