// durable_streams_server/storage/mod.rs
1//! Storage backends and the persistence contract used by the server.
2//!
3//! [`Storage`] is the central abstraction. The built-in implementations are:
4//!
5//! - [`memory::InMemoryStorage`] for ephemeral development and tests
6//! - [`file::FileStorage`] for append-log persistence on the local filesystem
7//! - [`acid::AcidStorage`] for crash-resilient redb-backed persistence
8
9pub mod acid;
10pub mod file;
11pub(crate) mod fork;
12pub mod memory;
13pub(crate) mod shared;
14
15use crate::protocol::error::Result;
16use crate::protocol::offset::Offset;
17use crate::protocol::producer::ProducerHeaders;
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use tokio::sync::broadcast;
21
22// Re-export shared items so existing `super::` paths in backends still work.
23pub(crate) use shared::{
24 NOTIFY_CHANNEL_CAPACITY, ProducerCheck, ProducerState, check_producer, cleanup_stale_producers,
25 is_stream_expired, validate_content_type, validate_seq,
26};
27
/// Immutable stream configuration captured at create time.
///
/// This is the durable metadata returned by `HEAD` and used for idempotent
/// create checks. The server treats it as part of the stream identity: recreating
/// an existing stream with a different configuration is a conflict.
///
/// Custom `PartialEq`: when `ttl_seconds` is `Some`, `expires_at` is
/// derived from `Utc::now()` and will drift between requests. The
/// comparison therefore ignores `expires_at` in that case. When
/// `ttl_seconds` is `None` and `expires_at` was set directly (via
/// `Expires-At` header), the parsed timestamp is stable so we compare it.
///
/// `Eq` is derived on top of the manual `PartialEq`; the comparison stays an
/// equivalence relation because it only ever skips a field symmetrically
/// (both sides must agree on `ttl_seconds` before `expires_at` is skipped).
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content-Type header value (normalized, lowercase). Appends with a
    /// different content type are rejected with `ContentTypeMismatch`.
    pub content_type: String,
    /// Relative time-to-live in seconds from creation (optional).
    pub ttl_seconds: Option<u64>,
    /// Absolute expiration time (optional): either derived from
    /// `ttl_seconds` or set directly via the `Expires-At` header.
    pub expires_at: Option<DateTime<Utc>>,
    /// Whether the stream was created in the closed state.
    pub created_closed: bool,
}
50
51impl PartialEq for StreamConfig {
52 fn eq(&self, other: &Self) -> bool {
53 self.content_type == other.content_type
54 && self.ttl_seconds == other.ttl_seconds
55 && self.created_closed == other.created_closed
56 && if self.ttl_seconds.is_some() {
57 // TTL-derived expires_at drifts with Utc::now(); skip comparison
58 true
59 } else {
60 self.expires_at == other.expires_at
61 }
62 }
63}
64
65impl StreamConfig {
66 /// Create a config with a normalized content type and default flags.
67 #[must_use]
68 pub fn new(content_type: String) -> Self {
69 Self {
70 content_type,
71 ttl_seconds: None,
72 expires_at: None,
73 created_closed: false,
74 }
75 }
76
77 /// Set a relative time-to-live in seconds.
78 #[must_use]
79 pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
80 self.ttl_seconds = Some(ttl_seconds);
81 self
82 }
83
84 /// Set an absolute expiration timestamp.
85 #[must_use]
86 pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
87 self.expires_at = Some(expires_at);
88 self
89 }
90
91 /// Record that the stream should be considered created in the closed state.
92 #[must_use]
93 pub fn with_created_closed(mut self, created_closed: bool) -> Self {
94 self.created_closed = created_closed;
95 self
96 }
97}
98
/// Fork lineage metadata for a stream created via `create_fork`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ForkInfo {
    /// Name of the source stream this fork was created from.
    pub source_name: String,
    /// Offset at which this fork diverged from the source.
    ///
    /// Serialized as a string via the protocol offset helpers.
    #[serde(
        serialize_with = "crate::protocol::offset::serialize_offset",
        deserialize_with = "crate::protocol::offset::deserialize_offset"
    )]
    pub fork_offset: Offset,
}
111
/// Lifecycle state of a stream (active or soft-deleted tombstone).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum StreamState {
    /// Normal operational state (the default for new streams).
    #[default]
    Active,
    /// Soft-deleted: still held in memory for fork ref-count bookkeeping
    /// but invisible to regular operations.
    Tombstone,
}
122
/// Stored message plus bookkeeping metadata used by storage backends.
#[derive(Debug, Clone)]
pub struct Message {
    /// Message offset (unique identifier within its stream).
    pub offset: Offset,
    /// Message payload bytes.
    pub data: Bytes,
    /// Byte length of `data`, cached for memory-usage tracking.
    pub byte_len: u64,
}
133
134impl Message {
135 /// Create a stored message and derive its tracked byte length.
136 #[must_use]
137 pub fn new(offset: Offset, data: Bytes) -> Self {
138 let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
139 Self {
140 offset,
141 data,
142 byte_len,
143 }
144 }
145}
146
/// Snapshot returned by [`Storage::read`].
///
/// Handlers map this directly into catch-up, long-poll, and SSE responses.
#[derive(Debug)]
pub struct ReadResult {
    /// Messages read, in offset order.
    pub messages: Vec<Bytes>,
    /// Next offset to read from; callers pass this back to resume.
    pub next_offset: Offset,
    /// Whether this read reached the current end (tail) of the stream.
    pub at_tail: bool,
    /// Whether the stream is closed (no further appends will arrive).
    pub closed: bool,
}
161
/// Stream-level metadata returned by [`Storage::head`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Stream configuration captured at create time.
    pub config: StreamConfig,
    /// Next offset that will be assigned on append.
    pub next_offset: Offset,
    /// Whether the stream is closed to further appends.
    pub closed: bool,
    /// Total bytes stored in this stream.
    pub total_bytes: u64,
    /// Number of messages in the stream.
    pub message_count: u64,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Last modification timestamp (append, close, producer write);
    /// `None` if the stream has not been modified since creation.
    pub updated_at: Option<DateTime<Utc>>,
}
180
/// Outcome of [`Storage::create_stream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// A new stream was created.
    Created,
    /// Stream already existed with matching config (idempotent create).
    AlreadyExists,
}
189
/// Immutable snapshot of a fork create request after source-derived fields
/// have been resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ForkCreateSpec {
    /// Name of the source stream being forked.
    pub source_name: String,
    /// Resolved offset at which the fork diverges from the source.
    pub fork_offset: Offset,
    /// Configuration for the new fork stream.
    pub config: StreamConfig,
}
198
/// Outcome of [`Storage::create_stream_with_data`].
///
/// Bundles creation status with a metadata snapshot taken under the
/// same lock hold so the handler never needs a separate `head()` call.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Next offset (for the `Stream-Next-Offset` response header).
    pub next_offset: Offset,
    /// Whether the stream is closed (for the `Stream-Closed` response header).
    pub closed: bool,
}
212
/// Outcome of [`Storage::append_with_producer`].
///
/// Includes a snapshot of stream state taken atomically with the operation
/// so handlers never need a separate `head()` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// New data accepted (200 OK).
    Accepted {
        /// Producer epoch under which the append was accepted.
        epoch: u64,
        /// Producer sequence number acknowledged by this append.
        seq: u64,
        /// Next offset that will be assigned after this append.
        next_offset: Offset,
        /// Whether the stream is closed after the operation
        /// (e.g. when `should_close` was requested).
        closed: bool,
    },
    /// Duplicate detected, data already persisted (204 No Content).
    Duplicate {
        /// Current producer epoch.
        epoch: u64,
        /// Producer sequence number recognized as already seen.
        seq: u64,
        /// Next-offset snapshot taken with the duplicate check.
        next_offset: Offset,
        /// Whether the stream is closed.
        closed: bool,
    },
}
234
/// Persistence contract for Durable Streams server state.
///
/// Methods are intentionally synchronous. The server keeps async boundaries in
/// the HTTP and notification layers so storage implementations can focus on
/// atomicity, ordering, and recovery.
///
/// Implementations are expected to preserve these invariants:
///
/// - per-stream offsets are monotonic
/// - create, append, close, and delete are atomic at the stream level
/// - duplicate producer appends are idempotent
/// - reads observe a coherent snapshot
/// - expired streams behave as if they no longer exist
///
/// Implementations must also be thread-safe (`Send + Sync`), because the axum
/// server shares them across request handlers.
///
/// Error conditions are documented inline rather than in separate sections
/// to avoid repetitive documentation on internal trait methods.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Create a stream entry with immutable configuration.
    ///
    /// Returns whether the stream was newly created or already existed with
    /// matching configuration.
    ///
    /// Returns `Err(Error::ConfigMismatch)` if stream exists with different config.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;

    /// Append one message to an existing stream.
    ///
    /// Generates and returns the offset assigned to the appended message.
    /// Offsets must remain monotonically increasing within a stream.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;

    /// Append a batch of messages as one atomic operation.
    ///
    /// All messages are validated and committed as a single atomic operation.
    /// Either all messages are appended successfully, or none are.
    /// Returns the next offset (the offset that will be assigned to the
    /// next message appended after this batch).
    ///
    /// If `seq` is `Some`, validates lexicographic ordering against the
    /// stream's last seq and updates it on success.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    /// Returns `Err(Error::SeqOrderingViolation)` if seq <= last seq.
    /// Returns `Err(Error::MemoryLimitExceeded)` if batch would exceed limits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;

    /// Read from a stream starting at `from_offset`.
    ///
    /// `Offset::start()` reads from the beginning of the stream.
    /// `Offset::now()` positions the caller at the current tail and returns
    /// an empty catch-up result.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    /// Returns `Err(Error::InvalidOffset)` if offset is invalid.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;

    /// Delete a stream and all of its persisted data.
    ///
    /// Returns `Ok(())` on successful deletion.
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn delete(&self, name: &str) -> Result<()>;

    /// Return stream metadata without reading message bodies.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn head(&self, name: &str) -> Result<StreamMetadata>;

    /// Mark a stream closed so future appends are rejected.
    ///
    /// Prevents further appends.
    /// Returns `Ok(())` if already closed (idempotent).
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn close_stream(&self, name: &str) -> Result<()>;

    /// Append with idempotent producer sequencing.
    ///
    /// Validates producer epoch/sequence, appends data if accepted, and
    /// optionally closes the stream — all within a single lock hold.
    ///
    /// Returns `ProducerAppendResult::Accepted` for new data (200 OK).
    /// Returns `ProducerAppendResult::Duplicate` for already-seen seq (204).
    /// Returns `Err(EpochFenced)` if epoch < current (403).
    /// Returns `Err(SequenceGap)` if seq > expected (409).
    /// Returns `Err(InvalidProducerState)` if epoch bump with seq != 0 (400).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;

    /// Atomically create a stream, optionally seed it with data, and optionally close it.
    ///
    /// Creates the stream, appends `messages` (if non-empty), and closes
    /// (if `should_close`) — all before the entry becomes visible to other
    /// operations. If `commit_messages` fails (e.g. memory limit), the
    /// stream is never created.
    ///
    /// For idempotent recreates (`AlreadyExists`), the body and close
    /// flag are ignored and existing metadata is returned.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;

    /// Check if a stream exists.
    ///
    /// Per the trait invariants above, expired streams behave as if they
    /// no longer exist.
    fn exists(&self, name: &str) -> bool;

    /// Subscribe to notifications for new data on a stream.
    ///
    /// Returns a broadcast receiver that fires when data is appended
    /// or the stream is closed. Returns `None` if the stream does not
    /// exist or has expired.
    ///
    /// The method itself is sync; the handler awaits on the receiver.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;

    /// Proactively remove all expired streams, returning the count deleted.
    ///
    /// By default, expired streams are only cleaned up lazily when accessed.
    /// This method sweeps all streams and deletes any that have expired,
    /// reclaiming their resources immediately.
    fn cleanup_expired_streams(&self) -> usize;

    /// List all non-expired streams with their metadata.
    ///
    /// Returns `(name, metadata)` pairs sorted by stream name.
    /// Expired streams are excluded from the listing.
    ///
    /// Returns `Err(Error::Storage)` if the underlying backend cannot be read.
    fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>>;

    /// Create a fork of an existing stream.
    ///
    /// The fork inherits messages from the source up to `fork_offset` (or the
    /// source tail if `None`). Subsequent appends go into the fork's own
    /// storage. The source's `ref_count` is incremented so it cannot be
    /// garbage-collected while forks exist.
    ///
    /// Returns `Err(StreamGone)` if the source is tombstoned.
    /// Returns `Err(ForkOffsetBeyondTail)` if `fork_offset` exceeds the source tail.
    fn create_fork(
        &self,
        name: &str,
        source_name: &str,
        fork_offset: Option<&Offset>,
        config: StreamConfig,
    ) -> Result<CreateStreamResult>;
}
403}