Skip to main content

durable_streams_server/storage/
mod.rs

1//! Storage backends and the persistence contract used by the server.
2//!
3//! [`Storage`] is the central abstraction. The built-in implementations are:
4//!
5//! - [`memory::InMemoryStorage`] for ephemeral development and tests
6//! - [`file::FileStorage`] for append-log persistence on the local filesystem
7//! - [`acid::AcidStorage`] for crash-resilient redb-backed persistence
8
9pub mod acid;
10pub mod file;
11pub(crate) mod fork;
12pub mod memory;
13pub(crate) mod shared;
14
15use crate::protocol::error::Result;
16use crate::protocol::offset::Offset;
17use crate::protocol::producer::ProducerHeaders;
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use tokio::sync::broadcast;
21
22// Re-export shared items so existing `super::` paths in backends still work.
23pub(crate) use shared::{
24    NOTIFY_CHANNEL_CAPACITY, ProducerCheck, ProducerState, check_producer, cleanup_stale_producers,
25    is_stream_expired, validate_content_type, validate_seq,
26};
27
/// Immutable stream configuration captured at create time.
///
/// This is the durable metadata returned by `HEAD` and used for idempotent
/// create checks. The server treats it as part of the stream identity: recreating
/// an existing stream with a different configuration is a conflict.
///
/// Custom `PartialEq`: when `ttl_seconds` is `Some`, `expires_at` is
/// derived from `Utc::now()` and will drift between requests.  The
/// comparison therefore ignores `expires_at` in that case.  When
/// `ttl_seconds` is `None` and `expires_at` was set directly (via
/// `Expires-At` header), the parsed timestamp is stable so we compare it.
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content-Type header value (normalized, lowercase)
    // NOTE(review): normalization is not performed here; presumably done
    // upstream (e.g. via `validate_content_type`) — confirm with callers.
    pub content_type: String,
    /// Time-to-live in seconds (optional)
    pub ttl_seconds: Option<u64>,
    /// Absolute expiration time (optional)
    pub expires_at: Option<DateTime<Utc>>,
    /// Whether the stream was created closed
    pub created_closed: bool,
}
50
51impl PartialEq for StreamConfig {
52    fn eq(&self, other: &Self) -> bool {
53        self.content_type == other.content_type
54            && self.ttl_seconds == other.ttl_seconds
55            && self.created_closed == other.created_closed
56            && if self.ttl_seconds.is_some() {
57                // TTL-derived expires_at drifts with Utc::now(); skip comparison
58                true
59            } else {
60                self.expires_at == other.expires_at
61            }
62    }
63}
64
65impl StreamConfig {
66    /// Create a config with a normalized content type and default flags.
67    #[must_use]
68    pub fn new(content_type: String) -> Self {
69        Self {
70            content_type,
71            ttl_seconds: None,
72            expires_at: None,
73            created_closed: false,
74        }
75    }
76
77    /// Set a relative time-to-live in seconds.
78    #[must_use]
79    pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
80        self.ttl_seconds = Some(ttl_seconds);
81        self
82    }
83
84    /// Set an absolute expiration timestamp.
85    #[must_use]
86    pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
87        self.expires_at = Some(expires_at);
88        self
89    }
90
91    /// Record that the stream should be considered created in the closed state.
92    #[must_use]
93    pub fn with_created_closed(mut self, created_closed: bool) -> Self {
94        self.created_closed = created_closed;
95        self
96    }
97}
98
/// Fork lineage metadata for a stream created via `create_fork`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ForkInfo {
    /// Name of the source stream this fork was created from.
    pub source_name: String,
    /// Offset at which this fork diverged from the source (serialized as string).
    // Custom serde hooks keep the wire/disk format a string even though the
    // in-memory type is `Offset`.
    #[serde(
        serialize_with = "crate::protocol::offset::serialize_offset",
        deserialize_with = "crate::protocol::offset::deserialize_offset"
    )]
    pub fork_offset: Offset,
}
111
/// Lifecycle state of a stream (active or soft-deleted tombstone).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum StreamState {
    /// Normal operational state.
    #[default]
    Active,
    /// Soft-deleted: still held in memory for fork ref-count bookkeeping
    /// but invisible to regular operations.
    Tombstone,
}
122
/// Stored message plus bookkeeping metadata used by storage backends.
#[derive(Debug, Clone)]
pub struct Message {
    /// Message offset (unique identifier within stream)
    pub offset: Offset,
    /// Message data
    pub data: Bytes,
    /// Byte length (for memory tracking)
    // Cached at construction time (see `Message::new`) so accounting does
    // not re-measure `data` on every use.
    pub byte_len: u64,
}
133
134impl Message {
135    /// Create a stored message and derive its tracked byte length.
136    #[must_use]
137    pub fn new(offset: Offset, data: Bytes) -> Self {
138        let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
139        Self {
140            offset,
141            data,
142            byte_len,
143        }
144    }
145}
146
/// Snapshot returned by [`Storage::read`].
///
/// Handlers map this directly into catch-up, long-poll, and SSE responses.
#[derive(Debug)]
pub struct ReadResult {
    /// Messages read
    pub messages: Vec<Bytes>,
    /// Next offset to read from (for resumption)
    pub next_offset: Offset,
    /// Whether we're at the end of the stream
    pub at_tail: bool,
    /// Whether the stream is closed
    pub closed: bool,
}
161
/// Stream-level metadata returned by [`Storage::head`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Stream configuration
    pub config: StreamConfig,
    /// Next offset that will be assigned
    pub next_offset: Offset,
    /// Whether the stream is closed
    pub closed: bool,
    /// Total bytes stored in this stream
    pub total_bytes: u64,
    /// Number of messages in the stream
    pub message_count: u64,
    /// Creation timestamp
    pub created_at: DateTime<Utc>,
    /// Last modification timestamp (append, close, producer write)
    /// `None` if the stream has never been modified since creation.
    pub updated_at: Option<DateTime<Utc>>,
}
180
/// Outcome of [`Storage::create_stream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// A new stream was created.
    Created,
    /// Stream already existed with matching config (idempotent create).
    AlreadyExists,
}
189
/// Immutable snapshot of a fork create request after source-derived fields
/// have been resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ForkCreateSpec {
    /// Name of the source stream being forked.
    pub source_name: String,
    /// Resolved offset at which the fork diverges from the source.
    pub fork_offset: Offset,
    /// Configuration for the new fork stream.
    pub config: StreamConfig,
}
198
/// Outcome of [`Storage::create_stream_with_data`].
///
/// Bundles creation status with a metadata snapshot taken under the
/// same lock hold so the handler never needs a separate `head()` call.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Next offset (for `Stream-Next-Offset` response header).
    pub next_offset: Offset,
    /// Whether the stream is closed (for `Stream-Closed` response header).
    pub closed: bool,
}
212
/// Outcome of [`Storage::append_with_producer`].
///
/// Includes a snapshot of stream state taken atomically with the operation
/// so handlers never need a separate `head()` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// New data accepted (200 OK)
    Accepted {
        /// Producer epoch associated with this result.
        epoch: u64,
        /// Producer sequence number associated with this result.
        seq: u64,
        /// Next offset to read/append after this operation.
        next_offset: Offset,
        /// Whether the stream is closed after this operation.
        closed: bool,
    },
    /// Duplicate detected, data already persisted (204 No Content)
    Duplicate {
        /// Producer epoch associated with this result.
        epoch: u64,
        /// Producer sequence number associated with this result.
        seq: u64,
        /// Next offset snapshot (the duplicate did not advance it).
        next_offset: Offset,
        /// Whether the stream is closed.
        closed: bool,
    },
}
234
/// Persistence contract for Durable Streams server state.
///
/// Methods are intentionally synchronous. The server keeps async boundaries in
/// the HTTP and notification layers so storage implementations can focus on
/// atomicity, ordering, and recovery.
///
/// Implementations are expected to preserve these invariants:
///
/// - per-stream offsets are monotonic
/// - create, append, close, and delete are atomic at the stream level
/// - duplicate producer appends are idempotent
/// - reads observe a coherent snapshot
/// - expired streams behave as if they no longer exist
///
/// Implementations must also be thread-safe (`Send + Sync`), because the axum
/// server shares them across request handlers.
///
/// Error conditions are documented inline rather than in separate sections
/// to avoid repetitive documentation on internal trait methods.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Create a stream entry with immutable configuration.
    ///
    /// Returns whether the stream was newly created or already existed with
    /// matching configuration.
    ///
    /// Returns `Err(Error::ConfigMismatch)` if stream exists with different config.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;

    /// Append one message to an existing stream.
    ///
    /// Generates and returns the offset assigned to the appended message.
    /// Offsets must remain monotonically increasing within a stream.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;

    /// Append a batch of messages as one atomic operation.
    ///
    /// All messages are validated and committed as a single atomic operation.
    /// Either all messages are appended successfully, or none are.
    /// Returns the next offset (the offset that will be assigned to the
    /// next message appended after this batch).
    ///
    /// If `seq` is `Some`, validates lexicographic ordering against the
    /// stream's last seq and updates it on success.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    /// Returns `Err(Error::SeqOrderingViolation)` if seq <= last seq.
    /// Returns `Err(Error::MemoryLimitExceeded)` if batch would exceed limits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;

    /// Read from a stream starting at `from_offset`.
    ///
    /// `Offset::start()` reads from the beginning of the stream.
    /// `Offset::now()` positions the caller at the current tail and returns
    /// an empty catch-up result.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    /// Returns `Err(Error::InvalidOffset)` if offset is invalid.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;

    /// Delete a stream and all of its persisted data.
    ///
    /// Returns `Ok(())` on successful deletion.
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn delete(&self, name: &str) -> Result<()>;

    /// Return stream metadata without reading message bodies.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn head(&self, name: &str) -> Result<StreamMetadata>;

    /// Mark a stream closed so future appends are rejected.
    ///
    /// Prevents further appends.
    /// Returns `Ok(())` if already closed (idempotent).
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn close_stream(&self, name: &str) -> Result<()>;

    /// Append with idempotent producer sequencing.
    ///
    /// Validates producer epoch/sequence, appends data if accepted, and
    /// optionally closes the stream — all within a single lock hold.
    ///
    /// Returns `ProducerAppendResult::Accepted` for new data (200 OK).
    /// Returns `ProducerAppendResult::Duplicate` for already-seen seq (204).
    /// Returns `Err(EpochFenced)` if epoch < current (403).
    /// Returns `Err(SequenceGap)` if seq > expected (409).
    /// Returns `Err(InvalidProducerState)` if epoch bump with seq != 0 (400).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;

    /// Atomically create a stream, optionally seed it with data, and optionally close it.
    ///
    /// Creates the stream, appends `messages` (if non-empty), and closes
    /// (if `should_close`) — all before the entry becomes visible to other
    /// operations. If `commit_messages` fails (e.g. memory limit), the
    /// stream is never created.
    ///
    /// For idempotent recreates (`AlreadyExists`), the body and close
    /// flag are ignored and existing metadata is returned.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;

    /// Check if a stream exists.
    ///
    /// Per the trait invariants above, expired streams behave as if they
    /// no longer exist, so implementations are expected to report `false`
    /// for them.
    fn exists(&self, name: &str) -> bool;

    /// Subscribe to notifications for new data on a stream.
    ///
    /// Returns a broadcast receiver that fires when data is appended
    /// or the stream is closed. Returns `None` if the stream does not
    /// exist or has expired.
    ///
    /// The method itself is sync; the handler awaits on the receiver.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;

    /// Proactively remove all expired streams, returning the count deleted.
    ///
    /// By default, expired streams are only cleaned up lazily when accessed.
    /// This method sweeps all streams and deletes any that have expired,
    /// reclaiming their resources immediately.
    fn cleanup_expired_streams(&self) -> usize;

    /// List all non-expired streams with their metadata.
    ///
    /// Returns `(name, metadata)` pairs sorted by stream name.
    /// Expired streams are excluded from the listing.
    ///
    /// Returns `Err(Error::Storage)` if the underlying backend cannot be read.
    fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>>;

    /// Create a fork of an existing stream.
    ///
    /// The fork inherits messages from the source up to `fork_offset` (or the
    /// source tail if `None`). Subsequent appends go into the fork's own
    /// storage. The source's `ref_count` is incremented so it cannot be
    /// garbage-collected while forks exist.
    ///
    /// Returns `Err(StreamGone)` if the source is tombstoned.
    /// Returns `Err(ForkOffsetBeyondTail)` if `fork_offset` exceeds the source tail.
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult>;
}
403}