//! durable_streams_server/storage/mod.rs
//!
//! Storage layer module root: shared types, validation helpers, and the
//! `Storage` trait implemented by the backends in `acid`, `file`, `memory`.
1pub mod acid;
2pub mod file;
3pub mod memory;
4
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use bytes::Bytes;
9use chrono::{DateTime, Utc};
10use std::collections::HashMap;
11use tokio::sync::broadcast;
12
/// Stream configuration
///
/// Immutable configuration set at stream creation time.
///
/// Custom `PartialEq` (see `impl` below): when `ttl_seconds` is `Some`,
/// `expires_at` is derived from `Utc::now()` and will drift between
/// requests.  The comparison therefore ignores `expires_at` in that case.
/// When `ttl_seconds` is `None` and `expires_at` was set directly (via
/// `Expires-At` header), the parsed timestamp is stable so we compare it.
///
/// NOTE(review): the serde derives suggest this is persisted by the
/// file-backed storage (as `ProducerState` is) — if so, field names are
/// part of the on-disk format; confirm before renaming fields.
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content-Type header value (normalized, lowercase)
    pub content_type: String,
    /// Time-to-live in seconds (optional)
    pub ttl_seconds: Option<u64>,
    /// Absolute expiration time (optional; derived from TTL or set directly)
    pub expires_at: Option<DateTime<Utc>>,
    /// Whether the stream was created closed
    pub created_closed: bool,
}
33
34impl PartialEq for StreamConfig {
35    fn eq(&self, other: &Self) -> bool {
36        self.content_type == other.content_type
37            && self.ttl_seconds == other.ttl_seconds
38            && self.created_closed == other.created_closed
39            && if self.ttl_seconds.is_some() {
40                // TTL-derived expires_at drifts with Utc::now(); skip comparison
41                true
42            } else {
43                self.expires_at == other.expires_at
44            }
45    }
46}
47
48impl StreamConfig {
49    /// Create a new stream config with required fields
50    #[must_use]
51    pub fn new(content_type: String) -> Self {
52        Self {
53            content_type,
54            ttl_seconds: None,
55            expires_at: None,
56            created_closed: false,
57        }
58    }
59
60    /// Set TTL in seconds
61    #[must_use]
62    pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
63        self.ttl_seconds = Some(ttl_seconds);
64        self
65    }
66
67    /// Set absolute expiration time
68    #[must_use]
69    pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
70        self.expires_at = Some(expires_at);
71        self
72    }
73
74    /// Set created closed flag
75    #[must_use]
76    pub fn with_created_closed(mut self, created_closed: bool) -> Self {
77        self.created_closed = created_closed;
78        self
79    }
80}
81
/// A message in a stream
#[derive(Debug, Clone)]
pub struct Message {
    /// Message offset (unique identifier within stream)
    pub offset: Offset,
    /// Message data
    pub data: Bytes,
    /// Byte length (for memory tracking). Derived from `data.len()` in
    /// `Message::new`, saturating at `u64::MAX`.
    pub byte_len: u64,
}
92
93impl Message {
94    /// Create a new message
95    #[must_use]
96    pub fn new(offset: Offset, data: Bytes) -> Self {
97        let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
98        Self {
99            offset,
100            data,
101            byte_len,
102        }
103    }
104}
105
/// Read result from storage
///
/// Returned by [`Storage::read`]; `next_offset` lets the caller resume
/// where this read left off.
#[derive(Debug)]
pub struct ReadResult {
    /// Messages read
    pub messages: Vec<Bytes>,
    /// Next offset to read from (for resumption)
    pub next_offset: Offset,
    /// Whether we're at the end of the stream
    pub at_tail: bool,
    /// Whether the stream is closed
    pub closed: bool,
}
118
/// Stream metadata
///
/// Snapshot of a stream's state as returned by [`Storage::head`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Stream configuration
    pub config: StreamConfig,
    /// Next offset that will be assigned
    pub next_offset: Offset,
    /// Whether the stream is closed
    pub closed: bool,
    /// Total bytes stored in this stream
    pub total_bytes: u64,
    /// Number of messages in the stream
    pub message_count: u64,
    /// Creation timestamp
    pub created_at: DateTime<Utc>,
}
135
/// Result of create-stream operation.
///
/// Distinguishes a fresh create from an idempotent re-create so handlers
/// can pick the right HTTP status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// A new stream was created.
    Created,
    /// Stream already existed with matching config (idempotent create).
    AlreadyExists,
}
144
/// Result of atomic create-with-data operation.
///
/// Bundles creation status with a metadata snapshot taken under the
/// same lock hold so the handler never needs a separate `head()` call.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Next offset (for `Stream-Next-Offset` response header).
    pub next_offset: Offset,
    /// Whether the stream is closed (for `Stream-Closed` response header).
    pub closed: bool,
}
158
/// Result of an append with producer sequencing.
///
/// Includes a snapshot of stream state taken atomically with the operation
/// so handlers never need a separate `head()` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// New data accepted (200 OK)
    Accepted {
        /// Producer epoch acknowledged by this append.
        epoch: u64,
        /// Sequence number acknowledged by this append.
        seq: u64,
        /// Next offset the stream will assign.
        next_offset: Offset,
        /// Whether the stream is closed after this operation.
        closed: bool,
    },
    /// Duplicate detected, data already persisted (204 No Content)
    Duplicate {
        /// Epoch currently recorded for this producer.
        epoch: u64,
        /// Last sequence number already persisted for this producer.
        seq: u64,
        /// Next offset the stream will assign.
        next_offset: Offset,
        /// Whether the stream is closed.
        closed: bool,
    },
}
180
// ── Shared constants and validation helpers ─────────────────────────
//
// Extracted from InMemoryStorage/FileStorage to avoid duplication.
// Both implementations delegate to these for content-type, seq,
// producer, and expiry validation.

/// Duration after which stale producer state is cleaned up (7 days).
/// `i64` because it feeds `chrono::TimeDelta::try_seconds` in
/// `cleanup_stale_producers`.
pub(crate) const PRODUCER_STATE_TTL_SECS: i64 = 7 * 24 * 60 * 60;

/// Broadcast channel capacity for long-poll/SSE notifications.
/// Small because notifications are hints (no payload), not data delivery.
pub(crate) const NOTIFY_CHANNEL_CAPACITY: usize = 16;
193
/// Per-producer state tracked within a stream.
///
/// Shared between storage implementations. Includes serde derives
/// for the file-backed storage which persists this to disk (so field
/// names/order may be part of the on-disk format — do not reorder
/// without checking the serialization format).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProducerState {
    /// Current epoch; requests with a lower epoch are fenced (403).
    pub epoch: u64,
    /// Highest sequence number accepted in the current epoch.
    pub last_seq: u64,
    /// Last-touch timestamp, used by `cleanup_stale_producers`.
    pub updated_at: DateTime<Utc>,
}
204
/// Outcome of producer validation before any mutation.
///
/// Produced by [`check_producer`]; errors are returned separately via
/// `Result`.
pub(crate) enum ProducerCheck {
    /// Request is valid; proceed with append.
    Accept,
    /// Request is a duplicate; return idempotent success.
    Duplicate {
        /// Epoch currently recorded for the producer.
        epoch: u64,
        /// Last sequence number already persisted.
        seq: u64,
    },
}
212
213/// Check if a stream has expired based on its configuration.
214pub(crate) fn is_stream_expired(config: &StreamConfig) -> bool {
215    config
216        .expires_at
217        .is_some_and(|expires_at| Utc::now() >= expires_at)
218}
219
220/// Validate content-type matches the stream's configured type (case-insensitive).
221pub(crate) fn validate_content_type(stream_ct: &str, request_ct: &str) -> Result<()> {
222    if !request_ct.eq_ignore_ascii_case(stream_ct) {
223        return Err(Error::ContentTypeMismatch {
224            expected: stream_ct.to_string(),
225            actual: request_ct.to_string(),
226        });
227    }
228    Ok(())
229}
230
231/// Validate Stream-Seq ordering and return the pending value to commit.
232///
233/// Returns `Err(SeqOrderingViolation)` if the new seq is not strictly
234/// greater than the last seq (lexicographic comparison).
235pub(crate) fn validate_seq(
236    last_seq: Option<&str>,
237    new_seq: Option<&str>,
238) -> Result<Option<String>> {
239    if let Some(new) = new_seq {
240        if let Some(last) = last_seq
241            && new <= last
242        {
243            return Err(Error::SeqOrderingViolation {
244                last: last.to_string(),
245                received: new.to_string(),
246            });
247        }
248        return Ok(Some(new.to_string()));
249    }
250    Ok(None)
251}
252
253/// Remove producer state entries older than `PRODUCER_STATE_TTL_SECS`.
254pub(crate) fn cleanup_stale_producers(producers: &mut HashMap<String, ProducerState>) {
255    let cutoff = Utc::now()
256        - chrono::TimeDelta::try_seconds(PRODUCER_STATE_TTL_SECS)
257            .expect("7 days fits in TimeDelta");
258    producers.retain(|_, state| state.updated_at > cutoff);
259}
260
261/// Validate producer epoch/sequence against existing state.
262///
263/// Implements the standard validation order:
264///   1. Epoch fencing (403)
265///   2. Duplicate detection (204) — before closed check so retries work
266///   3. Closed check (409) — blocks new sequences on closed streams
267///   4. Gap / epoch-bump validation
268pub(crate) fn check_producer(
269    existing: Option<&ProducerState>,
270    producer: &ProducerHeaders,
271    stream_closed: bool,
272) -> Result<ProducerCheck> {
273    if let Some(state) = existing {
274        if producer.epoch < state.epoch {
275            return Err(Error::EpochFenced {
276                current: state.epoch,
277                received: producer.epoch,
278            });
279        }
280
281        if producer.epoch == state.epoch && producer.seq <= state.last_seq {
282            return Ok(ProducerCheck::Duplicate {
283                epoch: state.epoch,
284                seq: state.last_seq,
285            });
286        }
287
288        // Not a duplicate — if stream is closed, reject
289        if stream_closed {
290            return Err(Error::StreamClosed);
291        }
292
293        if producer.epoch > state.epoch {
294            if producer.seq != 0 {
295                return Err(Error::InvalidProducerState(
296                    "new epoch must start at seq 0".to_string(),
297                ));
298            }
299        } else if producer.seq > state.last_seq + 1 {
300            return Err(Error::SequenceGap {
301                expected: state.last_seq + 1,
302                actual: producer.seq,
303            });
304        }
305    } else {
306        // New producer
307        if stream_closed {
308            return Err(Error::StreamClosed);
309        }
310        if producer.seq != 0 {
311            return Err(Error::SequenceGap {
312                expected: 0,
313                actual: producer.seq,
314            });
315        }
316    }
317    Ok(ProducerCheck::Accept)
318}
319
/// Storage trait for stream persistence
///
/// Methods are intentionally sync (not async) to keep the core logic simple.
/// Async boundaries are at the handler layer and notification layer.
///
/// Implementations must be thread-safe (Send + Sync).
///
/// Error conditions are documented inline rather than in separate sections
/// to avoid repetitive documentation on internal trait methods.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Create a new stream
    ///
    /// Returns whether the stream was newly created or already existed.
    /// Idempotent: creating an existing stream with an equal config (see
    /// `StreamConfig`'s `PartialEq`) yields `AlreadyExists`, not an error.
    ///
    /// Returns `Err(Error::ConfigMismatch)` if stream exists with different config.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;

    /// Append data to a stream
    ///
    /// Generates and returns the offset for this message.
    /// Offsets must be monotonically increasing.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;

    /// Append multiple messages atomically
    ///
    /// All messages are validated and committed as a single atomic operation.
    /// Either all messages are appended successfully, or none are.
    /// Returns the next offset (the offset that will be assigned to the
    /// next message appended after this batch).
    ///
    /// If `seq` is `Some`, validates lexicographic ordering against the
    /// stream's last seq and updates it on success (see `validate_seq`).
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    /// Returns `Err(Error::SeqOrderingViolation)` if seq <= last seq.
    /// Returns `Err(Error::MemoryLimitExceeded)` if batch would exceed limits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;

    /// Read messages from a stream starting at offset
    ///
    /// If `from_offset` is `Offset::start()`, reads from beginning.
    /// If `from_offset` is `Offset::now()`, returns empty result at tail.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    /// Returns `Err(Error::InvalidOffset)` if offset is invalid.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;

    /// Delete a stream
    ///
    /// Not idempotent: a second delete of the same name reports `NotFound`.
    ///
    /// Returns `Ok(())` on successful deletion.
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn delete(&self, name: &str) -> Result<()>;

    /// Get stream metadata
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn head(&self, name: &str) -> Result<StreamMetadata>;

    /// Close a stream
    ///
    /// Prevents further appends.
    /// Returns `Ok(())` if already closed (idempotent).
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn close_stream(&self, name: &str) -> Result<()>;

    /// Append messages with producer sequencing (atomic validation + append)
    ///
    /// Validates producer epoch/sequence, appends data if accepted, and
    /// optionally closes the stream — all within a single lock hold.
    /// Validation semantics are shared via `check_producer`.
    ///
    /// Returns `ProducerAppendResult::Accepted` for new data (200 OK).
    /// Returns `ProducerAppendResult::Duplicate` for already-seen seq (204).
    /// Returns `Err(EpochFenced)` if epoch < current (403).
    /// Returns `Err(SequenceGap)` if seq > expected (409).
    /// Returns `Err(InvalidProducerState)` if epoch bump with seq != 0 (400).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;

    /// Atomically create a stream with optional initial data and close.
    ///
    /// Creates the stream, appends `messages` (if non-empty), and closes
    /// (if `should_close`) — all before the entry becomes visible to other
    /// operations. If `commit_messages` fails (e.g. memory limit), the
    /// stream is never created.
    ///
    /// For idempotent recreates (`AlreadyExists`), the body and close
    /// flag are ignored and existing metadata is returned.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;

    /// Check if a stream exists
    ///
    /// NOTE(review): whether an expired-but-not-yet-cleaned stream counts
    /// as existing is implementation-defined here — confirm against the
    /// backends if callers depend on it.
    fn exists(&self, name: &str) -> bool;

    /// Subscribe to notifications for new data on a stream.
    ///
    /// Returns a broadcast receiver that fires when data is appended
    /// or the stream is closed. Returns `None` if the stream does not
    /// exist or has expired.
    ///
    /// The method itself is sync; the handler awaits on the receiver.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;
}
444}