1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod proto;
5pub mod s2s;
6pub mod sse;
7
8use std::time::Duration;
9
10use futures::stream::BoxStream;
11use itertools::Itertools as _;
12use s2_common::{
13 record,
14 types::{
15 self,
16 stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
17 },
18};
19use serde::{Deserialize, Serialize};
20use time::OffsetDateTime;
21
22use super::config::StreamConfig;
23use crate::{data::Format, mime::JsonOrProto};
24
/// API representation of a stream's metadata.
///
/// Built from the internal `types::stream::StreamInfo` through its `From` impl.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time, (de)serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time, (de)serialized as an optional RFC 3339 timestamp.
    // NOTE(review): when this is populated (e.g. deletion pending vs. completed) is not visible here — confirm.
    #[serde(with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
}
38
39impl From<types::stream::StreamInfo> for StreamInfo {
40 fn from(value: types::stream::StreamInfo) -> Self {
41 Self {
42 name: value.name,
43 created_at: value.created_at,
44 deleted_at: value.deleted_at,
45 }
46 }
47}
48
/// Query parameters for listing streams.
///
/// Every field is optional. With the `utoipa` feature enabled the struct is
/// described as OpenAPI query parameters.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
    /// Stream name prefix; advertised in the OpenAPI schema as a string defaulting to `""`.
    // NOTE(review): presumably restricts results to names starting with this prefix — confirm against the handler.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<StreamNamePrefix>,
    /// Start-after stream name; advertised in the OpenAPI schema as a string defaulting to `""`.
    // NOTE(review): presumably an exclusive pagination cursor — confirm against the handler.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<StreamNameStartAfter>,
    /// Result limit; advertised in the OpenAPI schema with a maximum and default of 1000.
    // NOTE(review): enforcement of the 1000 cap and default is not visible in this file.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}
65
// Invokes the parent module's `impl_list_request_conversions!` macro for
// `ListStreamsRequest`, passing its prefix and start-after types. The macro
// body is defined outside this file.
// NOTE(review): presumably generates the conversions between this request and
// the internal list-request type — confirm in the macro definition.
super::impl_list_request_conversions!(
    ListStreamsRequest,
    types::stream::StreamNamePrefix,
    types::stream::StreamNameStartAfter
);
71
/// Response payload for a stream listing.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
    /// Listed streams; the OpenAPI schema advertises at most 1000 items.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub streams: Vec<StreamInfo>,
    /// Flag accompanying the listed streams.
    // NOTE(review): presumably `true` when further streams match beyond this page — confirm against the handler.
    pub has_more: bool,
}
82
/// Payload describing a stream to create.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
    /// Name of the stream.
    pub stream: StreamName,
    /// Optional stream configuration.
    // NOTE(review): the defaults applied when this is omitted are not visible here.
    pub config: Option<StreamConfig>,
}
93
/// A position in a stream: a sequence number paired with a timestamp.
///
/// Converts field-for-field to and from the internal `record::StreamPosition`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: record::SeqNum,
    /// Timestamp.
    // NOTE(review): unit and origin (client- vs. service-assigned) are defined by `record::Timestamp`, not here.
    pub timestamp: record::Timestamp,
}
105
106impl From<record::StreamPosition> for StreamPosition {
107 fn from(pos: record::StreamPosition) -> Self {
108 Self {
109 seq_num: pos.seq_num,
110 timestamp: pos.timestamp,
111 }
112 }
113}
114
115impl From<StreamPosition> for record::StreamPosition {
116 fn from(pos: StreamPosition) -> Self {
117 Self {
118 seq_num: pos.seq_num,
119 timestamp: pos.timestamp,
120 }
121 }
122}
123
/// Response payload carrying a stream's tail position.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
    /// Tail position.
    // NOTE(review): presumably the position the next appended record would receive — confirm.
    pub tail: StreamPosition,
}
131
/// Query parameters selecting where a read starts.
///
/// At most one of `seq_num`, `timestamp` and `tail_offset` may be provided;
/// supplying more than one is rejected when converting into
/// `types::stream::ReadStart`. If none is provided, the read starts at a tail
/// offset of 0.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
    /// Start from a sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
    pub seq_num: Option<record::SeqNum>,
    /// Start from a timestamp.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub timestamp: Option<record::Timestamp>,
    /// Start from an offset relative to the tail.
    // NOTE(review): presumably counts records back from the tail — confirm in `types::stream::ReadFrom`.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub tail_offset: Option<u64>,
    /// Clamp flag; treated as `false` when omitted.
    // NOTE(review): presumably clamps a start position beyond the tail to the tail — confirm in the read path.
    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
    pub clamp: Option<bool>,
}
151
152impl TryFrom<ReadStart> for types::stream::ReadStart {
153 type Error = types::ValidationError;
154
155 fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
156 let from = match (value.seq_num, value.timestamp, value.tail_offset) {
157 (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
158 (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
159 (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
160 (None, None, None) => types::stream::ReadFrom::TailOffset(0),
161 _ => {
162 return Err(types::ValidationError(
163 "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
164 ));
165 }
166 };
167 let clamp = value.clamp.unwrap_or(false);
168 Ok(Self { from, clamp })
169 }
170}
171
/// Query parameters bounding where a read ends.
///
/// Every field is optional. `count` and `bytes` are combined into a single
/// read limit when converting into `types::stream::ReadEnd`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadEnd {
    /// Record count limit.
    // NOTE(review): the schema advertises `u64` while the field (and `bytes`' schema) use `usize` — confirm this is intended.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub count: Option<usize>,
    /// Bytes limit.
    // NOTE(review): how bytes are measured (raw vs. metered) is decided by `ReadLimit`, not visible here.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
    pub bytes: Option<usize>,
    /// Timestamp to read until.
    // NOTE(review): whether this bound is inclusive or exclusive is not visible here — confirm.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub until: Option<record::Timestamp>,
    /// Wait duration, in whole seconds.
    // NOTE(review): the default and any upper clamp on the wait are applied elsewhere — confirm.
    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
    pub wait: Option<u32>,
}
195
196impl From<ReadEnd> for types::stream::ReadEnd {
197 fn from(value: ReadEnd) -> Self {
198 Self {
199 limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
200 value.count,
201 value.bytes,
202 ),
203 until: value.until.into(),
204 wait: value.wait.map(|w| Duration::from_secs(w as u64)),
205 }
206 }
207}
208
/// The flavor of a read request together with its flavor-specific options.
#[derive(Debug, Clone, Copy)]
pub enum ReadRequest {
    /// Unary read.
    Unary {
        /// Format used to encode record data.
        format: Format,
        /// Whether the response is sent as JSON or Protobuf.
        response_mime: JsonOrProto,
    },
    /// Event-stream read (see the `sse` module).
    EventStream {
        /// Format used to encode record data.
        format: Format,
        /// Last event id, if one was supplied.
        // NOTE(review): presumably parsed from the SSE `Last-Event-ID` header for resumption — confirm in `sse`.
        last_event_id: Option<sse::LastEventId>,
    },
    /// S2S read (see the `s2s` module).
    S2s {
        /// Compression algorithm for the response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
226
/// The flavor of an append request together with its payload.
///
/// `Debug` is implemented by hand for this type rather than derived.
pub enum AppendRequest {
    /// Unary append carrying a single, already validated input.
    Unary {
        /// The input to append.
        input: types::stream::AppendInput,
        /// Whether the response is sent as JSON or Protobuf.
        response_mime: JsonOrProto,
    },
    /// S2S append carrying a stream of inputs.
    S2s {
        /// Stream of inputs; each item is either a validated input or the
        /// error encountered while producing it.
        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
        /// Compression algorithm for the response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
239
240impl std::fmt::Debug for AppendRequest {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 AppendRequest::Unary {
244 input,
245 response_mime: response,
246 } => f
247 .debug_struct("AppendRequest::Unary")
248 .field("input", input)
249 .field("response", response)
250 .finish(),
251 AppendRequest::S2s { .. } => f.debug_struct("AppendRequest::S2s").finish(),
252 }
253 }
254}
255
/// Errors yielded by the input stream of `AppendRequest::S2s`.
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
    /// An S2S frame could not be decoded; wraps the underlying I/O error.
    #[error("Failed to decode S2S frame: {0}")]
    FrameDecode(#[from] std::io::Error),
    /// An input failed validation; displays exactly as the wrapped error.
    #[error(transparent)]
    Validation(#[from] types::ValidationError),
}
263
/// A record header as a `(name, value)` pair of strings.
///
/// Both elements are produced and consumed through the request `Format`
/// (see `SequencedRecord::encode` and `AppendRecord::decode`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);
270
/// A record read from a stream, together with its position.
///
/// Serialize-only; built with `SequencedRecord::encode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
    /// Sequence number of the record.
    pub seq_num: record::SeqNum,
    /// Timestamp of the record.
    pub timestamp: record::Timestamp,
    /// Encoded headers; omitted from the serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Encoded body; omitted from the serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
289
290impl SequencedRecord {
291 pub fn encode(
292 format: Format,
293 record::SequencedRecord {
294 position: record::StreamPosition { seq_num, timestamp },
295 record,
296 }: record::SequencedRecord,
297 ) -> Self {
298 let (headers, body) = record.into_parts();
299 Self {
300 seq_num,
301 timestamp,
302 headers: headers
303 .into_iter()
304 .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
305 .collect(),
306 body: format.encode(&body),
307 }
308 }
309}
310
/// A record to append, as received over the API.
///
/// Turned into the internal `types::stream::AppendRecord` by
/// `AppendRecord::decode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
    /// Optional timestamp for the record.
    // NOTE(review): how a missing timestamp is filled in is decided downstream — not visible here.
    pub timestamp: Option<record::Timestamp>,
    /// Encoded headers; defaults to empty when absent and is omitted from
    /// the serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Encoded body; defaults to empty when absent and is omitted from the
    /// serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
329
330impl AppendRecord {
331 pub fn decode(
332 self,
333 format: Format,
334 ) -> Result<types::stream::AppendRecord, types::ValidationError> {
335 let headers = self
336 .headers
337 .into_iter()
338 .map(|Header(name, value)| {
339 Ok::<record::Header, types::ValidationError>(record::Header {
340 name: format.decode(name)?,
341 value: format.decode(value)?,
342 })
343 })
344 .try_collect()?;
345
346 let body = format.decode(self.body)?;
347
348 let record = record::Record::try_from_parts(headers, body)
349 .map_err(|e| e.to_string())?
350 .into();
351
352 let parts = types::stream::AppendRecordParts {
353 timestamp: self.timestamp,
354 record,
355 };
356
357 types::stream::AppendRecord::try_from(parts)
358 .map_err(|e| types::ValidationError(e.to_string()))
359 }
360}
361
/// Payload of an append request, as received over the API.
///
/// Turned into the internal `types::stream::AppendInput` by
/// `AppendInput::decode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
    /// Records to append.
    // NOTE(review): batch constraints are enforced by `AppendRecordBatch::try_from`, not visible here.
    pub records: Vec<AppendRecord>,
    /// Optional sequence number condition.
    // NOTE(review): presumably the append fails unless the next assigned sequence number matches — confirm.
    pub match_seq_num: Option<record::SeqNum>,
    /// Optional fencing token condition.
    // NOTE(review): presumably the append fails unless the stream's fencing token matches — confirm.
    pub fencing_token: Option<record::FencingToken>,
}
375
376impl AppendInput {
377 pub fn decode(
378 self,
379 format: Format,
380 ) -> Result<types::stream::AppendInput, types::ValidationError> {
381 let records: Vec<types::stream::AppendRecord> = self
382 .records
383 .into_iter()
384 .map(|record| record.decode(format))
385 .try_collect()?;
386
387 Ok(types::stream::AppendInput {
388 records: types::stream::AppendRecordBatch::try_from(records)?,
389 match_seq_num: self.match_seq_num,
390 fencing_token: self.fencing_token,
391 })
392 }
393}
394
/// Acknowledgement of an append, expressed as three stream positions.
///
/// Serialize-only; built from the internal `types::stream::AppendAck`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
    /// Start position of the acknowledged append.
    // NOTE(review): presumably the position of the first appended record — confirm.
    pub start: StreamPosition,
    /// End position of the acknowledged append.
    // NOTE(review): whether this is the last record or one past it is not visible here — confirm.
    pub end: StreamPosition,
    /// Tail position reported with the acknowledgement.
    // NOTE(review): may differ from `end` if other appends interleave — confirm.
    pub tail: StreamPosition,
}
409
410impl From<types::stream::AppendAck> for AppendAck {
411 fn from(ack: types::stream::AppendAck) -> Self {
412 Self {
413 start: ack.start.into(),
414 end: ack.end.into(),
415 tail: ack.tail.into(),
416 }
417 }
418}
419
/// Reason an append condition was not met.
///
/// Variant names are (de)serialized in `snake_case`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
    /// The fencing token condition failed; carries a fencing token.
    // NOTE(review): presumably the token currently set on the stream — confirm.
    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
    FencingTokenMismatch(record::FencingToken),
    /// The sequence number condition failed; carries a sequence number.
    // NOTE(review): presumably the sequence number that would have been assigned next — confirm.
    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
    SeqNumMismatch(record::SeqNum),
}
435
/// A batch of records returned by a read.
///
/// Serialize-only; built with `ReadBatch::encode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
    /// Records in the batch, encoded with the request `Format`.
    pub records: Vec<SequencedRecord>,
    /// Tail position, when available; omitted from the serialized output when `None`.
    // NOTE(review): the conditions under which the tail is reported are decided by the read path — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<StreamPosition>,
}
448
449impl ReadBatch {
450 pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
451 Self {
452 records: batch
453 .records
454 .into_iter()
455 .map(|record| SequencedRecord::encode(format, record))
456 .collect(),
457 tail: batch.tail.map(Into::into),
458 }
459 }
460}