s2_lite/backend/
read.rs

1use std::time::Duration;
2
3use futures::Stream;
4use s2_common::{
5    caps,
6    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
7    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition},
8    types::{
9        basin::BasinName,
10        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11    },
12};
13use slatedb::config::{DurabilityLevel, ScanOptions};
14use tokio::sync::broadcast;
15
16use super::Backend;
17use crate::backend::{
18    error::{CheckTailError, ReadError, StreamerMissingInActionError, UnwrittenError},
19    kv,
20    stream_id::StreamId,
21};
22
23impl Backend {
24    async fn read_start_seq_num(
25        &self,
26        stream_id: StreamId,
27        start: ReadStart,
28        end: ReadEnd,
29        tail: StreamPosition,
30    ) -> Result<SeqNum, ReadError> {
31        let mut read_pos = match start.from {
32            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
33            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
34                ReadPosition::Timestamp(timestamp)
35            }
36            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
37                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
38            }
39        };
40        if match read_pos {
41            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
42            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
43        } {
44            if start.clamp {
45                read_pos = ReadPosition::SeqNum(tail.seq_num);
46            } else {
47                return Err(UnwrittenError(tail).into());
48            }
49        }
50        if let ReadPosition::SeqNum(start_seq_num) = read_pos
51            && start_seq_num == tail.seq_num
52            && !end.may_follow()
53        {
54            return Err(UnwrittenError(tail).into());
55        }
56        Ok(match read_pos {
57            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
58            ReadPosition::Timestamp(start_timestamp) => {
59                self.resolve_timestamp(stream_id, start_timestamp)
60                    .await?
61                    .unwrap_or(tail)
62                    .seq_num
63            }
64        })
65    }
66
67    pub async fn check_tail(
68        &self,
69        basin: BasinName,
70        stream: StreamName,
71    ) -> Result<StreamPosition, CheckTailError> {
72        let client = self
73            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
74                config.create_stream_on_read
75            })
76            .await?;
77        let tail = client.check_tail().await?;
78        Ok(tail)
79    }
80
81    pub async fn read(
82        &self,
83        basin: BasinName,
84        stream: StreamName,
85        start: ReadStart,
86        end: ReadEnd,
87    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
88        let client = self
89            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
90                config.create_stream_on_read
91            })
92            .await?;
93        let stream_id = client.stream_id();
94        let tail = client.check_tail().await?;
95        let mut state = ReadSessionState {
96            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
97            limit: EvaluatedReadLimit::Remaining(end.limit),
98            until: end.until,
99            tail,
100        };
101        let db = self.db.clone();
102        let session = async_stream::try_stream! {
103            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
104                if state.start_seq_num < state.tail.seq_num {
105                    let start_key = kv::stream_record_data::ser_key(
106                        stream_id,
107                        StreamPosition {
108                            seq_num: state.start_seq_num,
109                            timestamp: 0,
110                        },
111                    );
112                    let end_key = kv::stream_record_data::ser_key(
113                        stream_id,
114                        StreamPosition {
115                            seq_num: state.tail.seq_num,
116                            timestamp: 0,
117                        },
118                    );
119                    static SCAN_OPTS: ScanOptions = ScanOptions {
120                        durability_filter: DurabilityLevel::Remote,
121                        dirty: false,
122                        read_ahead_bytes: 1024 * 1024,
123                        cache_blocks: true,
124                        max_fetch_tasks: 8,
125                    };
126                    let mut it = db
127                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
128                        .await?;
129
130                    let mut records = Metered::with_capacity(
131                        limit.count()
132                            .unwrap_or(usize::MAX)
133                            .min(caps::RECORD_BATCH_MAX.count),
134                    );
135
136                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
137                        let Some(kv) = it.next().await? else {
138                            break;
139                        };
140                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
141                        assert_eq!(deser_stream_id, stream_id);
142
143                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);
144
145                        if end.until.deny(pos.timestamp)
146                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
147                            if records.is_empty() {
148                                break 'session;
149                            } else {
150                                break;
151                            }
152                        }
153
154                        if records.len() == caps::RECORD_BATCH_MAX.count
155                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
156                        {
157                            let new_records_buf = Metered::with_capacity(
158                                limit.count()
159                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
160                                    .min(caps::RECORD_BATCH_MAX.count),
161                            );
162                            yield state.on_batch(ReadBatch {
163                                records: std::mem::replace(&mut records, new_records_buf),
164                                tail: None,
165                            });
166                        }
167
168                        records.push(record);
169                    }
170
171                    if !records.is_empty() {
172                        yield state.on_batch(ReadBatch {
173                            records,
174                            tail: None,
175                        });
176                    }
177                } else {
178                    assert_eq!(state.start_seq_num, state.tail.seq_num);
179                    if !end.may_follow() {
180                        break;
181                    }
182                    match client.follow(state.start_seq_num).await? {
183                        Ok(mut follow_rx) => {
184                            yield ReadSessionOutput::Heartbeat(state.tail);
185                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
186                                tokio::select! {
187                                    biased;
188                                    msg = follow_rx.recv() => {
189                                        match msg {
190                                            Ok(mut records) => {
191                                                let count = records.len();
192                                                let tail = super::streamer::next_pos(&records);
193                                                let allowed_count = count_allowed_records(limit, end.until, &records);
194                                                if allowed_count > 0 {
195                                                    yield state.on_batch(ReadBatch {
196                                                        records: records.drain(..allowed_count).collect(),
197                                                        tail: Some(tail),
198                                                    });
199                                                }
200                                                if allowed_count < count {
201                                                    break 'session;
202                                                }
203                                            }
204                                            Err(broadcast::error::RecvError::Lagged(_)) => {
205                                                // Catch up using DB
206                                                continue 'session;
207                                            }
208                                            Err(broadcast::error::RecvError::Closed) => {
209                                                break;
210                                            }
211                                        }
212                                    }
213                                    _ = new_heartbeat_sleep() => {
214                                        yield ReadSessionOutput::Heartbeat(state.tail);
215                                    }
216                                    _ = wait_sleep(end.wait) => {
217                                        break 'session;
218                                    }
219                                }
220                            }
221                            Err(StreamerMissingInActionError)?;
222                        }
223                        Err(tail) => {
224                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
225                            state.tail = tail;
226                        }
227                    }
228                }
229            }
230        };
231        Ok(session)
232    }
233}
234
235struct ReadSessionState {
236    start_seq_num: u64,
237    limit: EvaluatedReadLimit,
238    until: ReadUntil,
239    tail: StreamPosition,
240}
241
242impl ReadSessionState {
243    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
244        if let Some(tail) = batch.tail {
245            self.tail = tail;
246        }
247        let last_record = batch.records.last().expect("non-empty");
248        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
249            panic!("batch after exhausted limit");
250        };
251        let count = batch.records.len();
252        let bytes = batch.records.metered_size();
253        assert!(limit.allow(count, bytes));
254        assert!(self.until.allow(last_record.position.timestamp));
255        self.start_seq_num = last_record.position.seq_num + 1;
256        self.limit = limit.remaining(count, bytes);
257        ReadSessionOutput::Batch(batch)
258    }
259}
260
261fn count_allowed_records(
262    limit: ReadLimit,
263    until: ReadUntil,
264    records: &[Metered<SequencedRecord>],
265) -> usize {
266    let mut acc_size = 0;
267    let mut acc_count = 0;
268    for record in records {
269        if limit.deny(acc_count + 1, acc_size + record.metered_size())
270            || until.deny(record.position.timestamp)
271        {
272            break;
273        }
274        acc_count += 1;
275        acc_size += record.metered_size();
276    }
277    acc_count
278}
279
280fn new_heartbeat_sleep() -> tokio::time::Sleep {
281    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
282}
283
284async fn wait_sleep(wait: Option<Duration>) {
285    match wait {
286        Some(wait) => tokio::time::sleep(wait).await,
287        None => {
288            std::future::pending::<()>().await;
289        }
290    }
291}