1use std::time::Duration;
2
3use futures::Stream;
4use s2_common::{
5 caps,
6 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
7 record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition},
8 types::{
9 basin::BasinName,
10 stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11 },
12};
13use slatedb::config::{DurabilityLevel, ScanOptions};
14use tokio::sync::broadcast;
15
16use super::Backend;
17use crate::backend::{
18 error::{CheckTailError, ReadError, StreamerMissingInActionError, UnwrittenError},
19 kv,
20 stream_id::StreamId,
21};
22
impl Backend {
    /// Resolve a [`ReadStart`] into the concrete sequence number at which a
    /// read session should begin, given the stream's current `tail`.
    ///
    /// Returns an [`UnwrittenError`] when the requested start lies beyond the
    /// tail and clamping is disabled, or when the resolved start equals the
    /// tail but the session is not permitted to follow future writes.
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                // Offsets larger than the tail saturate to sequence number 0.
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        // A start position past the tail is clamped back to the tail when
        // permitted, otherwise rejected as unwritten.
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        // Starting exactly at the tail only yields data if the session may
        // follow subsequent writes; otherwise there is nothing to read.
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                // Map the timestamp onto a sequence number; if no position
                // resolves, fall back to the tail.
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

    /// Return the current tail position of `stream` in `basin`, creating the
    /// stream first when the config's `create_stream_on_read` allows it.
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

    /// Open a read session on `stream`, returning a stream of
    /// [`ReadSessionOutput`] items (record batches and heartbeats).
    ///
    /// The session alternates between two modes until the read limit, the
    /// `until` bound, or the wait deadline ends it:
    /// - catch-up: scan records persisted in the db up to the known tail;
    /// - follow: once caught up to the tail, receive live records from the
    ///   streamer via a broadcast channel.
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            // Runs while read budget remains; `break 'session` ends the whole
            // session from nested loops.
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up mode: scan persisted records in the key range
                    // [start_seq_num, tail.seq_num).
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    // Shared scan options: only data at remote durability,
                    // with read-ahead and block caching enabled.
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    // Size the batch buffer by the smaller of the remaining
                    // count budget and the per-batch cap.
                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        // The key range is confined to this stream, so every
                        // scanned key must decode to the same stream id.
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        // Stop if this record would cross the `until` bound or
                        // exceed the remaining limit; flush any buffered
                        // records first.
                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        // Flush the buffered batch when appending this record
                        // would exceed the per-batch count/byte caps.
                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    // Flush whatever remains from the scan.
                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    }
                } else {
                    // Caught up to the tail: follow live writes, or end the
                    // session if following is not allowed.
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    // biased: prefer delivering pending records
                                    // over firing heartbeat/deadline timers.
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                // Trim the live batch to what the
                                                // limit and `until` bound admit.
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                // A partial admission means a bound
                                                // was hit: the session is done.
                                                if allowed_count < count {
                                                    break 'session;
                                                }
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Broadcast messages were missed;
                                                // restart the session loop, which
                                                // presumably backfills the gap via
                                                // the scan path once `follow`
                                                // reports the advanced tail —
                                                // TODO confirm.
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                break;
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        // Periodic keep-alive while idle.
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                    }
                                    _ = wait_sleep(end.wait) => {
                                        // Wait deadline elapsed with no new data.
                                        break 'session;
                                    }
                                }
                            }
                            // Reached when the follow channel closed.
                            // NOTE(review): this is also reached if the loop
                            // above exits because `state.limit` left the
                            // `Remaining` state without `break 'session` —
                            // confirm that case is unreachable, or that
                            // surfacing this error there is intended.
                            Err(StreamerMissingInActionError)?;
                        }
                        Err(tail) => {
                            // The streamer's tail is ahead of ours; adopt it
                            // and return to catch-up mode.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }
}
234
/// Mutable bookkeeping for one in-flight read session.
struct ReadSessionState {
    /// Sequence number the next read should start from.
    start_seq_num: u64,
    /// Remaining record/byte budget for the session.
    limit: EvaluatedReadLimit,
    /// Timestamp bound past which records are not emitted.
    until: ReadUntil,
    /// Last known tail position of the stream.
    tail: StreamPosition,
}
241
242impl ReadSessionState {
243 fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
244 if let Some(tail) = batch.tail {
245 self.tail = tail;
246 }
247 let last_record = batch.records.last().expect("non-empty");
248 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
249 panic!("batch after exhausted limit");
250 };
251 let count = batch.records.len();
252 let bytes = batch.records.metered_size();
253 assert!(limit.allow(count, bytes));
254 assert!(self.until.allow(last_record.position.timestamp));
255 self.start_seq_num = last_record.position.seq_num + 1;
256 self.limit = limit.remaining(count, bytes);
257 ReadSessionOutput::Batch(batch)
258 }
259}
260
261fn count_allowed_records(
262 limit: ReadLimit,
263 until: ReadUntil,
264 records: &[Metered<SequencedRecord>],
265) -> usize {
266 let mut acc_size = 0;
267 let mut acc_count = 0;
268 for record in records {
269 if limit.deny(acc_count + 1, acc_size + record.metered_size())
270 || until.deny(record.position.timestamp)
271 {
272 break;
273 }
274 acc_count += 1;
275 acc_size += record.metered_size();
276 }
277 acc_count
278}
279
280fn new_heartbeat_sleep() -> tokio::time::Sleep {
281 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
282}
283
284async fn wait_sleep(wait: Option<Duration>) {
285 match wait {
286 Some(wait) => tokio::time::sleep(wait).await,
287 None => {
288 std::future::pending::<()>().await;
289 }
290 }
291}