// durable_streams_server/storage/mod.rs
//! Storage backends and the persistence contract used by the server.
//!
//! [`Storage`] is the central abstraction. The built-in implementations are:
//!
//! - [`memory::InMemoryStorage`] for ephemeral development and tests
//! - [`file::FileStorage`] for append-log persistence on the local filesystem
//! - [`acid::AcidStorage`] for crash-resilient redb-backed persistence

pub mod acid;
pub mod file;
pub mod memory;

use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use crate::protocol::producer::ProducerHeaders;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use tokio::sync::broadcast;
20
/// Immutable stream configuration captured at create time.
///
/// This is the durable metadata returned by `HEAD` and used for idempotent
/// create checks. The server treats it as part of the stream identity: recreating
/// an existing stream with a different configuration is a conflict.
///
/// Custom `PartialEq`: when `ttl_seconds` is `Some`, `expires_at` is
/// derived from `Utc::now()` and will drift between requests.  The
/// comparison therefore ignores `expires_at` in that case.  When
/// `ttl_seconds` is `None` and `expires_at` was set directly (via
/// `Expires-At` header), the parsed timestamp is stable so we compare it.
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content-Type header value (normalized, lowercase).
    pub content_type: String,
    /// Relative time-to-live in seconds (optional).
    pub ttl_seconds: Option<u64>,
    /// Absolute expiration time (optional); either derived from
    /// `ttl_seconds` or parsed directly from an `Expires-At` header.
    pub expires_at: Option<DateTime<Utc>>,
    /// Whether the stream was created in the closed state.
    pub created_closed: bool,
}
43
44impl PartialEq for StreamConfig {
45    fn eq(&self, other: &Self) -> bool {
46        self.content_type == other.content_type
47            && self.ttl_seconds == other.ttl_seconds
48            && self.created_closed == other.created_closed
49            && if self.ttl_seconds.is_some() {
50                // TTL-derived expires_at drifts with Utc::now(); skip comparison
51                true
52            } else {
53                self.expires_at == other.expires_at
54            }
55    }
56}
57
58impl StreamConfig {
59    /// Create a config with a normalized content type and default flags.
60    #[must_use]
61    pub fn new(content_type: String) -> Self {
62        Self {
63            content_type,
64            ttl_seconds: None,
65            expires_at: None,
66            created_closed: false,
67        }
68    }
69
70    /// Set a relative time-to-live in seconds.
71    #[must_use]
72    pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
73        self.ttl_seconds = Some(ttl_seconds);
74        self
75    }
76
77    /// Set an absolute expiration timestamp.
78    #[must_use]
79    pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
80        self.expires_at = Some(expires_at);
81        self
82    }
83
84    /// Record that the stream should be considered created in the closed state.
85    #[must_use]
86    pub fn with_created_closed(mut self, created_closed: bool) -> Self {
87        self.created_closed = created_closed;
88        self
89    }
90}
91
/// Stored message plus bookkeeping metadata used by storage backends.
#[derive(Debug, Clone)]
pub struct Message {
    /// Message offset (unique identifier within its stream).
    pub offset: Offset,
    /// Raw message payload.
    pub data: Bytes,
    /// Byte length of `data`, cached for memory tracking.
    pub byte_len: u64,
}
102
103impl Message {
104    /// Create a stored message and derive its tracked byte length.
105    #[must_use]
106    pub fn new(offset: Offset, data: Bytes) -> Self {
107        let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
108        Self {
109            offset,
110            data,
111            byte_len,
112        }
113    }
114}
115
/// Snapshot returned by [`Storage::read`].
///
/// Handlers map this directly into catch-up, long-poll, and SSE responses.
#[derive(Debug)]
pub struct ReadResult {
    /// Messages read, in offset order.
    pub messages: Vec<Bytes>,
    /// Next offset to read from (for client resumption).
    pub next_offset: Offset,
    /// Whether this read reached the current end of the stream.
    pub at_tail: bool,
    /// Whether the stream is closed (no further appends possible).
    pub closed: bool,
}
130
/// Stream-level metadata returned by [`Storage::head`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Immutable stream configuration captured at create time.
    pub config: StreamConfig,
    /// Next offset that will be assigned on append.
    pub next_offset: Offset,
    /// Whether the stream is closed.
    pub closed: bool,
    /// Total bytes stored in this stream.
    pub total_bytes: u64,
    /// Number of messages in the stream.
    pub message_count: u64,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
}
147
/// Outcome of [`Storage::create_stream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// A new stream was created.
    Created,
    /// Stream already existed with matching config (idempotent create).
    AlreadyExists,
}
156
/// Outcome of [`Storage::create_stream_with_data`].
///
/// Bundles creation status with a metadata snapshot taken under the
/// same lock hold so the handler never needs a separate `head()` call.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Next offset (for the `Stream-Next-Offset` response header).
    pub next_offset: Offset,
    /// Whether the stream is closed (for the `Stream-Closed` response header).
    pub closed: bool,
}
170
/// Outcome of [`Storage::append_with_producer`].
///
/// Includes a snapshot of stream state taken atomically with the operation
/// so handlers never need a separate `head()` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// New data accepted (200 OK).
    Accepted {
        /// Producer epoch in effect after the append.
        epoch: u64,
        /// Last sequence number accepted for this producer.
        seq: u64,
        /// Next offset that will be assigned.
        next_offset: Offset,
        /// Whether the stream is closed after this operation.
        closed: bool,
    },
    /// Duplicate detected, data already persisted (204 No Content).
    Duplicate {
        /// Producer epoch currently recorded.
        epoch: u64,
        /// Last sequence number already persisted for this producer.
        seq: u64,
        /// Next offset that will be assigned.
        next_offset: Offset,
        /// Whether the stream is closed.
        closed: bool,
    },
}
192
// ── Shared constants and validation helpers ─────────────────────────
//
// Extracted from InMemoryStorage/FileStorage to avoid duplication.
// Both implementations delegate to these for content-type, seq,
// producer, and expiry validation.

/// Duration after which stale producer state is cleaned up (7 days).
///
/// `i64` because it feeds `chrono::TimeDelta::try_seconds` in
/// `cleanup_stale_producers`.
pub(crate) const PRODUCER_STATE_TTL_SECS: i64 = 7 * 24 * 60 * 60;

/// Broadcast channel capacity for long-poll/SSE notifications.
/// Small because notifications are hints (no payload), not data delivery.
pub(crate) const NOTIFY_CHANNEL_CAPACITY: usize = 16;
205
/// Per-producer state tracked within a stream.
///
/// Shared between storage implementations. Includes serde derives
/// for the file-backed storage which persists this to disk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProducerState {
    /// Highest epoch observed for this producer (used for fencing).
    pub epoch: u64,
    /// Last sequence number accepted within `epoch`.
    pub last_seq: u64,
    /// Timestamp of the most recent update; drives stale-state cleanup.
    pub updated_at: DateTime<Utc>,
}
216
/// Outcome of producer validation before any mutation.
pub(crate) enum ProducerCheck {
    /// Request is valid; proceed with append.
    Accept,
    /// Request is a duplicate; return idempotent success with the
    /// currently recorded epoch and last accepted seq.
    Duplicate { epoch: u64, seq: u64 },
}
224
225/// Check if a stream has expired based on its configuration.
226pub(crate) fn is_stream_expired(config: &StreamConfig) -> bool {
227    config
228        .expires_at
229        .is_some_and(|expires_at| Utc::now() >= expires_at)
230}
231
232/// Validate content-type matches the stream's configured type (case-insensitive).
233pub(crate) fn validate_content_type(stream_ct: &str, request_ct: &str) -> Result<()> {
234    if !request_ct.eq_ignore_ascii_case(stream_ct) {
235        return Err(Error::ContentTypeMismatch {
236            expected: stream_ct.to_string(),
237            actual: request_ct.to_string(),
238        });
239    }
240    Ok(())
241}
242
243/// Validate Stream-Seq ordering and return the pending value to commit.
244///
245/// Returns `Err(SeqOrderingViolation)` if the new seq is not strictly
246/// greater than the last seq (lexicographic comparison).
247pub(crate) fn validate_seq(
248    last_seq: Option<&str>,
249    new_seq: Option<&str>,
250) -> Result<Option<String>> {
251    if let Some(new) = new_seq {
252        if let Some(last) = last_seq
253            && new <= last
254        {
255            return Err(Error::SeqOrderingViolation {
256                last: last.to_string(),
257                received: new.to_string(),
258            });
259        }
260        return Ok(Some(new.to_string()));
261    }
262    Ok(None)
263}
264
265/// Remove producer state entries older than `PRODUCER_STATE_TTL_SECS`.
266pub(crate) fn cleanup_stale_producers(producers: &mut HashMap<String, ProducerState>) {
267    let cutoff = Utc::now()
268        - chrono::TimeDelta::try_seconds(PRODUCER_STATE_TTL_SECS)
269            .expect("7 days fits in TimeDelta");
270    producers.retain(|_, state| state.updated_at > cutoff);
271}
272
/// Validate producer epoch/sequence against existing state.
///
/// Implements the standard validation order:
///   1. Epoch fencing (403)
///   2. Duplicate detection (204) — before closed check so retries work
///   3. Closed check (409) — blocks new sequences on closed streams
///   4. Gap / epoch-bump validation
pub(crate) fn check_producer(
    existing: Option<&ProducerState>,
    producer: &ProducerHeaders,
    stream_closed: bool,
) -> Result<ProducerCheck> {
    if let Some(state) = existing {
        // 1. Epoch fencing: a producer presenting an older epoch has been
        //    superseded and must be rejected.
        if producer.epoch < state.epoch {
            return Err(Error::EpochFenced {
                current: state.epoch,
                received: producer.epoch,
            });
        }

        // 2. Duplicate detection: same epoch, seq at or below the last
        //    accepted one means the data is already persisted. Checked
        //    before the closed check so a retried append that also closed
        //    the stream still succeeds idempotently.
        if producer.epoch == state.epoch && producer.seq <= state.last_seq {
            return Ok(ProducerCheck::Duplicate {
                epoch: state.epoch,
                seq: state.last_seq,
            });
        }

        // 3. Not a duplicate — if stream is closed, reject new data.
        if stream_closed {
            return Err(Error::StreamClosed);
        }

        // 4a. Epoch bump: a new epoch must restart its sequence at 0.
        if producer.epoch > state.epoch {
            if producer.seq != 0 {
                return Err(Error::InvalidProducerState(
                    "new epoch must start at seq 0".to_string(),
                ));
            }
        // 4b. Same epoch: seq must be exactly last_seq + 1 (no gaps).
        } else if producer.seq > state.last_seq + 1 {
            return Err(Error::SequenceGap {
                expected: state.last_seq + 1,
                actual: producer.seq,
            });
        }
    } else {
        // New producer: may not begin on a closed stream, and must start
        // its sequence at 0.
        if stream_closed {
            return Err(Error::StreamClosed);
        }
        if producer.seq != 0 {
            return Err(Error::SequenceGap {
                expected: 0,
                actual: producer.seq,
            });
        }
    }
    Ok(ProducerCheck::Accept)
}
331
/// Persistence contract for Durable Streams server state.
///
/// Methods are intentionally synchronous. The server keeps async boundaries in
/// the HTTP and notification layers so storage implementations can focus on
/// atomicity, ordering, and recovery.
///
/// Implementations are expected to preserve these invariants:
///
/// - per-stream offsets are monotonic
/// - create, append, close, and delete are atomic at the stream level
/// - duplicate producer appends are idempotent
/// - reads observe a coherent snapshot
/// - expired streams behave as if they no longer exist
///
/// Implementations must also be thread-safe (`Send + Sync`), because the axum
/// server shares them across request handlers.
///
/// Error conditions are documented inline rather than in separate sections
/// to avoid repetitive documentation on internal trait methods.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Create a stream entry with immutable configuration.
    ///
    /// Returns whether the stream was newly created or already existed with
    /// matching configuration.
    ///
    /// Returns `Err(Error::ConfigMismatch)` if stream exists with different config.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;

    /// Append one message to an existing stream.
    ///
    /// Generates and returns the offset assigned to the appended message.
    /// Offsets must remain monotonically increasing within a stream.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;

    /// Append a batch of messages as one atomic operation.
    ///
    /// All messages are validated and committed as a single atomic operation.
    /// Either all messages are appended successfully, or none are.
    /// Returns the next offset (the offset that will be assigned to the
    /// next message appended after this batch).
    ///
    /// If `seq` is `Some`, validates lexicographic ordering against the
    /// stream's last seq and updates it on success.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    /// Returns `Err(Error::SeqOrderingViolation)` if seq <= last seq.
    /// Returns `Err(Error::MemoryLimitExceeded)` if batch would exceed limits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;

    /// Read from a stream starting at `from_offset`.
    ///
    /// `Offset::start()` reads from the beginning of the stream.
    /// `Offset::now()` positions the caller at the current tail and returns
    /// an empty catch-up result.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    /// Returns `Err(Error::InvalidOffset)` if offset is invalid.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;

    /// Delete a stream and all of its persisted data.
    ///
    /// Returns `Ok(())` on successful deletion.
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn delete(&self, name: &str) -> Result<()>;

    /// Return stream metadata without reading message bodies.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn head(&self, name: &str) -> Result<StreamMetadata>;

    /// Mark a stream closed so future appends are rejected.
    ///
    /// Returns `Ok(())` if already closed (idempotent).
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn close_stream(&self, name: &str) -> Result<()>;

    /// Append with idempotent producer sequencing.
    ///
    /// Validates producer epoch/sequence, appends data if accepted, and
    /// optionally closes the stream — all within a single lock hold.
    ///
    /// Returns `ProducerAppendResult::Accepted` for new data (200 OK).
    /// Returns `ProducerAppendResult::Duplicate` for already-seen seq (204).
    /// Returns `Err(EpochFenced)` if epoch < current (403).
    /// Returns `Err(SequenceGap)` if seq > expected (409).
    /// Returns `Err(InvalidProducerState)` if epoch bump with seq != 0 (400).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;

    /// Atomically create a stream, optionally seed it with data, and optionally close it.
    ///
    /// Creates the stream, appends `messages` (if non-empty), and closes
    /// (if `should_close`) — all before the entry becomes visible to other
    /// operations. If committing the messages fails (e.g. memory limit), the
    /// stream is never created.
    ///
    /// For idempotent recreates (`AlreadyExists`), the body and close
    /// flag are ignored and existing metadata is returned.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;

    /// Check if a stream exists (and has not expired).
    fn exists(&self, name: &str) -> bool;

    /// Subscribe to notifications for new data on a stream.
    ///
    /// Returns a broadcast receiver that fires when data is appended
    /// or the stream is closed. Returns `None` if the stream does not
    /// exist or has expired.
    ///
    /// The method itself is sync; the handler awaits on the receiver.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;

    /// Proactively remove all expired streams, returning the count deleted.
    ///
    /// By default, expired streams are only cleaned up lazily when accessed.
    /// This method sweeps all streams and deletes any that have expired,
    /// reclaiming their resources immediately.
    fn cleanup_expired_streams(&self) -> usize;
}
475}