Skip to main content

durable_streams_server/handlers/
get.rs

1use crate::config::{LongPollTimeout, SseReconnectInterval};
2use crate::protocol::cursor;
3use crate::protocol::error::{Error, Result};
4use crate::protocol::headers::names;
5use crate::protocol::json_mode;
6use crate::protocol::offset::Offset;
7use crate::protocol::sse::{self, ControlPayload};
8use crate::storage::{ReadResult, Storage};
9use axum::{
10    Extension,
11    body::Body,
12    extract::{Path, Query, State},
13    http::{HeaderMap, StatusCode},
14    response::{IntoResponse, Response},
15};
16use bytes::{BufMut, BytesMut};
17use serde::Deserialize;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::time::Instant;
22
/// Query parameters for GET requests.
#[derive(Debug, Deserialize)]
pub struct ReadQuery {
    /// Starting offset. `None` when not provided; the default varies by
    /// mode — catch-up falls back to "-1", live modes require it explicitly.
    offset: Option<String>,
    /// Live mode: "long-poll" for long-polling, "sse" for Server-Sent Events.
    /// Any other value is rejected; absent means catch-up mode.
    live: Option<String>,
    /// Cursor echoed from previous long-poll response. Parsed by axum/serde
    /// so the query param is accepted, but the server doesn't use it — it
    /// exists for CDN intermediaries to collapse identical polling requests.
    #[allow(dead_code)]
    cursor: Option<String>,
}
36
37/// GET handler for reading stream data
38///
39/// Supports three modes:
40/// - Catch-up (no `live` param): immediate read, returns all available data
41/// - Long-poll (`live=long-poll`): waits for new data at tail
42/// - SSE (`live=sse`): streaming Server-Sent Events
43///
44/// # Errors
45///
46/// Returns error if stream doesn't exist, offset is invalid,
47/// or storage operation fails.
48///
49/// # Panics
50///
51/// Panics if validated header values fail to parse into `HeaderValue`,
52/// which should never happen with valid inputs.
53pub async fn read_stream<S: Storage + 'static>(
54    State(storage): State<Arc<S>>,
55    Path(name): Path<String>,
56    Query(query): Query<ReadQuery>,
57    Extension(LongPollTimeout(timeout)): Extension<LongPollTimeout>,
58    Extension(SseReconnectInterval(reconnect_interval_secs)): Extension<SseReconnectInterval>,
59    headers: HeaderMap,
60) -> Result<Response> {
61    // Resolve offset: live modes require explicit offset, catch-up defaults to "-1"
62    let raw_offset = if let Some(ref live) = query.live {
63        match query.offset {
64            Some(ref o) => o.clone(),
65            None => {
66                return Err(Error::InvalidHeader {
67                    header: "offset".to_string(),
68                    reason: format!("offset query parameter is required for live={live} mode"),
69                });
70            }
71        }
72    } else {
73        query.offset.clone().unwrap_or_else(|| "-1".to_string())
74    };
75
76    let offset = Offset::from_str(&raw_offset)?;
77    let metadata = storage.head(&name)?;
78    let content_type = metadata.config.content_type.clone();
79
80    if let Some(ref live) = query.live {
81        match live.as_str() {
82            "long-poll" => {
83                let if_none_match = headers.get("if-none-match").and_then(|v| v.to_str().ok());
84                read_long_poll(
85                    &storage,
86                    &name,
87                    &offset,
88                    &raw_offset,
89                    if_none_match,
90                    &content_type,
91                    timeout,
92                )
93                .await
94            }
95            "sse" => read_sse(
96                storage,
97                name,
98                &offset,
99                &content_type,
100                reconnect_interval_secs,
101            ),
102            other => Err(Error::InvalidHeader {
103                header: "live".to_string(),
104                reason: format!("unsupported live mode: {other}"),
105            }),
106        }
107    } else {
108        let if_none_match = headers.get("if-none-match").and_then(|v| v.to_str().ok());
109        read_catch_up(
110            &storage,
111            &name,
112            &offset,
113            &raw_offset,
114            if_none_match,
115            &content_type,
116        )
117    }
118}
119
120/// Catch-up mode: immediate read of all available data.
121fn read_catch_up<S: Storage>(
122    storage: &Arc<S>,
123    name: &str,
124    offset: &Offset,
125    raw_offset: &str,
126    if_none_match: Option<&str>,
127    content_type: &str,
128) -> Result<Response> {
129    // Read from storage (single snapshot for offsets/closed state)
130    let read_result = storage.read(name, offset)?;
131
132    // Check 304 Not Modified
133    let etag = generate_etag(raw_offset, &read_result);
134    if let Some(client_etag) = if_none_match
135        && client_etag == etag
136    {
137        return Ok(build_304_response(&read_result));
138    }
139
140    build_data_response(&read_result, content_type, &etag, None)
141}
142
/// Long-poll mode: wait for new data at tail, return immediately if data exists.
///
/// Response decision tree (non-waiting cases mirror catch-up):
/// - `If-None-Match` matches current ETag → 304.
/// - Data already available at `offset` → 200 with data + cursor.
/// - Closed and at tail → immediate 204 (MUST NOT wait).
/// - Otherwise wait for a broadcast notification or `timeout`.
async fn read_long_poll<S: Storage>(
    storage: &Arc<S>,
    name: &str,
    offset: &Offset,
    raw_offset: &str,
    if_none_match: Option<&str>,
    content_type: &str,
    timeout: Duration,
) -> Result<Response> {
    // Subscribe BEFORE read to avoid missing notifications between read and
    // subscribe. `subscribe` returning None means the stream doesn't exist.
    let mut receiver = storage
        .subscribe(name)
        .ok_or_else(|| Error::NotFound(name.to_string()))?;

    let read_result = storage.read(name, offset)?;

    // Check 304 Not Modified (same as catch-up)
    let etag = generate_etag(raw_offset, &read_result);
    if let Some(client_etag) = if_none_match
        && client_etag == etag
    {
        return Ok(build_304_response(&read_result));
    }

    // Data available → return immediately (like catch-up + cursor)
    if !read_result.messages.is_empty() {
        let cursor_val = cursor::generate(&read_result.next_offset);
        return build_data_response(&read_result, content_type, &etag, Some(&cursor_val));
    }

    // At tail + closed → immediate 204 (MUST NOT wait)
    if read_result.closed && read_result.at_tail {
        return Ok(build_204_response(&read_result.next_offset, true));
    }

    // At tail + open → wait for notification or timeout.
    // Capture the concrete tail offset for re-reads. The original `offset`
    // may be a sentinel (e.g. `now`) which always returns empty on re-read;
    // using the resolved position ensures we pick up data that arrived.
    let tail_offset = read_result.next_offset.clone();
    let tail_offset_str = tail_offset.to_string();

    tokio::select! {
        _ = receiver.recv() => {
            // Data or close event — re-read from resolved tail position.
            // Any recv outcome (`_` pattern: Ok, Lagged, or Closed) is
            // treated as a wake-up; the re-read determines what happened.
            handle_long_poll_wake(storage, name, &tail_offset, &tail_offset_str, content_type)
        }
        () = tokio::time::sleep(timeout) => {
            // Timeout — return 204 with current position. Re-read so the
            // reported next-offset/closed state is fresh, not stale.
            let read_result = storage.read(name, &tail_offset)?;
            let is_closed = read_result.closed && read_result.at_tail;
            Ok(build_204_response(&read_result.next_offset, is_closed))
        }
    }
}
199
200/// SSE mode: stream data as Server-Sent Events (PROTOCOL.md §5.8).
201///
202/// Validates preconditions (stream existence, offset) eagerly before
203/// starting the stream. Uses raw byte streaming for full control over
204/// the SSE wire format. Once streaming begins, errors are silently
205/// dropped (SSE has no error frame).
206fn read_sse<S: Storage + 'static>(
207    storage: Arc<S>,
208    name: String,
209    offset: &Offset,
210    content_type: &str,
211    reconnect_interval_secs: u64,
212) -> Result<Response> {
213    let is_binary = sse::is_binary_content_type(content_type);
214    let is_json = json_mode::is_json_content_type(content_type);
215
216    // Subscribe before read to avoid missing notifications
217    let receiver = storage
218        .subscribe(&name)
219        .ok_or_else(|| Error::NotFound(name.clone()))?;
220
221    // Initial read to get current state
222    let read_result = storage.read(&name, offset)?;
223
224    let byte_stream = build_sse_byte_stream(
225        storage,
226        name,
227        read_result,
228        receiver,
229        is_binary,
230        is_json,
231        reconnect_interval_secs,
232    );
233
234    let body = Body::from_stream(byte_stream);
235
236    let mut headers = HeaderMap::new();
237    headers.insert("content-type", "text/event-stream".parse().unwrap());
238
239    if is_binary {
240        headers.insert("stream-sse-data-encoding", "base64".parse().unwrap());
241    }
242
243    Ok((StatusCode::OK, headers, body).into_response())
244}
245
/// Build a byte stream that yields raw SSE frame strings.
///
/// Manages keep-alive, idle timeout, and the subscribe-before-read pattern.
///
/// Frame sequence: initial data frames (if any) + one control frame, then a
/// wait loop that re-reads from the tail on each broadcast notification and
/// emits data/control frames. The stream terminates when the stream is
/// closed at tail, the broadcast channel closes, a re-read fails, or the
/// idle deadline expires.
fn build_sse_byte_stream<S: Storage + 'static>(
    storage: Arc<S>,
    name: String,
    initial_read: ReadResult,
    mut receiver: tokio::sync::broadcast::Receiver<()>,
    is_binary: bool,
    is_json: bool,
    reconnect_interval_secs: u64,
) -> impl futures_util::stream::Stream<Item = std::result::Result<String, std::convert::Infallible>> + Send
{
    async_stream::stream! {
        let read_result = initial_read;

        // Emit initial data + control (JSON messages batched into one event)
        let data_frames = sse::format_data_frames(&read_result.messages, is_binary, is_json);
        if !data_frames.is_empty() {
            yield Ok(data_frames);
        }

        let control = build_sse_control(&read_result);
        yield Ok(sse::format_control_frame(&control));

        // If closed at tail, we're done
        if read_result.closed && read_result.at_tail {
            return;
        }

        // Enter wait loop at tail
        let mut tail_offset = read_result.next_offset;
        // reconnect_interval_secs == 0 disables the idle-close deadline.
        let idle_timeout = if reconnect_interval_secs > 0 {
            Some(Duration::from_secs(reconnect_interval_secs))
        } else {
            None
        };
        let mut idle_deadline = idle_timeout.map(|timeout| Instant::now() + timeout);

        let keepalive_interval = Duration::from_secs(15);

        loop {
            tokio::select! {
                recv_result = receiver.recv() => {
                    match recv_result {
                        Ok(()) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
                            // New data or we fell behind — re-read from tail.
                            // Lagged only means missed notifications; the
                            // re-read below picks up everything from tail.
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                            // Channel closed — final read + emit + end
                            if let Ok(rr) = storage.read(&name, &tail_offset) {
                                let data_frames = sse::format_data_frames(&rr.messages, is_binary, is_json);
                                if !data_frames.is_empty() {
                                    yield Ok(data_frames);
                                }
                                let ctrl = build_sse_control(&rr);
                                yield Ok(sse::format_control_frame(&ctrl));
                            }
                            return;
                        }
                    }
                }
                () = tokio::time::sleep(keepalive_interval) => {
                    // Keep-alive comment frame. `continue` skips the re-read
                    // below so keep-alive ticks never hit storage.
                    yield Ok(sse::format_keepalive_frame().to_string());
                    continue;
                }
                () = async {
                    match idle_deadline {
                        Some(deadline) => tokio::time::sleep_until(deadline).await,
                        // No deadline configured: never resolves, so this
                        // select arm is effectively disabled.
                        None => std::future::pending().await,
                    }
                } => {
                    // Idle close per PROTOCOL.md §5.8
                    return;
                }
            }

            // Re-read from tail position
            let Ok(rr) = storage.read(&name, &tail_offset) else {
                return;
            };

            let data_frames = sse::format_data_frames(&rr.messages, is_binary, is_json);
            if !data_frames.is_empty() {
                yield Ok(data_frames);
            }

            let ctrl = build_sse_control(&rr);
            yield Ok(sse::format_control_frame(&ctrl));

            // Keep idle-close tied to stream activity (new data), not keepalive ticks.
            if !rr.messages.is_empty()
                && let Some(timeout) = idle_timeout
            {
                idle_deadline = Some(Instant::now() + timeout);
            }

            if rr.closed && rr.at_tail {
                return;
            }

            tail_offset = rr.next_offset;
        }
    }
}
352
353/// Build a `ControlPayload` from a `ReadResult`.
354fn build_sse_control(read_result: &ReadResult) -> ControlPayload {
355    let is_closed_at_tail = read_result.closed && read_result.at_tail;
356
357    ControlPayload {
358        stream_next_offset: read_result.next_offset.to_string(),
359        stream_cursor: if is_closed_at_tail {
360            None
361        } else {
362            Some(cursor::generate(&read_result.next_offset))
363        },
364        up_to_date: if read_result.at_tail {
365            Some(true)
366        } else {
367            None
368        },
369        stream_closed: if is_closed_at_tail { Some(true) } else { None },
370    }
371}
372
373/// Handle wake-up from broadcast in long-poll mode.
374///
375/// Re-reads from storage to get the actual data that triggered the notification.
376fn handle_long_poll_wake<S: Storage>(
377    storage: &Arc<S>,
378    name: &str,
379    offset: &Offset,
380    raw_offset: &str,
381    content_type: &str,
382) -> Result<Response> {
383    let read_result = storage.read(name, offset)?;
384
385    if read_result.messages.is_empty() {
386        // Woke up but no data (e.g., stream was closed)
387        let is_closed = read_result.closed && read_result.at_tail;
388        return Ok(build_204_response(&read_result.next_offset, is_closed));
389    }
390
391    let etag = generate_etag(raw_offset, &read_result);
392    let cursor_val = cursor::generate(&read_result.next_offset);
393    build_data_response(&read_result, content_type, &etag, Some(&cursor_val))
394}
395
396/// Generate `ETag` from read result.
397///
398/// Format: `"{start_offset}:{end_offset}"` or `"{start_offset}:{end_offset}:c"` if closed at tail.
399fn generate_etag(start_offset: &str, read_result: &ReadResult) -> String {
400    let end_offset = read_result.next_offset.as_str();
401    if read_result.closed && read_result.at_tail {
402        format!("\"{start_offset}:{end_offset}:c\"")
403    } else {
404        format!("\"{start_offset}:{end_offset}\"")
405    }
406}
407
408/// Build a 304 Not Modified response.
409fn build_304_response(read_result: &ReadResult) -> Response {
410    let mut headers = HeaderMap::new();
411    headers.insert(
412        names::STREAM_NEXT_OFFSET,
413        axum::http::HeaderValue::from_bytes(read_result.next_offset.as_str().as_bytes()).unwrap(),
414    );
415    headers.insert(names::STREAM_UP_TO_DATE, "true".parse().unwrap());
416    (StatusCode::NOT_MODIFIED, headers).into_response()
417}
418
419/// Build a 200 OK response with message data.
420///
421/// If `cursor` is `Some`, includes `Stream-Cursor` header (long-poll mode).
422fn build_data_response(
423    read_result: &ReadResult,
424    content_type: &str,
425    etag: &str,
426    cursor_val: Option<&str>,
427) -> Result<Response> {
428    let body = build_body(read_result, content_type)?;
429
430    let mut headers = HeaderMap::new();
431    headers.insert("content-type", content_type.parse().unwrap());
432    headers.insert(
433        names::STREAM_NEXT_OFFSET,
434        axum::http::HeaderValue::from_bytes(read_result.next_offset.as_str().as_bytes()).unwrap(),
435    );
436    headers.insert(
437        names::STREAM_UP_TO_DATE,
438        (if read_result.at_tail { "true" } else { "false" })
439            .parse()
440            .unwrap(),
441    );
442    headers.insert("etag", etag.parse().unwrap());
443
444    let is_closed_at_tail = read_result.closed && read_result.at_tail;
445    if is_closed_at_tail {
446        headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
447    }
448
449    if let Some(c) = cursor_val {
450        headers.insert(names::STREAM_CURSOR, c.parse().unwrap());
451    }
452
453    Ok((StatusCode::OK, headers, body).into_response())
454}
455
456/// Build a 204 No Content response for long-poll timeout or closed stream.
457fn build_204_response(next_offset: &Offset, is_closed: bool) -> Response {
458    let mut headers = HeaderMap::new();
459    headers.insert(
460        names::STREAM_NEXT_OFFSET,
461        axum::http::HeaderValue::from_bytes(next_offset.as_str().as_bytes()).unwrap(),
462    );
463    headers.insert(names::STREAM_UP_TO_DATE, "true".parse().unwrap());
464
465    let cursor_val = cursor::generate(next_offset);
466    if is_closed {
467        headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
468        // Cursor MAY be omitted when closed per spec, but including it is harmless
469    }
470    headers.insert(names::STREAM_CURSOR, cursor_val.parse().unwrap());
471
472    (StatusCode::NO_CONTENT, headers).into_response()
473}
474
475/// Build response body from read result messages.
476fn build_body(read_result: &ReadResult, content_type: &str) -> Result<bytes::Bytes> {
477    if json_mode::is_json_content_type(content_type) {
478        json_mode::wrap_read_iter(read_result.messages.iter())
479    } else if read_result.messages.is_empty() {
480        Ok(bytes::Bytes::new())
481    } else {
482        let total_len: usize = read_result.messages.iter().map(bytes::Bytes::len).sum();
483        let mut buf = BytesMut::with_capacity(total_len);
484        for message in &read_result.messages {
485            buf.put(message.clone());
486        }
487        Ok(buf.freeze())
488    }
489}