durable_streams_server/handlers/
get.rs1use crate::config::{LongPollTimeout, SseReconnectInterval};
2use crate::protocol::cursor;
3use crate::protocol::error::{Error, Result};
4use crate::protocol::headers::names;
5use crate::protocol::json_mode;
6use crate::protocol::offset::Offset;
7use crate::protocol::sse::{self, ControlPayload};
8use crate::storage::{ReadResult, Storage};
9use axum::{
10 Extension,
11 body::Body,
12 extract::{Path, Query, State},
13 http::{HeaderMap, StatusCode},
14 response::{IntoResponse, Response},
15};
16use bytes::{BufMut, BytesMut};
17use serde::Deserialize;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::time::Instant;
22
23#[derive(Debug, Deserialize)]
25pub struct ReadQuery {
26 offset: Option<String>,
28 live: Option<String>,
30 #[allow(dead_code)]
34 cursor: Option<String>,
35}
36
37pub async fn read_stream<S: Storage + 'static>(
54 State(storage): State<Arc<S>>,
55 Path(name): Path<String>,
56 Query(query): Query<ReadQuery>,
57 Extension(LongPollTimeout(timeout)): Extension<LongPollTimeout>,
58 Extension(SseReconnectInterval(reconnect_interval_secs)): Extension<SseReconnectInterval>,
59 headers: HeaderMap,
60) -> Result<Response> {
61 let raw_offset = if let Some(ref live) = query.live {
63 match query.offset {
64 Some(ref o) => o.clone(),
65 None => {
66 return Err(Error::InvalidHeader {
67 header: "offset".to_string(),
68 reason: format!("offset query parameter is required for live={live} mode"),
69 });
70 }
71 }
72 } else {
73 query.offset.clone().unwrap_or_else(|| "-1".to_string())
74 };
75
76 let offset = Offset::from_str(&raw_offset)?;
77 let metadata = storage.head(&name)?;
78 let content_type = metadata.config.content_type.clone();
79
80 if let Some(ref live) = query.live {
81 match live.as_str() {
82 "long-poll" => {
83 let if_none_match = headers.get("if-none-match").and_then(|v| v.to_str().ok());
84 read_long_poll(
85 &storage,
86 &name,
87 &offset,
88 &raw_offset,
89 if_none_match,
90 &content_type,
91 timeout,
92 )
93 .await
94 }
95 "sse" => read_sse(
96 storage,
97 name,
98 &offset,
99 &content_type,
100 reconnect_interval_secs,
101 ),
102 other => Err(Error::InvalidHeader {
103 header: "live".to_string(),
104 reason: format!("unsupported live mode: {other}"),
105 }),
106 }
107 } else {
108 let if_none_match = headers.get("if-none-match").and_then(|v| v.to_str().ok());
109 read_catch_up(
110 &storage,
111 &name,
112 &offset,
113 &raw_offset,
114 if_none_match,
115 &content_type,
116 )
117 }
118}
119
120fn read_catch_up<S: Storage>(
122 storage: &Arc<S>,
123 name: &str,
124 offset: &Offset,
125 raw_offset: &str,
126 if_none_match: Option<&str>,
127 content_type: &str,
128) -> Result<Response> {
129 let read_result = storage.read(name, offset)?;
131
132 let etag = generate_etag(raw_offset, &read_result);
134 if let Some(client_etag) = if_none_match
135 && client_etag == etag
136 {
137 return Ok(build_304_response(&read_result));
138 }
139
140 build_data_response(&read_result, content_type, &etag, None)
141}
142
143async fn read_long_poll<S: Storage>(
145 storage: &Arc<S>,
146 name: &str,
147 offset: &Offset,
148 raw_offset: &str,
149 if_none_match: Option<&str>,
150 content_type: &str,
151 timeout: Duration,
152) -> Result<Response> {
153 let mut receiver = storage
155 .subscribe(name)
156 .ok_or_else(|| Error::NotFound(name.to_string()))?;
157
158 let read_result = storage.read(name, offset)?;
159
160 let etag = generate_etag(raw_offset, &read_result);
162 if let Some(client_etag) = if_none_match
163 && client_etag == etag
164 {
165 return Ok(build_304_response(&read_result));
166 }
167
168 if !read_result.messages.is_empty() {
170 let cursor_val = cursor::generate(&read_result.next_offset);
171 return build_data_response(&read_result, content_type, &etag, Some(&cursor_val));
172 }
173
174 if read_result.closed && read_result.at_tail {
176 return Ok(build_204_response(&read_result.next_offset, true));
177 }
178
179 let tail_offset = read_result.next_offset.clone();
184 let tail_offset_str = tail_offset.to_string();
185
186 tokio::select! {
187 _ = receiver.recv() => {
188 handle_long_poll_wake(storage, name, &tail_offset, &tail_offset_str, content_type)
190 }
191 () = tokio::time::sleep(timeout) => {
192 let read_result = storage.read(name, &tail_offset)?;
194 let is_closed = read_result.closed && read_result.at_tail;
195 Ok(build_204_response(&read_result.next_offset, is_closed))
196 }
197 }
198}
199
200fn read_sse<S: Storage + 'static>(
207 storage: Arc<S>,
208 name: String,
209 offset: &Offset,
210 content_type: &str,
211 reconnect_interval_secs: u64,
212) -> Result<Response> {
213 let is_binary = sse::is_binary_content_type(content_type);
214 let is_json = json_mode::is_json_content_type(content_type);
215
216 let receiver = storage
218 .subscribe(&name)
219 .ok_or_else(|| Error::NotFound(name.clone()))?;
220
221 let read_result = storage.read(&name, offset)?;
223
224 let byte_stream = build_sse_byte_stream(
225 storage,
226 name,
227 read_result,
228 receiver,
229 is_binary,
230 is_json,
231 reconnect_interval_secs,
232 );
233
234 let body = Body::from_stream(byte_stream);
235
236 let mut headers = HeaderMap::new();
237 headers.insert("content-type", "text/event-stream".parse().unwrap());
238
239 if is_binary {
240 headers.insert("stream-sse-data-encoding", "base64".parse().unwrap());
241 }
242
243 Ok((StatusCode::OK, headers, body).into_response())
244}
245
246fn build_sse_byte_stream<S: Storage + 'static>(
250 storage: Arc<S>,
251 name: String,
252 initial_read: ReadResult,
253 mut receiver: tokio::sync::broadcast::Receiver<()>,
254 is_binary: bool,
255 is_json: bool,
256 reconnect_interval_secs: u64,
257) -> impl futures_util::stream::Stream<Item = std::result::Result<String, std::convert::Infallible>> + Send
258{
259 async_stream::stream! {
260 let read_result = initial_read;
261
262 let data_frames = sse::format_data_frames(&read_result.messages, is_binary, is_json);
264 if !data_frames.is_empty() {
265 yield Ok(data_frames);
266 }
267
268 let control = build_sse_control(&read_result);
269 yield Ok(sse::format_control_frame(&control));
270
271 if read_result.closed && read_result.at_tail {
273 return;
274 }
275
276 let mut tail_offset = read_result.next_offset;
278 let idle_timeout = if reconnect_interval_secs > 0 {
279 Some(Duration::from_secs(reconnect_interval_secs))
280 } else {
281 None
282 };
283 let mut idle_deadline = idle_timeout.map(|timeout| Instant::now() + timeout);
284
285 let keepalive_interval = Duration::from_secs(15);
286
287 loop {
288 tokio::select! {
289 recv_result = receiver.recv() => {
290 match recv_result {
291 Ok(()) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
292 }
294 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
295 if let Ok(rr) = storage.read(&name, &tail_offset) {
297 let data_frames = sse::format_data_frames(&rr.messages, is_binary, is_json);
298 if !data_frames.is_empty() {
299 yield Ok(data_frames);
300 }
301 let ctrl = build_sse_control(&rr);
302 yield Ok(sse::format_control_frame(&ctrl));
303 }
304 return;
305 }
306 }
307 }
308 () = tokio::time::sleep(keepalive_interval) => {
309 yield Ok(sse::format_keepalive_frame().to_string());
311 continue;
312 }
313 () = async {
314 match idle_deadline {
315 Some(deadline) => tokio::time::sleep_until(deadline).await,
316 None => std::future::pending().await,
317 }
318 } => {
319 return;
321 }
322 }
323
324 let Ok(rr) = storage.read(&name, &tail_offset) else {
326 return;
327 };
328
329 let data_frames = sse::format_data_frames(&rr.messages, is_binary, is_json);
330 if !data_frames.is_empty() {
331 yield Ok(data_frames);
332 }
333
334 let ctrl = build_sse_control(&rr);
335 yield Ok(sse::format_control_frame(&ctrl));
336
337 if !rr.messages.is_empty()
339 && let Some(timeout) = idle_timeout
340 {
341 idle_deadline = Some(Instant::now() + timeout);
342 }
343
344 if rr.closed && rr.at_tail {
345 return;
346 }
347
348 tail_offset = rr.next_offset;
349 }
350 }
351}
352
353fn build_sse_control(read_result: &ReadResult) -> ControlPayload {
355 let is_closed_at_tail = read_result.closed && read_result.at_tail;
356
357 ControlPayload {
358 stream_next_offset: read_result.next_offset.to_string(),
359 stream_cursor: if is_closed_at_tail {
360 None
361 } else {
362 Some(cursor::generate(&read_result.next_offset))
363 },
364 up_to_date: if read_result.at_tail {
365 Some(true)
366 } else {
367 None
368 },
369 stream_closed: if is_closed_at_tail { Some(true) } else { None },
370 }
371}
372
373fn handle_long_poll_wake<S: Storage>(
377 storage: &Arc<S>,
378 name: &str,
379 offset: &Offset,
380 raw_offset: &str,
381 content_type: &str,
382) -> Result<Response> {
383 let read_result = storage.read(name, offset)?;
384
385 if read_result.messages.is_empty() {
386 let is_closed = read_result.closed && read_result.at_tail;
388 return Ok(build_204_response(&read_result.next_offset, is_closed));
389 }
390
391 let etag = generate_etag(raw_offset, &read_result);
392 let cursor_val = cursor::generate(&read_result.next_offset);
393 build_data_response(&read_result, content_type, &etag, Some(&cursor_val))
394}
395
396fn generate_etag(start_offset: &str, read_result: &ReadResult) -> String {
400 let end_offset = read_result.next_offset.as_str();
401 if read_result.closed && read_result.at_tail {
402 format!("\"{start_offset}:{end_offset}:c\"")
403 } else {
404 format!("\"{start_offset}:{end_offset}\"")
405 }
406}
407
408fn build_304_response(read_result: &ReadResult) -> Response {
410 let mut headers = HeaderMap::new();
411 headers.insert(
412 names::STREAM_NEXT_OFFSET,
413 axum::http::HeaderValue::from_bytes(read_result.next_offset.as_str().as_bytes()).unwrap(),
414 );
415 headers.insert(names::STREAM_UP_TO_DATE, "true".parse().unwrap());
416 (StatusCode::NOT_MODIFIED, headers).into_response()
417}
418
419fn build_data_response(
423 read_result: &ReadResult,
424 content_type: &str,
425 etag: &str,
426 cursor_val: Option<&str>,
427) -> Result<Response> {
428 let body = build_body(read_result, content_type)?;
429
430 let mut headers = HeaderMap::new();
431 headers.insert("content-type", content_type.parse().unwrap());
432 headers.insert(
433 names::STREAM_NEXT_OFFSET,
434 axum::http::HeaderValue::from_bytes(read_result.next_offset.as_str().as_bytes()).unwrap(),
435 );
436 headers.insert(
437 names::STREAM_UP_TO_DATE,
438 (if read_result.at_tail { "true" } else { "false" })
439 .parse()
440 .unwrap(),
441 );
442 headers.insert("etag", etag.parse().unwrap());
443
444 let is_closed_at_tail = read_result.closed && read_result.at_tail;
445 if is_closed_at_tail {
446 headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
447 }
448
449 if let Some(c) = cursor_val {
450 headers.insert(names::STREAM_CURSOR, c.parse().unwrap());
451 }
452
453 Ok((StatusCode::OK, headers, body).into_response())
454}
455
456fn build_204_response(next_offset: &Offset, is_closed: bool) -> Response {
458 let mut headers = HeaderMap::new();
459 headers.insert(
460 names::STREAM_NEXT_OFFSET,
461 axum::http::HeaderValue::from_bytes(next_offset.as_str().as_bytes()).unwrap(),
462 );
463 headers.insert(names::STREAM_UP_TO_DATE, "true".parse().unwrap());
464
465 let cursor_val = cursor::generate(next_offset);
466 if is_closed {
467 headers.insert(names::STREAM_CLOSED, "true".parse().unwrap());
468 }
470 headers.insert(names::STREAM_CURSOR, cursor_val.parse().unwrap());
471
472 (StatusCode::NO_CONTENT, headers).into_response()
473}
474
475fn build_body(read_result: &ReadResult, content_type: &str) -> Result<bytes::Bytes> {
477 if json_mode::is_json_content_type(content_type) {
478 json_mode::wrap_read_iter(read_result.messages.iter())
479 } else if read_result.messages.is_empty() {
480 Ok(bytes::Bytes::new())
481 } else {
482 let total_len: usize = read_result.messages.iter().map(bytes::Bytes::len).sum();
483 let mut buf = BytesMut::with_capacity(total_len);
484 for message in &read_result.messages {
485 buf.put(message.clone());
486 }
487 Ok(buf.freeze())
488 }
489}