1use std::{sync::Arc, time::Duration};
2
3use bytesize::ByteSize;
4use dashmap::DashMap;
5use enum_ordinalize::Ordinalize;
6use s2_common::{
7 record::{SeqNum, StreamPosition},
8 types::{
9 basin::BasinName,
10 config::{BasinConfig, OptionalStreamConfig},
11 resources::CreateMode,
12 stream::StreamName,
13 },
14};
15use slatedb::config::{DurabilityLevel, ScanOptions};
16use tokio::{sync::broadcast, time::Instant};
17
18use super::{
19 error::{
20 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
21 StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
22 TransactionConflictError,
23 },
24 kv,
25 stream_id::StreamId,
26 streamer::{StreamerClient, StreamerClientState},
27};
28use crate::backend::bgtasks::BgtaskTrigger;
29
/// Storage backend handle shared across request handlers.
///
/// `Clone` hands out another handle to the same underlying state: the
/// streamer-state map is behind an `Arc`, and the trigger channel sender
/// is a `broadcast::Sender` clone.
#[derive(Clone)]
pub struct Backend {
    pub(super) db: slatedb::Db,
    // Per-stream streamer lifecycle state, keyed by `StreamId`. Entries are
    // inserted by `streamer_client_state` and removed either when a streamer
    // exits (`start_streamer`'s spawn callback) or by `streamer_remove_unready`.
    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
    // Upper bound on in-flight append bytes, handed to each spawned streamer.
    append_inflight_max: ByteSize,
    // Best-effort broadcast channel used to nudge background tasks.
    bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
}
37
38impl Backend {
39 const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);
40
41 pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
42 let (bgtask_trigger_tx, _) = broadcast::channel(16);
43 Self {
44 db,
45 client_states: Arc::new(DashMap::new()),
46 append_inflight_max,
47 bgtask_trigger_tx,
48 }
49 }
50
    /// Fire-and-forget notification to background tasks. A send error only
    /// means no receiver is currently subscribed, so it is deliberately
    /// ignored.
    pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
        let _ = self.bgtask_trigger_tx.send(trigger);
    }

    /// Subscribes to triggers emitted via [`Self::bgtask_trigger`]; only
    /// triggers sent after this call are observed.
    pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
        self.bgtask_trigger_tx.subscribe()
    }
58
    /// Loads a stream's persisted state from the KV store, validates it, and
    /// spawns the stream's streamer task, returning a client handle to it.
    ///
    /// # Errors
    /// - [`StreamNotFoundError`] if the stream has no metadata entry.
    /// - [`StreamDeletionPendingError`] if the trim point covers the entire
    ///   seq-num space (`trim_point.end == SeqNum::MAX`), which marks the
    ///   stream as pending deletion.
    /// - Storage errors from any of the underlying reads.
    async fn start_streamer(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamerClient, StreamerError> {
        let stream_id = StreamId::new(&basin, &stream);

        // Fetch all per-stream state concurrently; the first read error
        // aborts the join.
        let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
            self.db_get(
                kv::stream_meta::ser_key(&basin, &stream),
                kv::stream_meta::deser_value,
            ),
            self.db_get(
                kv::stream_tail_position::ser_key(stream_id),
                kv::stream_tail_position::deser_value,
            ),
            self.db_get(
                kv::stream_fencing_token::ser_key(stream_id),
                kv::stream_fencing_token::deser_value,
            ),
            self.db_get(
                kv::stream_trim_point::ser_key(stream_id),
                kv::stream_trim_point::deser_value,
            )
        )?;

        // The metadata entry is the existence marker for a stream.
        let Some(meta) = meta else {
            return Err(StreamNotFoundError { basin, stream }.into());
        };

        // A missing tail position means nothing has been appended yet.
        let tail_pos = tail_pos.unwrap_or(StreamPosition::MIN);
        // Invariant check before serving: no durable record data may exist
        // at or past the recorded tail (panics via assert! if violated).
        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
            .await?;

        let fencing_token = fencing_token.unwrap_or_default();
        // Default trim point trims nothing: everything strictly below
        // `SeqNum::MIN` (i.e. the empty range).
        let trim_point = trim_point.unwrap_or(..SeqNum::MIN);

        // A trim point spanning the whole seq-num space is the sentinel for a
        // stream whose deletion is still in progress.
        if trim_point.end == SeqNum::MAX {
            return Err(StreamDeletionPendingError { basin, stream }.into());
        }

        let client_states = self.client_states.clone();
        Ok(super::streamer::Spawner {
            db: self.db.clone(),
            stream_id,
            config: meta.config,
            tail_pos,
            fencing_token,
            trim_point,
            append_inflight_max: self.append_inflight_max,
            bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
        }
        // On streamer exit, drop its cached state so a later call can
        // respawn it fresh.
        .spawn(move || {
            client_states.remove(&stream_id);
        }))
    }
115
    /// Verifies the invariant that no durable record data exists for
    /// `stream_id` at or beyond `tail_pos`.
    ///
    /// Panics (via `assert!`) when a record belonging to this stream is found
    /// at/after the tail — that would mean the persisted tail position lags
    /// actual record data.
    ///
    /// # Errors
    /// Returns storage errors from the scan or key deserialization.
    async fn assert_no_records_following_tail(
        &self,
        stream_id: StreamId,
        basin: &BasinName,
        stream: &StreamName,
        tail_pos: StreamPosition,
    ) -> Result<(), StorageError> {
        // Start scanning at this stream's record keyspace for the tail's
        // seq_num with timestamp 0, so any record at that seq_num (or later)
        // sorts at or after the start key.
        let start_key = kv::stream_record_data::ser_key(
            stream_id,
            StreamPosition {
                seq_num: tail_pos.seq_num,
                timestamp: 0,
            },
        );
        // Only remotely-durable data matters for this invariant; keep the
        // scan cheap (no read-ahead, no block caching, single fetch task).
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
        // Only the first key matters: if it isn't a violation, nothing later
        // in the ordered keyspace can be either.
        let Some(kv) = it.next().await? else {
            return Ok(());
        };
        // Keys are prefixed by a key-type ordinal; a different prefix means
        // we walked past the record-data keyspace entirely.
        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
            return Ok(());
        }
        // A record-data key for a *different* stream is fine; one for this
        // stream at/after the tail is an invariant violation.
        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
        assert!(
            deser_stream_id != stream_id,
            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
        );
        Ok(())
    }
151
    /// Returns the current streamer state for the stream, kicking off
    /// initialization if none exists.
    ///
    /// On a vacant entry, a `Blocked` placeholder is inserted while holding
    /// the dashmap entry (shard) lock — so concurrent callers all observe the
    /// same placeholder — and a background task is spawned to run
    /// `start_streamer` and replace the placeholder with `Ready` or
    /// `InitError`, then wake all waiters.
    fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
        match self.client_states.entry(StreamId::new(basin, stream)) {
            dashmap::Entry::Occupied(oe) => oe.get().clone(),
            dashmap::Entry::Vacant(ve) => {
                let this = self.clone();
                let stream_id = *(ve.key());
                let basin = basin.clone();
                let stream = stream.clone();
                tokio::spawn(async move {
                    let state = match this.start_streamer(basin, stream).await {
                        Ok(client) => StreamerClientState::Ready { client },
                        Err(error) => StreamerClientState::InitError {
                            error: Box::new(error),
                            timestamp: Instant::now(),
                        },
                    };
                    // Only this task may replace the Blocked placeholder; any
                    // other state here means the lifecycle protocol was broken.
                    let replaced_state = this.client_states.insert(stream_id, state);
                    let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
                        panic!("expected Blocked client but replaced: {replaced_state:?}");
                    };
                    // Wake everyone parked on the placeholder's Notify.
                    notify.notify_waiters();
                });
                ve.insert(StreamerClientState::Blocked {
                    notify: Default::default(),
                })
                .value()
                .clone()
            }
        }
    }
182
183 fn streamer_remove_unready(&self, stream_id: StreamId) {
184 if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
185 && let StreamerClientState::InitError { .. } = oe.get()
186 {
187 oe.remove();
188 }
189 }
190
    /// Obtains a ready streamer client for the stream, waiting for in-flight
    /// initialization and retrying stale failures.
    ///
    /// Retry policy on `InitError`: the error is evicted and initialization
    /// retried when either this caller has not yet waited on an init attempt
    /// of its own (the error was cached from an earlier caller), or the error
    /// is older than [`Self::FAILED_INIT_MEMORY`]. Otherwise the cached error
    /// is returned as-is.
    ///
    /// # Errors
    /// Propagates the streamer initialization error recorded by the
    /// background task in `streamer_client_state`.
    pub(super) async fn streamer_client(
        &self,
        basin: &BasinName,
        stream: &StreamName,
    ) -> Result<StreamerClient, StreamerError> {
        let mut waited = false;
        loop {
            match self.streamer_client_state(basin, stream) {
                StreamerClientState::Blocked { notify } => {
                    // Initialization in flight; park until the init task
                    // calls notify_waiters, then re-read the state.
                    notify.notified().await;
                    waited = true;
                }
                StreamerClientState::InitError { error, timestamp } => {
                    if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
                        // Stale or second-hand failure: evict and loop to
                        // trigger a fresh initialization attempt.
                        self.streamer_remove_unready(StreamId::new(basin, stream));
                    } else {
                        // Fresh failure from an attempt we waited on.
                        return Err(*error);
                    }
                }
                StreamerClientState::Ready { client } => {
                    return Ok(client);
                }
            }
        }
    }
216
217 pub(super) fn streamer_client_if_active(
218 &self,
219 basin: &BasinName,
220 stream: &StreamName,
221 ) -> Option<StreamerClient> {
222 match self.streamer_client_state(basin, stream) {
223 StreamerClientState::Ready { client } => Some(client),
224 _ => None,
225 }
226 }
227
    /// Like [`Self::streamer_client`], but on `StreamNotFound` may create the
    /// stream first, when the basin's config says so.
    ///
    /// Flow on `StreamNotFound`: fetch the basin config, consult
    /// `should_auto_create`, and if approved create the stream with default
    /// config in `CreateOnly` mode, then retry obtaining the client.
    /// `StreamAlreadyExists` during creation is deliberately swallowed — a
    /// concurrent creator winning the race is success for our purposes.
    ///
    /// # Errors
    /// `E` must absorb every error surfaced along the way: streamer errors,
    /// storage errors, basin/stream lookup and deletion-pending errors, and
    /// transaction conflicts.
    pub(super) async fn streamer_client_with_auto_create<E>(
        &self,
        basin: &BasinName,
        stream: &StreamName,
        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
    ) -> Result<StreamerClient, E>
    where
        E: From<StreamerError>
            + From<StorageError>
            + From<BasinNotFoundError>
            + From<TransactionConflictError>
            + From<BasinDeletionPendingError>
            + From<StreamDeletionPendingError>
            + From<StreamNotFoundError>,
    {
        match self.streamer_client(basin, stream).await {
            Ok(client) => Ok(client),
            Err(StreamerError::StreamNotFound(e)) => {
                let config = match self.get_basin_config(basin.clone()).await {
                    Ok(config) => config,
                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
                };
                if should_auto_create(&config) {
                    if let Err(e) = self
                        .create_stream(
                            basin.clone(),
                            stream.clone(),
                            OptionalStreamConfig::default(),
                            CreateMode::CreateOnly(None),
                        )
                        .await
                    {
                        // Map each creation failure into E; AlreadyExists is
                        // fine (lost a benign creation race).
                        match e {
                            CreateStreamError::Storage(e) => Err(e)?,
                            CreateStreamError::TransactionConflict(e) => Err(e)?,
                            CreateStreamError::BasinDeletionPending(e) => Err(e)?,
                            CreateStreamError::StreamDeletionPending(e) => Err(e)?,
                            CreateStreamError::BasinNotFound(e) => Err(e)?,
                            CreateStreamError::StreamAlreadyExists(_) => {}
                        }
                    }
                    // Stream now exists (created here or concurrently): retry.
                    Ok(self.streamer_client(basin, stream).await?)
                } else {
                    Err(e.into())
                }
            }
            Err(e) => Err(e.into()),
        }
    }
278}
279
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Seeds a stream whose durable record data contradicts its persisted
    /// tail position (a record exists AT the tail seq_num, i.e. at/after the
    /// exclusive tail), and verifies that `start_streamer` panics via the
    /// invariant assert in `assert_no_records_following_tail`.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        // In-memory object store so the test needs no external storage.
        let object_store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", object_store)
            .build()
            .await
            .unwrap();

        let backend = Backend::new(db.clone(), ByteSize::b(1));

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        // Minimal live stream metadata (not deleted, no idempotency key).
        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // Deliberately place a record at the tail's own position — this is
        // the invariant violation the test exercises.
        let record_pos = StreamPosition {
            seq_num: tail_pos.seq_num,
            timestamp: tail_pos.timestamp,
        };

        let record = Record::try_from_parts(vec![], Bytes::from_static(b"hello")).unwrap();
        let metered_record: Metered<Record> = record.into();

        // Write meta, tail position, and the offending record in one batch.
        let mut wb = WriteBatch::new();
        wb.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        wb.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(tail_pos),
        );
        wb.put(
            kv::stream_record_data::ser_key(stream_id, record_pos),
            kv::stream_record_data::ser_value(metered_record.as_ref()),
        );
        // Await durability so the Remote durability filter in the invariant
        // scan can observe the seeded record.
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        db.write_with_options(wb, &WRITE_OPTS).await.unwrap();

        backend
            .start_streamer(basin.clone(), stream.clone())
            .await
            .unwrap();
    }
}