//! s2_lite backend read path: tail checks, read sessions (catch-up scan +
//! live follow), and timestamp-to-position resolution.
1use std::time::Duration;
2
3use futures::Stream;
4use s2_common::{
5    caps,
6    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
7    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
8    types::{
9        basin::BasinName,
10        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11    },
12};
13use slatedb::config::{DurabilityLevel, ScanOptions};
14use tokio::sync::broadcast;
15
16use super::Backend;
17use crate::backend::{
18    error::{
19        CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
20    },
21    kv,
22    stream_id::StreamId,
23};
24
impl Backend {
    /// Resolve the caller's `ReadStart` into a concrete starting `SeqNum`,
    /// validated against the stream's current `tail`.
    ///
    /// # Errors
    /// Returns `UnwrittenError` when the requested position lies beyond the
    /// tail and clamping is disabled, or when it lands exactly on the tail
    /// and the session is not allowed to follow for future records.
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                // Saturating: an offset larger than the tail starts at seq 0.
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        // Requested position past the tail: clamp back to the tail when the
        // caller allows it, otherwise report the position as unwritten.
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        // Starting exactly at the tail is only useful when the session may
        // wait ("follow") for records that have not been written yet.
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                // First record at/after the timestamp; when no such record
                // exists in this stream, fall back to the tail position.
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

    /// Return the current tail position of `stream` in `basin`, auto-creating
    /// the stream when the config enables `create_stream_on_read`.
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

    /// Open a read session over `stream`, yielding `ReadSessionOutput` items
    /// (record batches and heartbeats) until the `end` bounds — limit, until
    /// timestamp, or wait budget — are exhausted.
    ///
    /// The session alternates between two modes per `'session` iteration:
    /// - catch-up: scan durable records from the DB up to the known tail;
    /// - follow (when `end.may_follow()`): subscribe to the streamer's
    ///   broadcast of fresh appends, emitting periodic heartbeats, and fall
    ///   back to the DB scan if the subscription lags behind.
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up mode: scan [start_seq_num, tail.seq_num) from
                    // the DB. Keys sort by seq_num; timestamp 0 is the lowest
                    // key for a given seq_num.
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    // Sequential bulk scan: only remotely-durable data, large
                    // read-ahead, block caching on.
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    // Size the batch buffer by the remaining count limit,
                    // capped at the per-batch maximum.
                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    // Re-binds `limit` each iteration: `state.limit` shrinks
                    // whenever a batch is yielded via `state.on_batch`.
                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        // Record would violate the until bound or the
                        // count/byte limit: stop here. If nothing is buffered,
                        // the whole session is done; otherwise flush what we
                        // have first.
                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        // Buffer is at the per-batch count/byte cap: flush it
                        // as a batch before accepting this record.
                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    } else {
                        // Scan produced nothing even though start < tail
                        // (e.g. records were deleted): jump ahead to the tail
                        // so the session can make progress.
                        state.start_seq_num = state.tail.seq_num;
                    }
                } else {
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    // Follow mode: subscribe to live appends from the tail.
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                // Each arm evaluates to a Result; `?` below
                                // surfaces errors into the try_stream. Note
                                // the sleep futures are re-created per
                                // iteration, so `wait` acts as an idle
                                // timeout re-armed by every event.
                                tokio::select! {
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                // Some records were denied by
                                                // limit/until: session is done.
                                                if allowed_count < count {
                                                    break 'session;
                                                }
                                                Ok(())
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Catch up using DB
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                Err(StreamerMissingInActionError)
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                        Ok(())
                                    }
                                    _ = wait_sleep(end.wait) => {
                                        break 'session;
                                    }
                                }?;
                            }
                        }
                        Err(tail) => {
                            // Streamer is already past our position: adopt the
                            // newer tail and loop back into catch-up mode.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }

    /// Find the earliest position in `stream_id` whose timestamp is
    /// `>= timestamp`, using the secondary timestamp index. Returns `None`
    /// when this stream has no record at or after the timestamp.
    pub(super) async fn resolve_timestamp(
        &self,
        stream_id: StreamId,
        timestamp: Timestamp,
    ) -> Result<Option<StreamPosition>, StorageError> {
        // Range covers (timestamp, MIN seq) ..= (MAX timestamp, MAX seq)
        // within this stream's keyspace; the first hit is the answer.
        let start_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MIN,
                timestamp,
            },
        );
        let end_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MAX,
                timestamp: Timestamp::MAX,
            },
        );
        // Point-ish lookup: minimal read-ahead, no block cache pollution.
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self
            .db
            .scan_with_options(start_key..end_key, &SCAN_OPTS)
            .await?;
        Ok(match it.next().await? {
            Some(kv) => {
                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
                assert_eq!(deser_stream_id, stream_id);
                assert!(pos.timestamp >= timestamp);
                // Value carries no data today; deserialize to validate it.
                kv::stream_record_timestamp::deser_value(kv.value)?;
                Some(StreamPosition {
                    seq_num: pos.seq_num,
                    timestamp: pos.timestamp,
                })
            }
            None => None,
        })
    }
}
284
/// Mutable cursor for an in-flight read session.
struct ReadSessionState {
    // Next sequence number to read; advanced past each yielded batch.
    start_seq_num: u64,
    // Remaining count/byte budget for the session.
    limit: EvaluatedReadLimit,
    // Timestamp bound; records it denies end the session (see `ReadUntil`).
    until: ReadUntil,
    // Most recently observed tail of the stream.
    tail: StreamPosition,
}
291
292impl ReadSessionState {
293    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
294        if let Some(tail) = batch.tail {
295            self.tail = tail;
296        }
297        let last_record = batch.records.last().expect("non-empty");
298        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
299            panic!("batch after exhausted limit");
300        };
301        let count = batch.records.len();
302        let bytes = batch.records.metered_size();
303        assert!(limit.allow(count, bytes));
304        assert!(self.until.allow(last_record.position.timestamp));
305        self.start_seq_num = last_record.position.seq_num + 1;
306        self.limit = limit.remaining(count, bytes);
307        ReadSessionOutput::Batch(batch)
308    }
309}
310
311fn count_allowed_records(
312    limit: ReadLimit,
313    until: ReadUntil,
314    records: &[Metered<SequencedRecord>],
315) -> usize {
316    let mut acc_size = 0;
317    let mut acc_count = 0;
318    for record in records {
319        if limit.deny(acc_count + 1, acc_size + record.metered_size())
320            || until.deny(record.position.timestamp)
321        {
322            break;
323        }
324        acc_count += 1;
325        acc_size += record.metered_size();
326    }
327    acc_count
328}
329
330fn new_heartbeat_sleep() -> tokio::time::Sleep {
331    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
332}
333
334async fn wait_sleep(wait: Option<Duration>) {
335    match wait {
336        Some(wait) => tokio::time::sleep(wait).await,
337        None => {
338            std::future::pending::<()>().await;
339        }
340    }
341}
342
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        types::{
            basin::BasinName,
            config::OptionalStreamConfig,
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};

    use super::*;
    use crate::backend::{kv, stream_id::StreamId};

    /// Timestamp resolution must stay within the queried stream's keyspace
    /// and never leak a neighboring stream's index entries.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // stream_a has an index entry at ts=1000; stream_b at ts=2000.
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // Should find record in stream_a
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Should return None, not find stream_b's record
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }

    /// Regression: when every durable record in [start, tail) has been
    /// deleted, the read session must advance to the tail and terminate
    /// rather than re-scanning the empty range forever.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                Default::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Append one record so the tail advances past zero.
        let record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap();
        let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
        let parts = AppendRecordParts {
            timestamp: None,
            record: metered,
        };
        let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
        let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
        let input = AppendInput {
            records: batch,
            match_seq_num: None,
            fencing_token: None,
        };
        let ack = backend
            .append(basin.clone(), stream.clone(), input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the record's data key out from under the read path.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        // Bounded read with no wait: must drain and finish promptly.
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend.read(basin, stream, start, end).await.unwrap();
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }
}
493}