1use std::{sync::Arc, time::Duration};
2
3use bytesize::ByteSize;
4use dashmap::DashMap;
5use enum_ordinalize::Ordinalize;
6use s2_common::{
7 record::{NonZeroSeqNum, SeqNum, StreamPosition},
8 types::{
9 basin::BasinName,
10 config::{BasinConfig, OptionalStreamConfig},
11 resources::CreateMode,
12 stream::StreamName,
13 },
14};
15use slatedb::config::{DurabilityLevel, ScanOptions};
16use tokio::{sync::broadcast, time::Instant};
17
18use super::{
19 error::{
20 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
21 StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
22 TransactionConflictError,
23 },
24 kv,
25 stream_id::StreamId,
26 streamer::{StreamerClient, StreamerClientState},
27};
28use crate::backend::bgtasks::BgtaskTrigger;
29
/// Storage backend handle: wraps the SlateDB database and tracks the
/// per-stream streamer client state shared by all clones of this handle.
#[derive(Clone)]
pub struct Backend {
    pub(super) db: slatedb::Db,
    // Per-stream streamer state keyed by (basin, stream) id; behind an `Arc`
    // so every clone of `Backend` observes the same set of streamers.
    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
    // In-flight append budget handed to each spawned streamer
    // (see `start_streamer`).
    append_inflight_max: ByteSize,
    // Broadcast channel used to nudge background tasks; sends with no
    // subscribers are deliberately ignored (see `bgtask_trigger`).
    bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
}
37
38impl Backend {
39 const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);
40
41 pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
42 let (bgtask_trigger_tx, _) = broadcast::channel(16);
43 Self {
44 db,
45 client_states: Arc::new(DashMap::new()),
46 append_inflight_max,
47 bgtask_trigger_tx,
48 }
49 }
50
51 pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
52 let _ = self.bgtask_trigger_tx.send(trigger);
53 }
54
55 pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
56 self.bgtask_trigger_tx.subscribe()
57 }
58
59 async fn start_streamer(
60 &self,
61 basin: BasinName,
62 stream: StreamName,
63 ) -> Result<StreamerClient, StreamerError> {
64 let stream_id = StreamId::new(&basin, &stream);
65
66 let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
67 self.db_get(
68 kv::stream_meta::ser_key(&basin, &stream),
69 kv::stream_meta::deser_value,
70 ),
71 self.db_get(
72 kv::stream_tail_position::ser_key(stream_id),
73 kv::stream_tail_position::deser_value,
74 ),
75 self.db_get(
76 kv::stream_fencing_token::ser_key(stream_id),
77 kv::stream_fencing_token::deser_value,
78 ),
79 self.db_get(
80 kv::stream_trim_point::ser_key(stream_id),
81 kv::stream_trim_point::deser_value,
82 )
83 )?;
84
85 let Some(meta) = meta else {
86 return Err(StreamNotFoundError { basin, stream }.into());
87 };
88
89 let tail_pos = tail_pos.map(|(pos, _)| pos).unwrap_or(StreamPosition::MIN);
90 self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
91 .await?;
92
93 let fencing_token = fencing_token.unwrap_or_default();
94
95 if trim_point == Some(..NonZeroSeqNum::MAX) {
96 return Err(StreamDeletionPendingError { basin, stream }.into());
97 }
98
99 let client_states = self.client_states.clone();
100 Ok(super::streamer::Spawner {
101 db: self.db.clone(),
102 stream_id,
103 config: meta.config,
104 tail_pos,
105 fencing_token,
106 trim_point: ..trim_point.map_or(SeqNum::MIN, |tp| tp.end.get()),
107 append_inflight_max: self.append_inflight_max,
108 bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
109 }
110 .spawn(move || {
111 client_states.remove(&stream_id);
112 }))
113 }
114
115 async fn assert_no_records_following_tail(
116 &self,
117 stream_id: StreamId,
118 basin: &BasinName,
119 stream: &StreamName,
120 tail_pos: StreamPosition,
121 ) -> Result<(), StorageError> {
122 let start_key = kv::stream_record_data::ser_key(
123 stream_id,
124 StreamPosition {
125 seq_num: tail_pos.seq_num,
126 timestamp: 0,
127 },
128 );
129 static SCAN_OPTS: ScanOptions = ScanOptions {
130 durability_filter: DurabilityLevel::Remote,
131 dirty: false,
132 read_ahead_bytes: 1,
133 cache_blocks: false,
134 max_fetch_tasks: 1,
135 };
136 let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
137 let Some(kv) = it.next().await? else {
138 return Ok(());
139 };
140 if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
141 return Ok(());
142 }
143 let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
144 assert!(
145 deser_stream_id != stream_id,
146 "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
147 );
148 Ok(())
149 }
150
151 fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
152 match self.client_states.entry(StreamId::new(basin, stream)) {
153 dashmap::Entry::Occupied(oe) => oe.get().clone(),
154 dashmap::Entry::Vacant(ve) => {
155 let this = self.clone();
156 let stream_id = *(ve.key());
157 let basin = basin.clone();
158 let stream = stream.clone();
159 tokio::spawn(async move {
160 let state = match this.start_streamer(basin, stream).await {
161 Ok(client) => StreamerClientState::Ready { client },
162 Err(error) => StreamerClientState::InitError {
163 error: Box::new(error),
164 timestamp: Instant::now(),
165 },
166 };
167 let replaced_state = this.client_states.insert(stream_id, state);
168 let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
169 panic!("expected Blocked client but replaced: {replaced_state:?}");
170 };
171 notify.notify_waiters();
172 });
173 ve.insert(StreamerClientState::Blocked {
174 notify: Default::default(),
175 })
176 .value()
177 .clone()
178 }
179 }
180 }
181
182 fn streamer_remove_unready(&self, stream_id: StreamId) {
183 if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
184 && let StreamerClientState::InitError { .. } = oe.get()
185 {
186 oe.remove();
187 }
188 }
189
190 pub(super) async fn streamer_client(
191 &self,
192 basin: &BasinName,
193 stream: &StreamName,
194 ) -> Result<StreamerClient, StreamerError> {
195 let mut waited = false;
196 loop {
197 match self.streamer_client_state(basin, stream) {
198 StreamerClientState::Blocked { notify } => {
199 notify.notified().await;
200 waited = true;
201 }
202 StreamerClientState::InitError { error, timestamp } => {
203 if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
204 self.streamer_remove_unready(StreamId::new(basin, stream));
205 } else {
206 return Err(*error);
207 }
208 }
209 StreamerClientState::Ready { client } => {
210 return Ok(client);
211 }
212 }
213 }
214 }
215
216 pub(super) fn streamer_client_if_active(
217 &self,
218 basin: &BasinName,
219 stream: &StreamName,
220 ) -> Option<StreamerClient> {
221 match self.streamer_client_state(basin, stream) {
222 StreamerClientState::Ready { client } => Some(client),
223 _ => None,
224 }
225 }
226
227 pub(super) async fn streamer_client_with_auto_create<E>(
228 &self,
229 basin: &BasinName,
230 stream: &StreamName,
231 should_auto_create: impl FnOnce(&BasinConfig) -> bool,
232 ) -> Result<StreamerClient, E>
233 where
234 E: From<StreamerError>
235 + From<StorageError>
236 + From<BasinNotFoundError>
237 + From<TransactionConflictError>
238 + From<BasinDeletionPendingError>
239 + From<StreamDeletionPendingError>
240 + From<StreamNotFoundError>,
241 {
242 match self.streamer_client(basin, stream).await {
243 Ok(client) => Ok(client),
244 Err(StreamerError::StreamNotFound(e)) => {
245 let config = match self.get_basin_config(basin.clone()).await {
246 Ok(config) => config,
247 Err(GetBasinConfigError::Storage(e)) => Err(e)?,
248 Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
249 };
250 if should_auto_create(&config) {
251 if let Err(e) = self
252 .create_stream(
253 basin.clone(),
254 stream.clone(),
255 OptionalStreamConfig::default(),
256 CreateMode::CreateOnly(None),
257 )
258 .await
259 {
260 match e {
261 CreateStreamError::Storage(e) => Err(e)?,
262 CreateStreamError::TransactionConflict(e) => Err(e)?,
263 CreateStreamError::BasinDeletionPending(e) => Err(e)?,
264 CreateStreamError::StreamDeletionPending(e) => Err(e)?,
265 CreateStreamError::BasinNotFound(e) => Err(e)?,
266 CreateStreamError::StreamAlreadyExists(_) => {}
267 }
268 }
269 Ok(self.streamer_client(basin, stream).await?)
270 } else {
271 Err(e.into())
272 }
273 }
274 Err(e) => Err(e.into()),
275 }
276 }
277}
278
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Seeds the DB with a tail position *and* a record at that same
    /// position, then verifies `start_streamer` trips its invariant check.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", store).build().await.unwrap();

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let id = StreamId::new(&basin, &stream);

        let tail = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // Offending record sits exactly at the tail position.
        let offending = StreamPosition {
            seq_num: tail.seq_num,
            timestamp: tail.timestamp,
        };

        let metered: Metered<Record> = Record::try_from_parts(vec![], Bytes::from_static(b"hello"))
            .unwrap()
            .into();

        let mut batch = WriteBatch::new();
        batch.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&kv::stream_meta::StreamMeta {
                config: OptionalStreamConfig::default(),
                created_at: OffsetDateTime::now_utc(),
                deleted_at: None,
                creation_idempotency_key: None,
            }),
        );
        batch.put(
            kv::stream_tail_position::ser_key(id),
            kv::stream_tail_position::ser_value(tail, kv::timestamp::TimestampSecs::from_secs(1)),
        );
        batch.put(
            kv::stream_record_data::ser_key(id, offending),
            kv::stream_record_data::ser_value(metered.as_ref()),
        );
        db.write_with_options(batch, &WriteOptions {
            await_durable: true,
        })
        .await
        .unwrap();

        Backend::new(db.clone(), ByteSize::b(1))
            .start_streamer(basin, stream)
            .await
            .unwrap();
    }
}