1use std::str::FromStr;
2
3use s2_common::{http::ParseableHeader, types};
4use serde::Serialize;
5
6use super::ReadBatch;
7
8static LAST_EVENT_ID_HEADER: http::HeaderName = http::HeaderName::from_static("last-event-id");
9
10#[derive(Debug, Clone, Copy)]
11pub struct LastEventId {
12 pub seq_num: u64,
13 pub count: usize,
14 pub bytes: usize,
15}
16
17impl ParseableHeader for LastEventId {
18 fn name() -> &'static http::HeaderName {
19 &LAST_EVENT_ID_HEADER
20 }
21}
22
23impl Serialize for LastEventId {
24 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
25 where
26 S: serde::Serializer,
27 {
28 self.to_string().serialize(serializer)
29 }
30}
31
32impl std::fmt::Display for LastEventId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 let Self {
35 seq_num,
36 count,
37 bytes,
38 } = self;
39 write!(f, "{seq_num},{count},{bytes}")
40 }
41}
42
43impl FromStr for LastEventId {
44 type Err = types::ValidationError;
45
46 fn from_str(s: &str) -> Result<Self, Self::Err> {
47 let mut iter = s.splitn(3, ",");
48
49 fn get_next<T>(
50 iter: &mut std::str::SplitN<&str>,
51 field: &str,
52 ) -> Result<T, types::ValidationError>
53 where
54 T: FromStr,
55 <T as FromStr>::Err: std::fmt::Display,
56 {
57 let item = iter
58 .next()
59 .ok_or_else(|| format!("missing {field} in Last-Event-Id"))?;
60 item.parse()
61 .map_err(|e| format!("invalid {field} in Last-Event-ID: {e}").into())
62 }
63
64 let seq_num = get_next(&mut iter, "seq_num")?;
65 let count = get_next(&mut iter, "count")?;
66 let bytes = get_next(&mut iter, "bytes")?;
67
68 Ok(Self {
69 seq_num,
70 count,
71 bytes,
72 })
73 }
74}
75
76macro_rules! event {
77 ($name:ident, $val:expr) => {
78 #[derive(Serialize)]
79 #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
80 #[serde(rename_all = "snake_case")]
81 pub enum $name {
82 $name,
83 }
84
85 impl AsRef<str> for $name {
86 fn as_ref(&self) -> &str {
87 $val
88 }
89 }
90 };
91}
92
93event!(Batch, "batch");
94event!(Error, "error");
95event!(Ping, "ping");
96
97#[derive(Serialize)]
98#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
99#[serde(untagged)]
100pub enum ReadEvent {
101 #[cfg_attr(feature = "utoipa", schema(title = "batch"))]
102 Batch {
103 #[cfg_attr(feature = "utoipa", schema(inline))]
104 event: Batch,
105 data: ReadBatch,
106 #[cfg_attr(feature = "utoipa", schema(value_type = String, pattern = "^[0-9]+,[0-9]+,[0-9]+$"))]
107 id: LastEventId,
108 },
109 #[cfg_attr(feature = "utoipa", schema(title = "error"))]
110 Error {
111 #[cfg_attr(feature = "utoipa", schema(inline))]
112 event: Error,
113 data: String,
114 },
115 #[cfg_attr(feature = "utoipa", schema(title = "ping"))]
116 Ping {
117 #[cfg_attr(feature = "utoipa", schema(inline))]
118 event: Ping,
119 data: PingEventData,
120 },
121 #[cfg_attr(feature = "utoipa", schema(title = "done"))]
122 #[serde(skip)]
123 Done {
124 #[cfg_attr(feature = "utoipa", schema(value_type = String, pattern = r"^\[DONE\]$"))]
125 data: DoneEventData,
126 },
127}
128
129#[cfg(feature = "axum")]
130fn elapsed_since_epoch() -> std::time::Duration {
131 std::time::SystemTime::now()
132 .duration_since(std::time::SystemTime::UNIX_EPOCH)
133 .expect("healthy clock")
134}
135
136#[cfg(feature = "axum")]
137pub fn read_batch_event(
138 format: crate::data::Format,
139 batch: &types::stream::ReadBatch,
140 id: LastEventId,
141) -> Result<axum::response::sse::Event, axum::Error> {
142 axum::response::sse::Event::default()
143 .event(Batch::Batch)
144 .id(id.to_string())
145 .json_data(super::json::serialize_read_batch(format, batch))
146}
147
148#[cfg(feature = "axum")]
149pub fn error_event(data: String) -> Result<axum::response::sse::Event, axum::Error> {
150 Ok(axum::response::sse::Event::default()
151 .event(Error::Error)
152 .data(data))
153}
154
155#[cfg(feature = "axum")]
156pub fn ping_event() -> Result<axum::response::sse::Event, axum::Error> {
157 axum::response::sse::Event::default()
158 .event(Ping::Ping)
159 .json_data(PingEventData {
160 timestamp: elapsed_since_epoch().as_millis() as u64,
161 })
162}
163
164#[cfg(feature = "axum")]
165pub fn done_event() -> Result<axum::response::sse::Event, axum::Error> {
166 Ok(axum::response::sse::Event::default().data(DoneEventData))
167}
168
169#[derive(Debug, Clone, Serialize)]
170#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
171#[serde(rename = "[DONE]")]
172pub struct DoneEventData;
173
174impl AsRef<str> for DoneEventData {
175 fn as_ref(&self) -> &str {
176 "[DONE]"
177 }
178}
179
180#[rustfmt::skip]
181#[derive(Debug, Clone, Serialize)]
182#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
183pub struct PingEventData {
184 pub timestamp: u64,
185}