// durable_streams_server/storage/mod.rs
1//! Storage backends and the persistence contract used by the server.
2//!
3//! [`Storage`] is the central abstraction. The built-in implementations are:
4//!
5//! - [`memory::InMemoryStorage`] for ephemeral development and tests
6//! - [`file::FileStorage`] for append-log persistence on the local filesystem
7//! - [`acid::AcidStorage`] for crash-resilient redb-backed persistence
8
9pub mod acid;
10pub mod file;
11pub mod memory;
12
13use crate::protocol::error::{Error, Result};
14use crate::protocol::offset::Offset;
15use crate::protocol::producer::ProducerHeaders;
16use bytes::Bytes;
17use chrono::{DateTime, Utc};
18use std::collections::HashMap;
19use tokio::sync::broadcast;
20
/// Immutable stream configuration captured at create time.
///
/// This is the durable metadata returned by `HEAD` and used for idempotent
/// create checks. The server treats it as part of the stream identity: recreating
/// an existing stream with a different configuration is a conflict.
///
/// Custom `PartialEq`: when `ttl_seconds` is `Some`, `expires_at` is
/// derived from `Utc::now()` and will drift between requests. The
/// comparison therefore ignores `expires_at` in that case. When
/// `ttl_seconds` is `None` and `expires_at` was set directly (via
/// `Expires-At` header), the parsed timestamp is stable so we compare it.
///
/// `Eq` is derived despite the manual `PartialEq`: the manual impl remains
/// reflexive, symmetric, and transitive, so the `Eq` contract holds.
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content-Type header value (normalized, lowercase)
    pub content_type: String,
    /// Time-to-live in seconds (optional)
    pub ttl_seconds: Option<u64>,
    /// Absolute expiration time (optional; TTL-derived or from `Expires-At`)
    pub expires_at: Option<DateTime<Utc>>,
    /// Whether the stream was created closed
    pub created_closed: bool,
}
43
44impl PartialEq for StreamConfig {
45 fn eq(&self, other: &Self) -> bool {
46 self.content_type == other.content_type
47 && self.ttl_seconds == other.ttl_seconds
48 && self.created_closed == other.created_closed
49 && if self.ttl_seconds.is_some() {
50 // TTL-derived expires_at drifts with Utc::now(); skip comparison
51 true
52 } else {
53 self.expires_at == other.expires_at
54 }
55 }
56}
57
58impl StreamConfig {
59 /// Create a config with a normalized content type and default flags.
60 #[must_use]
61 pub fn new(content_type: String) -> Self {
62 Self {
63 content_type,
64 ttl_seconds: None,
65 expires_at: None,
66 created_closed: false,
67 }
68 }
69
70 /// Set a relative time-to-live in seconds.
71 #[must_use]
72 pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
73 self.ttl_seconds = Some(ttl_seconds);
74 self
75 }
76
77 /// Set an absolute expiration timestamp.
78 #[must_use]
79 pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
80 self.expires_at = Some(expires_at);
81 self
82 }
83
84 /// Record that the stream should be considered created in the closed state.
85 #[must_use]
86 pub fn with_created_closed(mut self, created_closed: bool) -> Self {
87 self.created_closed = created_closed;
88 self
89 }
90}
91
/// Stored message plus bookkeeping metadata used by storage backends.
#[derive(Debug, Clone)]
pub struct Message {
    /// Message offset (unique identifier within stream)
    pub offset: Offset,
    /// Message data
    pub data: Bytes,
    /// Byte length of `data`, cached for memory tracking so accounting
    /// does not need to re-measure the payload
    pub byte_len: u64,
}
102
103impl Message {
104 /// Create a stored message and derive its tracked byte length.
105 #[must_use]
106 pub fn new(offset: Offset, data: Bytes) -> Self {
107 let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
108 Self {
109 offset,
110 data,
111 byte_len,
112 }
113 }
114}
115
/// Snapshot returned by [`Storage::read`].
///
/// Handlers map this directly into catch-up, long-poll, and SSE responses.
#[derive(Debug)]
pub struct ReadResult {
    /// Messages read, in offset order
    pub messages: Vec<Bytes>,
    /// Next offset to read from (for resumption)
    pub next_offset: Offset,
    /// Whether we're at the end of the stream (no more data right now)
    pub at_tail: bool,
    /// Whether the stream is closed (no further appends will arrive)
    pub closed: bool,
}
130
/// Stream-level metadata returned by [`Storage::head`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Stream configuration (immutable since create)
    pub config: StreamConfig,
    /// Next offset that will be assigned to an appended message
    pub next_offset: Offset,
    /// Whether the stream is closed
    pub closed: bool,
    /// Total bytes stored in this stream
    pub total_bytes: u64,
    /// Number of messages in the stream
    pub message_count: u64,
    /// Creation timestamp
    pub created_at: DateTime<Utc>,
}
147
/// Outcome of [`Storage::create_stream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// A new stream was created.
    Created,
    /// Stream already existed with matching config (idempotent create).
    AlreadyExists,
}
156
/// Outcome of [`Storage::create_stream_with_data`].
///
/// Bundles creation status with a metadata snapshot taken under the
/// same lock hold so the handler never needs a separate `head()` call.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Next offset (for `Stream-Next-Offset` response header).
    pub next_offset: Offset,
    /// Whether the stream is closed (for `Stream-Closed` response header).
    pub closed: bool,
}
170
/// Outcome of [`Storage::append_with_producer`].
///
/// Includes a snapshot of stream state taken atomically with the operation
/// so handlers never need a separate `head()` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// New data accepted (200 OK)
    Accepted {
        /// Producer epoch under which the data was accepted.
        epoch: u64,
        /// Sequence number recorded for this append.
        seq: u64,
        /// Next offset that will be assigned after this append.
        next_offset: Offset,
        /// Whether the stream is closed after this operation.
        closed: bool,
    },
    /// Duplicate detected, data already persisted (204 No Content)
    Duplicate {
        /// Producer epoch currently on record.
        epoch: u64,
        /// Last sequence number already persisted for this producer.
        seq: u64,
        /// Next offset at the time the duplicate was detected.
        next_offset: Offset,
        /// Whether the stream is closed.
        closed: bool,
    },
}
192
// ── Shared constants and validation helpers ─────────────────────────
//
// Extracted from InMemoryStorage/FileStorage to avoid duplication.
// Both implementations delegate to these for content-type, seq,
// producer, and expiry validation.

/// Duration after which stale producer state is cleaned up (7 days).
///
/// Signed (`i64`) because it feeds `chrono::TimeDelta::try_seconds`
/// in `cleanup_stale_producers`.
pub(crate) const PRODUCER_STATE_TTL_SECS: i64 = 7 * 24 * 60 * 60;

/// Broadcast channel capacity for long-poll/SSE notifications.
/// Small because notifications are hints (no payload), not data delivery.
pub(crate) const NOTIFY_CHANNEL_CAPACITY: usize = 16;
205
/// Per-producer state tracked within a stream.
///
/// Shared between storage implementations. Includes serde derives
/// for the file-backed storage which persists this to disk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProducerState {
    /// Current epoch for this producer; requests with an older epoch are fenced.
    pub epoch: u64,
    /// Highest sequence number accepted within `epoch`; used for
    /// duplicate and gap detection.
    pub last_seq: u64,
    /// Last time this state was touched; drives stale-state cleanup
    /// via `cleanup_stale_producers`.
    pub updated_at: DateTime<Utc>,
}
216
/// Outcome of producer validation before any mutation.
pub(crate) enum ProducerCheck {
    /// Request is valid; proceed with append.
    Accept,
    /// Request is a duplicate; return idempotent success.
    Duplicate {
        /// Epoch on record for the producer.
        epoch: u64,
        /// Last sequence number already persisted for the producer.
        seq: u64,
    },
}
224
225/// Check if a stream has expired based on its configuration.
226pub(crate) fn is_stream_expired(config: &StreamConfig) -> bool {
227 config
228 .expires_at
229 .is_some_and(|expires_at| Utc::now() >= expires_at)
230}
231
232/// Validate content-type matches the stream's configured type (case-insensitive).
233pub(crate) fn validate_content_type(stream_ct: &str, request_ct: &str) -> Result<()> {
234 if !request_ct.eq_ignore_ascii_case(stream_ct) {
235 return Err(Error::ContentTypeMismatch {
236 expected: stream_ct.to_string(),
237 actual: request_ct.to_string(),
238 });
239 }
240 Ok(())
241}
242
243/// Validate Stream-Seq ordering and return the pending value to commit.
244///
245/// Returns `Err(SeqOrderingViolation)` if the new seq is not strictly
246/// greater than the last seq (lexicographic comparison).
247pub(crate) fn validate_seq(
248 last_seq: Option<&str>,
249 new_seq: Option<&str>,
250) -> Result<Option<String>> {
251 if let Some(new) = new_seq {
252 if let Some(last) = last_seq
253 && new <= last
254 {
255 return Err(Error::SeqOrderingViolation {
256 last: last.to_string(),
257 received: new.to_string(),
258 });
259 }
260 return Ok(Some(new.to_string()));
261 }
262 Ok(None)
263}
264
265/// Remove producer state entries older than `PRODUCER_STATE_TTL_SECS`.
266pub(crate) fn cleanup_stale_producers(producers: &mut HashMap<String, ProducerState>) {
267 let cutoff = Utc::now()
268 - chrono::TimeDelta::try_seconds(PRODUCER_STATE_TTL_SECS)
269 .expect("7 days fits in TimeDelta");
270 producers.retain(|_, state| state.updated_at > cutoff);
271}
272
/// Validate producer epoch/sequence against existing state.
///
/// Implements the standard validation order:
/// 1. Epoch fencing (403)
/// 2. Duplicate detection (204) — before closed check so retries work
/// 3. Closed check (409) — blocks new sequences on closed streams
/// 4. Gap / epoch-bump validation
///
/// The ordering is load-bearing: a duplicate retry of the final append on a
/// now-closed stream must still succeed idempotently, so step 2 precedes
/// step 3.
pub(crate) fn check_producer(
    existing: Option<&ProducerState>,
    producer: &ProducerHeaders,
    stream_closed: bool,
) -> Result<ProducerCheck> {
    if let Some(state) = existing {
        // 1. Epoch fencing: a producer presenting an older epoch has been
        //    superseded and must never write again.
        if producer.epoch < state.epoch {
            return Err(Error::EpochFenced {
                current: state.epoch,
                received: producer.epoch,
            });
        }

        // 2. Duplicate detection: same epoch with seq at or below the last
        //    accepted value means the data is already persisted; report
        //    idempotent success with the recorded state.
        if producer.epoch == state.epoch && producer.seq <= state.last_seq {
            return Ok(ProducerCheck::Duplicate {
                epoch: state.epoch,
                seq: state.last_seq,
            });
        }

        // 3. Not a duplicate — new data on a closed stream is rejected.
        if stream_closed {
            return Err(Error::StreamClosed);
        }

        // 4a. Epoch bump: a newer epoch must restart its sequence at 0.
        if producer.epoch > state.epoch {
            if producer.seq != 0 {
                return Err(Error::InvalidProducerState(
                    "new epoch must start at seq 0".to_string(),
                ));
            }
        // 4b. Same epoch: the next seq must be exactly last_seq + 1.
        } else if producer.seq > state.last_seq + 1 {
            return Err(Error::SequenceGap {
                expected: state.last_seq + 1,
                actual: producer.seq,
            });
        }
    } else {
        // Unknown producer: closed streams accept no first writes, and the
        // first sequence from a new producer must be 0.
        if stream_closed {
            return Err(Error::StreamClosed);
        }
        if producer.seq != 0 {
            return Err(Error::SequenceGap {
                expected: 0,
                actual: producer.seq,
            });
        }
    }
    Ok(ProducerCheck::Accept)
}
331
/// Persistence contract for Durable Streams server state.
///
/// Methods are intentionally synchronous. The server keeps async boundaries in
/// the HTTP and notification layers so storage implementations can focus on
/// atomicity, ordering, and recovery.
///
/// Implementations are expected to preserve these invariants:
///
/// - per-stream offsets are monotonic
/// - create, append, close, and delete are atomic at the stream level
/// - duplicate producer appends are idempotent
/// - reads observe a coherent snapshot
/// - expired streams behave as if they no longer exist
///
/// Implementations must also be thread-safe (`Send + Sync`), because the axum
/// server shares them across request handlers.
///
/// Error conditions are documented inline rather than in separate sections
/// to avoid repetitive documentation on internal trait methods.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Create a stream entry with immutable configuration.
    ///
    /// Returns whether the stream was newly created or already existed with
    /// matching configuration.
    ///
    /// Returns `Err(Error::ConfigMismatch)` if stream exists with different config.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;

    /// Append one message to an existing stream.
    ///
    /// Generates and returns the offset assigned to the appended message.
    /// Offsets must remain monotonically increasing within a stream.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;

    /// Append a batch of messages as one atomic operation.
    ///
    /// All messages are validated and committed as a single atomic operation.
    /// Either all messages are appended successfully, or none are.
    /// Returns the next offset (the offset that will be assigned to the
    /// next message appended after this batch).
    ///
    /// If `seq` is `Some`, validates lexicographic ordering against the
    /// stream's last seq and updates it on success.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    /// Returns `Err(Error::SeqOrderingViolation)` if seq <= last seq.
    /// Returns `Err(Error::MemoryLimitExceeded)` if batch would exceed limits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;

    /// Read from a stream starting at `from_offset`.
    ///
    /// `Offset::start()` reads from the beginning of the stream.
    /// `Offset::now()` positions the caller at the current tail and returns
    /// an empty catch-up result.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    /// Returns `Err(Error::InvalidOffset)` if offset is invalid.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;

    /// Delete a stream and all of its persisted data.
    ///
    /// Returns `Ok(())` on successful deletion.
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn delete(&self, name: &str) -> Result<()>;

    /// Return stream metadata without reading message bodies.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn head(&self, name: &str) -> Result<StreamMetadata>;

    /// Mark a stream closed so future appends are rejected.
    ///
    /// Prevents further appends; reads remain possible.
    /// Returns `Ok(())` if already closed (idempotent).
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn close_stream(&self, name: &str) -> Result<()>;

    /// Append with idempotent producer sequencing.
    ///
    /// Validates producer epoch/sequence, appends data if accepted, and
    /// optionally closes the stream — all within a single lock hold.
    ///
    /// Returns `ProducerAppendResult::Accepted` for new data (200 OK).
    /// Returns `ProducerAppendResult::Duplicate` for already-seen seq (204).
    /// Returns `Err(EpochFenced)` if epoch < current (403).
    /// Returns `Err(SequenceGap)` if seq > expected (409).
    /// Returns `Err(InvalidProducerState)` if epoch bump with seq != 0 (400).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;

    /// Atomically create a stream, optionally seed it with data, and optionally close it.
    ///
    /// Creates the stream, appends `messages` (if non-empty), and closes
    /// (if `should_close`) — all before the entry becomes visible to other
    /// operations. If `commit_messages` fails (e.g. memory limit), the
    /// stream is never created.
    ///
    /// For idempotent recreates (`AlreadyExists`), the body and close
    /// flag are ignored and existing metadata is returned.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;

    /// Check whether a stream exists (and has not expired).
    fn exists(&self, name: &str) -> bool;

    /// Subscribe to notifications for new data on a stream.
    ///
    /// Returns a broadcast receiver that fires when data is appended
    /// or the stream is closed. Returns `None` if the stream does not
    /// exist or has expired.
    ///
    /// The method itself is sync; the handler awaits on the receiver.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;

    /// Proactively remove all expired streams, returning the count deleted.
    ///
    /// By default, expired streams are only cleaned up lazily when accessed.
    /// This method sweeps all streams and deletes any that have expired,
    /// reclaiming their resources immediately.
    fn cleanup_expired_streams(&self) -> usize;
}
475}