// durable_streams_server/storage/mod.rs
1pub mod acid;
2pub mod file;
3pub mod memory;
4
5use crate::protocol::error::{Error, Result};
6use crate::protocol::offset::Offset;
7use crate::protocol::producer::ProducerHeaders;
8use bytes::Bytes;
9use chrono::{DateTime, Utc};
10use std::collections::HashMap;
11use tokio::sync::broadcast;
12
/// Stream configuration
///
/// Immutable configuration set at stream creation time.
///
/// Custom `PartialEq`: when `ttl_seconds` is `Some`, `expires_at` is
/// derived from `Utc::now()` and will drift between requests. The
/// comparison therefore ignores `expires_at` in that case. When
/// `ttl_seconds` is `None` and `expires_at` was set directly (via
/// `Expires-At` header), the parsed timestamp is stable so we compare it.
///
/// NOTE(review): serde derives presumably exist for the file-backed
/// storage (as documented on `ProducerState`) — confirm before changing
/// field names or order, which would alter the persisted representation.
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content-Type header value (normalized, lowercase)
    pub content_type: String,
    /// Time-to-live in seconds (optional)
    pub ttl_seconds: Option<u64>,
    /// Absolute expiration time (optional); see `is_stream_expired`
    pub expires_at: Option<DateTime<Utc>>,
    /// Whether the stream was created closed
    pub created_closed: bool,
}
33
34impl PartialEq for StreamConfig {
35 fn eq(&self, other: &Self) -> bool {
36 self.content_type == other.content_type
37 && self.ttl_seconds == other.ttl_seconds
38 && self.created_closed == other.created_closed
39 && if self.ttl_seconds.is_some() {
40 // TTL-derived expires_at drifts with Utc::now(); skip comparison
41 true
42 } else {
43 self.expires_at == other.expires_at
44 }
45 }
46}
47
48impl StreamConfig {
49 /// Create a new stream config with required fields
50 #[must_use]
51 pub fn new(content_type: String) -> Self {
52 Self {
53 content_type,
54 ttl_seconds: None,
55 expires_at: None,
56 created_closed: false,
57 }
58 }
59
60 /// Set TTL in seconds
61 #[must_use]
62 pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
63 self.ttl_seconds = Some(ttl_seconds);
64 self
65 }
66
67 /// Set absolute expiration time
68 #[must_use]
69 pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
70 self.expires_at = Some(expires_at);
71 self
72 }
73
74 /// Set created closed flag
75 #[must_use]
76 pub fn with_created_closed(mut self, created_closed: bool) -> Self {
77 self.created_closed = created_closed;
78 self
79 }
80}
81
/// A message in a stream
#[derive(Debug, Clone)]
pub struct Message {
    /// Message offset (unique identifier within stream)
    pub offset: Offset,
    /// Message data
    pub data: Bytes,
    /// Byte length (for memory tracking); cached from `data.len()`
    /// at construction time by `Message::new`
    pub byte_len: u64,
}
92
93impl Message {
94 /// Create a new message
95 #[must_use]
96 pub fn new(offset: Offset, data: Bytes) -> Self {
97 let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
98 Self {
99 offset,
100 data,
101 byte_len,
102 }
103 }
104}
105
/// Read result from storage
#[derive(Debug)]
pub struct ReadResult {
    /// Message payloads read (raw bytes only; per-message offsets are not
    /// included — callers resume from `next_offset`)
    pub messages: Vec<Bytes>,
    /// Next offset to read from (for resumption)
    pub next_offset: Offset,
    /// Whether we're at the end of the stream
    pub at_tail: bool,
    /// Whether the stream is closed
    pub closed: bool,
}
118
/// Stream metadata
///
/// Snapshot of a stream's state as returned by `Storage::head`.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Stream configuration
    pub config: StreamConfig,
    /// Next offset that will be assigned
    pub next_offset: Offset,
    /// Whether the stream is closed
    pub closed: bool,
    /// Total bytes stored in this stream
    pub total_bytes: u64,
    /// Number of messages in the stream
    pub message_count: u64,
    /// Creation timestamp
    pub created_at: DateTime<Utc>,
}
135
/// Result of create-stream operation.
///
/// Distinguishes a fresh create from an idempotent re-create so handlers
/// can choose the appropriate response status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// A new stream was created.
    Created,
    /// Stream already existed with matching config (idempotent create).
    AlreadyExists,
}
144
/// Result of atomic create-with-data operation.
///
/// Bundles creation status with a metadata snapshot taken under the
/// same lock hold so the handler never needs a separate `head()` call.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Next offset (for `Stream-Next-Offset` response header).
    pub next_offset: Offset,
    /// Whether the stream is closed (for `Stream-Closed` response header).
    pub closed: bool,
}
158
/// Result of an append with producer sequencing.
///
/// Includes a snapshot of stream state taken atomically with the operation
/// so handlers never need a separate `head()` call.
///
/// NOTE(review): for `Duplicate`, implementations presumably echo the
/// stored producer state (cf. `ProducerCheck::Duplicate`, which carries
/// `state.epoch` / `state.last_seq`) — confirm against the impls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// New data accepted (200 OK)
    Accepted {
        epoch: u64,
        seq: u64,
        next_offset: Offset,
        closed: bool,
    },
    /// Duplicate detected, data already persisted (204 No Content)
    Duplicate {
        epoch: u64,
        seq: u64,
        next_offset: Offset,
        closed: bool,
    },
}
180
181// ── Shared constants and validation helpers ─────────────────────────
182//
183// Extracted from InMemoryStorage/FileStorage to avoid duplication.
184// Both implementations delegate to these for content-type, seq,
185// producer, and expiry validation.
186
/// Duration after which stale producer state is cleaned up (7 days).
///
/// Typed `i64` (not `u64`) because `chrono::TimeDelta::try_seconds`
/// takes `i64` (see `cleanup_stale_producers`).
pub(crate) const PRODUCER_STATE_TTL_SECS: i64 = 7 * 24 * 60 * 60;

/// Broadcast channel capacity for long-poll/SSE notifications.
/// Small because notifications are hints (no payload), not data delivery.
pub(crate) const NOTIFY_CHANNEL_CAPACITY: usize = 16;
193
/// Per-producer state tracked within a stream.
///
/// Shared between storage implementations. Includes serde derives
/// for the file-backed storage which persists this to disk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProducerState {
    /// Producer's current epoch; lower epochs are fenced (see `check_producer`).
    pub epoch: u64,
    /// Last committed sequence number within the current epoch.
    pub last_seq: u64,
    /// Last update time; entries older than `PRODUCER_STATE_TTL_SECS`
    /// are dropped by `cleanup_stale_producers`.
    pub updated_at: DateTime<Utc>,
}
204
/// Outcome of producer validation before any mutation.
pub(crate) enum ProducerCheck {
    /// Request is valid; proceed with append.
    Accept,
    /// Request is a duplicate; return idempotent success.
    /// Carries the stored state's epoch and last committed seq.
    Duplicate { epoch: u64, seq: u64 },
}
212
213/// Check if a stream has expired based on its configuration.
214pub(crate) fn is_stream_expired(config: &StreamConfig) -> bool {
215 config
216 .expires_at
217 .is_some_and(|expires_at| Utc::now() >= expires_at)
218}
219
220/// Validate content-type matches the stream's configured type (case-insensitive).
221pub(crate) fn validate_content_type(stream_ct: &str, request_ct: &str) -> Result<()> {
222 if !request_ct.eq_ignore_ascii_case(stream_ct) {
223 return Err(Error::ContentTypeMismatch {
224 expected: stream_ct.to_string(),
225 actual: request_ct.to_string(),
226 });
227 }
228 Ok(())
229}
230
231/// Validate Stream-Seq ordering and return the pending value to commit.
232///
233/// Returns `Err(SeqOrderingViolation)` if the new seq is not strictly
234/// greater than the last seq (lexicographic comparison).
235pub(crate) fn validate_seq(
236 last_seq: Option<&str>,
237 new_seq: Option<&str>,
238) -> Result<Option<String>> {
239 if let Some(new) = new_seq {
240 if let Some(last) = last_seq
241 && new <= last
242 {
243 return Err(Error::SeqOrderingViolation {
244 last: last.to_string(),
245 received: new.to_string(),
246 });
247 }
248 return Ok(Some(new.to_string()));
249 }
250 Ok(None)
251}
252
253/// Remove producer state entries older than `PRODUCER_STATE_TTL_SECS`.
254pub(crate) fn cleanup_stale_producers(producers: &mut HashMap<String, ProducerState>) {
255 let cutoff = Utc::now()
256 - chrono::TimeDelta::try_seconds(PRODUCER_STATE_TTL_SECS)
257 .expect("7 days fits in TimeDelta");
258 producers.retain(|_, state| state.updated_at > cutoff);
259}
260
261/// Validate producer epoch/sequence against existing state.
262///
263/// Implements the standard validation order:
264/// 1. Epoch fencing (403)
265/// 2. Duplicate detection (204) — before closed check so retries work
266/// 3. Closed check (409) — blocks new sequences on closed streams
267/// 4. Gap / epoch-bump validation
268pub(crate) fn check_producer(
269 existing: Option<&ProducerState>,
270 producer: &ProducerHeaders,
271 stream_closed: bool,
272) -> Result<ProducerCheck> {
273 if let Some(state) = existing {
274 if producer.epoch < state.epoch {
275 return Err(Error::EpochFenced {
276 current: state.epoch,
277 received: producer.epoch,
278 });
279 }
280
281 if producer.epoch == state.epoch && producer.seq <= state.last_seq {
282 return Ok(ProducerCheck::Duplicate {
283 epoch: state.epoch,
284 seq: state.last_seq,
285 });
286 }
287
288 // Not a duplicate — if stream is closed, reject
289 if stream_closed {
290 return Err(Error::StreamClosed);
291 }
292
293 if producer.epoch > state.epoch {
294 if producer.seq != 0 {
295 return Err(Error::InvalidProducerState(
296 "new epoch must start at seq 0".to_string(),
297 ));
298 }
299 } else if producer.seq > state.last_seq + 1 {
300 return Err(Error::SequenceGap {
301 expected: state.last_seq + 1,
302 actual: producer.seq,
303 });
304 }
305 } else {
306 // New producer
307 if stream_closed {
308 return Err(Error::StreamClosed);
309 }
310 if producer.seq != 0 {
311 return Err(Error::SequenceGap {
312 expected: 0,
313 actual: producer.seq,
314 });
315 }
316 }
317 Ok(ProducerCheck::Accept)
318}
319
/// Storage trait for stream persistence
///
/// Methods are intentionally sync (not async) to keep the core logic simple.
/// Async boundaries are at the handler layer and notification layer.
///
/// Implementations must be thread-safe (Send + Sync).
///
/// Error conditions are documented inline rather than in separate sections
/// to avoid repetitive documentation on internal trait methods.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Create a new stream
    ///
    /// Returns whether the stream was newly created or already existed.
    ///
    /// Returns `Err(Error::ConfigMismatch)` if stream exists with different config.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;

    /// Append data to a stream
    ///
    /// Generates and returns the offset for this message.
    /// Offsets must be monotonically increasing.
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;

    /// Append multiple messages atomically
    ///
    /// All messages are validated and committed as a single atomic operation.
    /// Either all messages are appended successfully, or none are.
    /// Returns the next offset (the offset that will be assigned to the
    /// next message appended after this batch).
    ///
    /// If `seq` is `Some`, validates lexicographic ordering against the
    /// stream's last seq and updates it on success (see `validate_seq`).
    ///
    /// NOTE(review): behavior for an empty `messages` vec is unspecified
    /// here — confirm implementations agree (no-op vs. error).
    ///
    /// Returns `Err(Error::StreamClosed)` if stream is closed.
    /// Returns `Err(Error::ContentTypeMismatch)` if content type doesn't match.
    /// Returns `Err(Error::SeqOrderingViolation)` if seq <= last seq.
    /// Returns `Err(Error::MemoryLimitExceeded)` if batch would exceed limits.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;

    /// Read messages from a stream starting at offset
    ///
    /// If `from_offset` is `Offset::start()`, reads from beginning.
    /// If `from_offset` is `Offset::now()`, returns empty result at tail.
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    /// Returns `Err(Error::InvalidOffset)` if offset is invalid.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;

    /// Delete a stream
    ///
    /// Returns `Ok(())` on successful deletion.
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn delete(&self, name: &str) -> Result<()>;

    /// Get stream metadata
    ///
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn head(&self, name: &str) -> Result<StreamMetadata>;

    /// Close a stream
    ///
    /// Prevents further appends.
    /// Returns `Ok(())` if already closed (idempotent).
    /// Returns `Err(Error::NotFound)` if stream doesn't exist.
    fn close_stream(&self, name: &str) -> Result<()>;

    /// Append messages with producer sequencing (atomic validation + append)
    ///
    /// Validates producer epoch/sequence, appends data if accepted, and
    /// optionally closes the stream — all within a single lock hold.
    ///
    /// Returns `ProducerAppendResult::Accepted` for new data (200 OK).
    /// Returns `ProducerAppendResult::Duplicate` for already-seen seq (204).
    /// Returns `Err(EpochFenced)` if epoch < current (403).
    /// Returns `Err(SequenceGap)` if seq > expected (409).
    /// Returns `Err(InvalidProducerState)` if epoch bump with seq != 0 (400).
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;

    /// Atomically create a stream with optional initial data and close.
    ///
    /// Creates the stream, appends `messages` (if non-empty), and closes
    /// (if `should_close`) — all before the entry becomes visible to other
    /// operations. If `commit_messages` fails (e.g. memory limit), the
    /// stream is never created.
    ///
    /// For idempotent recreates (`AlreadyExists`), the body and close
    /// flag are ignored and existing metadata is returned.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;

    /// Check if a stream exists
    ///
    /// NOTE(review): unlike `subscribe`, nothing here specifies how
    /// expired streams are reported — confirm implementations agree.
    fn exists(&self, name: &str) -> bool;

    /// Subscribe to notifications for new data on a stream.
    ///
    /// Returns a broadcast receiver that fires when data is appended
    /// or the stream is closed. Returns `None` if the stream does not
    /// exist or has expired.
    ///
    /// The method itself is sync; the handler awaits on the receiver.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;
}
444}