1use std::{sync::Arc, time::Duration};
2
3use dashmap::DashMap;
4use enum_ordinalize::Ordinalize;
5use s2_common::{
6 record::{SeqNum, StreamPosition},
7 types::{
8 basin::BasinName,
9 config::{BasinConfig, OptionalStreamConfig},
10 resources::CreateMode,
11 stream::StreamName,
12 },
13};
14use slatedb::config::{DurabilityLevel, ScanOptions};
15use tokio::time::Instant;
16
17use super::{
18 error::{
19 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
20 StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
21 TransactionConflictError, UnavailableError,
22 },
23 kv,
24 stream_id::StreamId,
25 streamer::{StreamerClient, StreamerClientState},
26};
27
/// Upper bound on follower lag.
///
/// NOTE(review): not referenced anywhere in this file; confirm the unit (presumably a number of
/// records) and the consumer before relying on this description.
pub const FOLLOWER_MAX_LAG: usize = 25;
29
/// Storage backend handle.
///
/// Clones share the same `client_states` map because it lives behind an `Arc`.
#[derive(Clone)]
pub struct Backend {
    /// SlateDB handle used for all reads, scans and writes; visible to the parent module.
    pub(super) db: slatedb::Db,
    /// Current streamer client state for each stream that has been looked up, keyed by stream id.
    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
}
35
36impl Backend {
37 const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);
38
39 pub fn new(db: slatedb::Db) -> Self {
40 Self {
41 db,
42 client_states: Arc::new(DashMap::new()),
43 }
44 }
45
46 async fn start_streamer(
47 &self,
48 basin: BasinName,
49 stream: StreamName,
50 ) -> Result<StreamerClient, StreamerError> {
51 let stream_id = StreamId::new(&basin, &stream);
52
53 let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
54 self.db_get(
55 kv::stream_meta::ser_key(&basin, &stream),
56 kv::stream_meta::deser_value,
57 ),
58 self.db_get(
59 kv::stream_tail_position::ser_key(stream_id),
60 kv::stream_tail_position::deser_value,
61 ),
62 self.db_get(
63 kv::stream_fencing_token::ser_key(stream_id),
64 kv::stream_fencing_token::deser_value,
65 ),
66 self.db_get(
67 kv::stream_trim_point::ser_key(stream_id),
68 kv::stream_trim_point::deser_value,
69 )
70 )?;
71
72 let Some(meta) = meta else {
73 return Err(StreamNotFoundError { basin, stream }.into());
74 };
75
76 let tail_pos = tail_pos.unwrap_or(StreamPosition::MIN);
77 self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
78 .await?;
79
80 let fencing_token = fencing_token.unwrap_or_default();
81 let trim_point = trim_point.unwrap_or(..SeqNum::MIN);
82
83 if trim_point.end == SeqNum::MAX {
84 return Err(StreamDeletionPendingError { basin, stream }.into());
85 }
86
87 let client_states = self.client_states.clone();
88 Ok(super::streamer::spawn_streamer(
89 self.db.clone(),
90 stream_id,
91 meta.config,
92 tail_pos,
93 fencing_token,
94 trim_point,
95 client_states,
96 ))
97 }
98
99 async fn assert_no_records_following_tail(
100 &self,
101 stream_id: StreamId,
102 basin: &BasinName,
103 stream: &StreamName,
104 tail_pos: StreamPosition,
105 ) -> Result<(), StorageError> {
106 let start_key = kv::stream_record_data::ser_key(
107 stream_id,
108 StreamPosition {
109 seq_num: tail_pos.seq_num,
110 timestamp: 0,
111 },
112 );
113 static SCAN_OPTS: ScanOptions = ScanOptions {
114 durability_filter: DurabilityLevel::Remote,
115 dirty: false,
116 read_ahead_bytes: 1,
117 cache_blocks: false,
118 max_fetch_tasks: 1,
119 };
120 let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
121 let Some(kv) = it.next().await? else {
122 return Ok(());
123 };
124 if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
125 return Ok(());
126 }
127 let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
128 assert!(
129 deser_stream_id != stream_id,
130 "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
131 );
132 Ok(())
133 }
134
135 fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
136 match self.client_states.entry(StreamId::new(basin, stream)) {
137 dashmap::Entry::Occupied(oe) => oe.get().clone(),
138 dashmap::Entry::Vacant(ve) => {
139 let this = self.clone();
140 let stream_id = *(ve.key());
141 let basin = basin.clone();
142 let stream = stream.clone();
143 tokio::spawn(async move {
144 let state = match this.start_streamer(basin, stream).await {
145 Ok(client) => StreamerClientState::Ready { client },
146 Err(error) => StreamerClientState::InitError {
147 error: Box::new(error),
148 timestamp: Instant::now(),
149 },
150 };
151 let replaced_state = this.client_states.insert(stream_id, state);
152 let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
153 panic!("expected Blocked client but replaced: {replaced_state:?}");
154 };
155 notify.notify_waiters();
156 });
157 ve.insert(StreamerClientState::Blocked {
158 notify: Default::default(),
159 })
160 .value()
161 .clone()
162 }
163 }
164 }
165
166 fn streamer_remove_unready(&self, stream_id: StreamId) {
167 if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
168 && let StreamerClientState::InitError { .. } = oe.get()
169 {
170 oe.remove();
171 }
172 }
173
174 pub(super) async fn streamer_client(
175 &self,
176 basin: &BasinName,
177 stream: &StreamName,
178 ) -> Result<StreamerClient, StreamerError> {
179 let mut waited = false;
180 loop {
181 match self.streamer_client_state(basin, stream) {
182 StreamerClientState::Blocked { notify } => {
183 notify.notified().await;
184 waited = true;
185 }
186 StreamerClientState::InitError { error, timestamp } => {
187 if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
188 self.streamer_remove_unready(StreamId::new(basin, stream));
189 } else {
190 return Err(*error);
191 }
192 }
193 StreamerClientState::Ready { client } => {
194 return Ok(client);
195 }
196 }
197 }
198 }
199
200 pub(super) fn streamer_client_if_active(
201 &self,
202 basin: &BasinName,
203 stream: &StreamName,
204 ) -> Option<StreamerClient> {
205 match self.streamer_client_state(basin, stream) {
206 StreamerClientState::Ready { client } => Some(client),
207 _ => None,
208 }
209 }
210
211 pub(super) async fn streamer_client_with_auto_create<E>(
212 &self,
213 basin: &BasinName,
214 stream: &StreamName,
215 should_auto_create: impl FnOnce(&BasinConfig) -> bool,
216 ) -> Result<StreamerClient, E>
217 where
218 E: From<StreamerError>
219 + From<StorageError>
220 + From<BasinNotFoundError>
221 + From<TransactionConflictError>
222 + From<UnavailableError>
223 + From<BasinDeletionPendingError>
224 + From<StreamDeletionPendingError>
225 + From<StreamNotFoundError>,
226 {
227 match self.streamer_client(basin, stream).await {
228 Ok(client) => Ok(client),
229 Err(StreamerError::StreamNotFound(e)) => {
230 let config = match self.get_basin_config(basin.clone()).await {
231 Ok(config) => config,
232 Err(GetBasinConfigError::Storage(e)) => Err(e)?,
233 Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
234 };
235 if should_auto_create(&config) {
236 if let Err(e) = self
237 .create_stream(
238 basin.clone(),
239 stream.clone(),
240 OptionalStreamConfig::default(),
241 CreateMode::CreateOnly(None),
242 )
243 .await
244 {
245 match e {
246 CreateStreamError::Storage(e) => Err(e)?,
247 CreateStreamError::TransactionConflict(e) => Err(e)?,
248 CreateStreamError::Unavailable(e) => Err(e)?,
249 CreateStreamError::BasinDeletionPending(e) => Err(e)?,
250 CreateStreamError::StreamDeletionPending(e) => Err(e)?,
251 CreateStreamError::BasinNotFound(e) => Err(e)?,
252 CreateStreamError::StreamAlreadyExists(_) => {}
253 }
254 }
255 Ok(self.streamer_client(basin, stream).await?)
256 } else {
257 Err(e.into())
258 }
259 }
260 Err(e) => Err(e.into()),
261 }
262 }
263}
264
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Seeds a stream whose stored tail position coincides with an already written record and
    /// expects starting its streamer to trip the tail invariant assertion.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        // In-memory object store, so the test touches no external storage.
        let store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", store).build().await.unwrap();
        let backend = Backend::new(db.clone());

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        // The record sits exactly at the tail position, i.e. where no record may exist.
        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        let record_pos = StreamPosition {
            seq_num: tail_pos.seq_num,
            timestamp: tail_pos.timestamp,
        };

        let stream_meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };
        let metered_record: Metered<Record> =
            Record::try_from_parts(vec![], Bytes::from_static(b"hello"))
                .unwrap()
                .into();

        // Metadata, tail position and the offending record are written in one batch.
        let mut batch = WriteBatch::new();
        batch.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&stream_meta),
        );
        batch.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(tail_pos),
        );
        batch.put(
            kv::stream_record_data::ser_key(stream_id, record_pos),
            kv::stream_record_data::ser_value(metered_record.as_ref()),
        );
        let write_opts = WriteOptions {
            await_durable: true,
        };
        db.write_with_options(batch, &write_opts).await.unwrap();

        // Expected to panic inside the tail check before a client is ever returned.
        backend
            .start_streamer(basin.clone(), stream.clone())
            .await
            .unwrap();
    }
}