
s2_lite/backend/read.rs

use std::time::Duration;

use futures::Stream;
use s2_common::{
    caps,
    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
    types::{
        basin::BasinName,
        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
    },
};
use slatedb::config::{DurabilityLevel, ScanOptions};
use tokio::sync::broadcast;

use super::Backend;
use crate::backend::{
    error::{
        CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
    },
    kv,
    stream_id::StreamId,
};

impl Backend {
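    /// Resolve a `ReadStart` into the concrete sequence number to read from,
    /// relative to the current `tail`.
    ///
    /// A start position beyond the tail is clamped to the tail when
    /// `start.clamp` is set, and otherwise rejected with `UnwrittenError`;
    /// a start exactly at the tail is rejected unless the read may follow
    /// new writes. Timestamp starts are resolved via `resolve_timestamp`.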
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

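    /// Return the current tail position of `stream`, creating the stream first
    /// if the basin is configured with `create_stream_on_read`.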
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

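    /// Open a read session on `stream`, returned as an async stream of
    /// `ReadSessionOutput` items.
    ///
    /// The session loops until the read limit, the `until` bound, or the wait
    /// deadline is hit, alternating between catching up on stored records via
    /// a DB scan and following live appends from the streamer.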
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
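                // Behind the known tail: catch up by scanning stored records
                // from the DB, batching them up to the record-batch caps.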
                if state.start_seq_num < state.tail.seq_num {
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    }
                } else {
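                    // Caught up with the known tail: either follow live
                    // appends from the streamer, or learn of a newer tail and
                    // loop back to scan the DB.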
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                if allowed_count < count {
                                                    break 'session;
                                                }
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Catch up using DB
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                break;
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                    }
                                    _ = wait_sleep(end.wait) => {
                                        break 'session;
                                    }
                                }
                            }
                            Err(StreamerMissingInActionError)?;
                        }
                        Err(tail) => {
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }

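    /// Resolve `timestamp` to the earliest stream position whose timestamp is
    /// at or after it, by scanning the per-stream timestamp index. Returns
    /// `None` if the stream has no record at or past the timestamp.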
    pub(super) async fn resolve_timestamp(
        &self,
        stream_id: StreamId,
        timestamp: Timestamp,
    ) -> Result<Option<StreamPosition>, StorageError> {
        let start_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MIN,
                timestamp,
            },
        );
        let end_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MAX,
                timestamp: Timestamp::MAX,
            },
        );
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self
            .db
            .scan_with_options(start_key..end_key, &SCAN_OPTS)
            .await?;
        Ok(match it.next().await? {
            Some(kv) => {
                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
                assert_eq!(deser_stream_id, stream_id);
                assert!(pos.timestamp >= timestamp);
                kv::stream_record_timestamp::deser_value(kv.value)?;
                Some(StreamPosition {
                    seq_num: pos.seq_num,
                    timestamp: pos.timestamp,
                })
            }
            None => None,
        })
    }
}

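/// Mutable state threaded through a read session: the next sequence number to
/// read, the remaining read limit, the `until` bound, and the last known tail.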
struct ReadSessionState {
    start_seq_num: u64,
    limit: EvaluatedReadLimit,
    until: ReadUntil,
    tail: StreamPosition,
}

impl ReadSessionState {
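    /// Account for a batch that is about to be emitted: advance the read
    /// position past its last record, charge it against the remaining limit,
    /// and adopt the batch's tail if it carries one.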
    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
        if let Some(tail) = batch.tail {
            self.tail = tail;
        }
        let last_record = batch.records.last().expect("non-empty");
        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
            panic!("batch after exhausted limit");
        };
        let count = batch.records.len();
        let bytes = batch.records.metered_size();
        assert!(limit.allow(count, bytes));
        assert!(self.until.allow(last_record.position.timestamp));
        self.start_seq_num = last_record.position.seq_num + 1;
        self.limit = limit.remaining(count, bytes);
        ReadSessionOutput::Batch(batch)
    }
}

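/// Count how many leading records of `records` can be emitted without
/// exceeding `limit` (by count and metered size) or crossing the `until`
/// bound.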
fn count_allowed_records(
    limit: ReadLimit,
    until: ReadUntil,
    records: &[Metered<SequencedRecord>],
) -> usize {
    let mut acc_size = 0;
    let mut acc_count = 0;
    for record in records {
        if limit.deny(acc_count + 1, acc_size + record.metered_size())
            || until.deny(record.position.timestamp)
        {
            break;
        }
        acc_count += 1;
        acc_size += record.metered_size();
    }
    acc_count
}

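/// Sleep for a randomized 5-15 second interval before the next heartbeat.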
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
}

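/// Sleep for the configured wait duration, or pend indefinitely if no wait is
/// set.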
async fn wait_sleep(wait: Option<Duration>) {
    match wait {
        Some(wait) => tokio::time::sleep(wait).await,
        None => {
            std::future::pending::<()>().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use slatedb::{Db, object_store::memory::InMemory};

    use super::*;

    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // Should find record in stream_a
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Should return None, not find stream_b's record
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }
}