1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod json;
5pub mod proto;
6pub mod s2s;
7pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14 record,
15 types::{
16 self,
17 stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
18 },
19};
20use serde::{Deserialize, Serialize};
21use time::OffsetDateTime;
22
23use super::config::StreamConfig;
24use crate::{data::Format, mime::JsonOrProto};
25
26#[rustfmt::skip]
27#[derive(Debug, Clone, Serialize, Deserialize)]
28#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
29pub struct StreamInfo {
30 pub name: StreamName,
32 #[serde(with = "time::serde::rfc3339")]
34 pub created_at: OffsetDateTime,
35 #[serde(with = "time::serde::rfc3339::option")]
37 pub deleted_at: Option<OffsetDateTime>,
38}
39
40impl From<types::stream::StreamInfo> for StreamInfo {
41 fn from(value: types::stream::StreamInfo) -> Self {
42 Self {
43 name: value.name,
44 created_at: value.created_at,
45 deleted_at: value.deleted_at,
46 }
47 }
48}
49
50#[rustfmt::skip]
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
53#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
54pub struct ListStreamsRequest {
55 #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
57 pub prefix: Option<StreamNamePrefix>,
58 #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
61 pub start_after: Option<StreamNameStartAfter>,
62 #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
64 pub limit: Option<usize>,
65}
66
67super::impl_list_request_conversions!(
68 ListStreamsRequest,
69 types::stream::StreamNamePrefix,
70 types::stream::StreamNameStartAfter
71);
72
73#[rustfmt::skip]
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
76pub struct ListStreamsResponse {
77 #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
79 pub streams: Vec<StreamInfo>,
80 pub has_more: bool,
82}
83
84#[rustfmt::skip]
85#[derive(Debug, Clone, Serialize, Deserialize)]
86#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
87pub struct CreateStreamRequest {
88 pub stream: StreamName,
91 pub config: Option<StreamConfig>,
93}
94
95#[rustfmt::skip]
96#[derive(Debug, Clone, Serialize, Deserialize)]
97#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
98pub struct StreamPosition {
100 pub seq_num: record::SeqNum,
102 pub timestamp: record::Timestamp,
105}
106
107impl From<record::StreamPosition> for StreamPosition {
108 fn from(pos: record::StreamPosition) -> Self {
109 Self {
110 seq_num: pos.seq_num,
111 timestamp: pos.timestamp,
112 }
113 }
114}
115
116impl From<StreamPosition> for record::StreamPosition {
117 fn from(pos: StreamPosition) -> Self {
118 Self {
119 seq_num: pos.seq_num,
120 timestamp: pos.timestamp,
121 }
122 }
123}
124
125#[rustfmt::skip]
126#[derive(Debug, Clone, Serialize, Deserialize)]
127#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
128pub struct TailResponse {
129 pub tail: StreamPosition,
131}
132
133#[rustfmt::skip]
134#[derive(Debug, Clone, Serialize, Deserialize)]
135#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
136#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
137pub struct ReadStart {
138 #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
140 pub seq_num: Option<record::SeqNum>,
141 #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
143 pub timestamp: Option<record::Timestamp>,
144 #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
146 pub tail_offset: Option<u64>,
147 #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
150 pub clamp: Option<bool>,
151}
152
153impl TryFrom<ReadStart> for types::stream::ReadStart {
154 type Error = types::ValidationError;
155
156 fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
157 let from = match (value.seq_num, value.timestamp, value.tail_offset) {
158 (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
159 (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
160 (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
161 (None, None, None) => types::stream::ReadFrom::TailOffset(0),
162 _ => {
163 return Err(types::ValidationError(
164 "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
165 ));
166 }
167 };
168 let clamp = value.clamp.unwrap_or(false);
169 Ok(Self { from, clamp })
170 }
171}
172
173#[rustfmt::skip]
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
176#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
177pub struct ReadEnd {
178 #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
181 pub count: Option<usize>,
182 #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
185 pub bytes: Option<usize>,
186 #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
188 pub until: Option<record::Timestamp>,
189 #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
194 pub wait: Option<u32>,
195}
196
197impl From<ReadEnd> for types::stream::ReadEnd {
198 fn from(value: ReadEnd) -> Self {
199 Self {
200 limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
201 value.count,
202 value.bytes,
203 ),
204 until: value.until.into(),
205 wait: value.wait.map(|w| Duration::from_secs(w as u64)),
206 }
207 }
208}
209
210#[derive(Debug, Clone, Copy)]
211pub enum ReadRequest {
212 Unary {
214 format: Format,
215 response_mime: JsonOrProto,
216 },
217 EventStream {
219 format: Format,
220 last_event_id: Option<sse::LastEventId>,
221 },
222 S2s {
224 response_compression: s2s::CompressionAlgorithm,
225 },
226}
227
228pub enum AppendRequest {
229 Unary {
231 input: types::stream::AppendInput,
232 response_mime: JsonOrProto,
233 },
234 S2s {
236 inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
237 response_compression: s2s::CompressionAlgorithm,
238 },
239}
240
241impl std::fmt::Debug for AppendRequest {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 AppendRequest::Unary {
245 input,
246 response_mime: response,
247 } => f
248 .debug_struct("AppendRequest::Unary")
249 .field("input", input)
250 .field("response", response)
251 .finish(),
252 AppendRequest::S2s { .. } => f.debug_struct("AppendRequest::S2s").finish(),
253 }
254 }
255}
256
257#[derive(Debug, thiserror::Error)]
258pub enum AppendInputStreamError {
259 #[error("Failed to decode S2S frame: {0}")]
260 FrameDecode(#[from] std::io::Error),
261 #[error(transparent)]
262 Validation(#[from] types::ValidationError),
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
267pub struct Header(pub String, pub String);
268
269#[rustfmt::skip]
270#[derive(Debug, Clone, Serialize)]
272#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
273pub struct SequencedRecord {
274 pub seq_num: record::SeqNum,
276 pub timestamp: record::Timestamp,
278 #[serde(default, skip_serializing_if = "Vec::is_empty")]
280 #[cfg_attr(feature = "utoipa", schema(required = false))]
281 pub headers: Vec<Header>,
282 #[serde(default, skip_serializing_if = "String::is_empty")]
284 #[cfg_attr(feature = "utoipa", schema(required = false))]
285 pub body: String,
286}
287
288impl SequencedRecord {
289 pub fn encode(
290 format: Format,
291 record::SequencedRecord {
292 position: record::StreamPosition { seq_num, timestamp },
293 record,
294 }: record::SequencedRecord,
295 ) -> Self {
296 let (headers, body) = record.into_parts();
297 Self {
298 seq_num,
299 timestamp,
300 headers: headers
301 .into_iter()
302 .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
303 .collect(),
304 body: format.encode(&body),
305 }
306 }
307}
308
309#[rustfmt::skip]
310#[derive(Debug, Clone, Serialize, Deserialize)]
312#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
313pub struct AppendRecord {
314 pub timestamp: Option<record::Timestamp>,
318 #[serde(default, skip_serializing_if = "Vec::is_empty")]
320 #[cfg_attr(feature = "utoipa", schema(required = false))]
321 pub headers: Vec<Header>,
322 #[serde(default, skip_serializing_if = "String::is_empty")]
324 #[cfg_attr(feature = "utoipa", schema(required = false))]
325 pub body: String,
326}
327
328impl AppendRecord {
329 pub fn decode(
330 self,
331 format: Format,
332 ) -> Result<types::stream::AppendRecord, types::ValidationError> {
333 let headers = self
334 .headers
335 .into_iter()
336 .map(|Header(name, value)| {
337 Ok::<record::Header, types::ValidationError>(record::Header {
338 name: format.decode(name)?,
339 value: format.decode(value)?,
340 })
341 })
342 .try_collect()?;
343
344 let body = format.decode(self.body)?;
345
346 let record = record::Record::try_from_parts(headers, body)
347 .map_err(|e| e.to_string())?
348 .into();
349
350 let parts = types::stream::AppendRecordParts {
351 timestamp: self.timestamp,
352 record,
353 };
354
355 types::stream::AppendRecord::try_from(parts)
356 .map_err(|e| types::ValidationError(e.to_string()))
357 }
358}
359
360#[rustfmt::skip]
361#[derive(Debug, Clone, Serialize, Deserialize)]
363#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
364pub struct AppendInput {
365 pub records: Vec<AppendRecord>,
368 pub match_seq_num: Option<record::SeqNum>,
370 pub fencing_token: Option<record::FencingToken>,
372}
373
374impl AppendInput {
375 pub fn decode(
376 self,
377 format: Format,
378 ) -> Result<types::stream::AppendInput, types::ValidationError> {
379 let records: Vec<types::stream::AppendRecord> = self
380 .records
381 .into_iter()
382 .map(|record| record.decode(format))
383 .try_collect()?;
384
385 Ok(types::stream::AppendInput {
386 records: types::stream::AppendRecordBatch::try_from(records)?,
387 match_seq_num: self.match_seq_num,
388 fencing_token: self.fencing_token,
389 })
390 }
391}
392
393#[rustfmt::skip]
394#[derive(Debug, Clone, Serialize)]
396#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
397pub struct AppendAck {
398 pub start: StreamPosition,
400 pub end: StreamPosition,
403 pub tail: StreamPosition,
406}
407
408impl From<types::stream::AppendAck> for AppendAck {
409 fn from(ack: types::stream::AppendAck) -> Self {
410 Self {
411 start: ack.start.into(),
412 end: ack.end.into(),
413 tail: ack.tail.into(),
414 }
415 }
416}
417
418#[rustfmt::skip]
419#[derive(Debug, Clone, Serialize, Deserialize)]
421#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
422#[serde(rename_all = "snake_case")]
423pub enum AppendConditionFailed {
424 #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
427 FencingTokenMismatch(record::FencingToken),
428 #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
431 SeqNumMismatch(record::SeqNum),
432}
433
434#[rustfmt::skip]
435#[derive(Debug, Clone, Serialize)]
436#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
437pub struct ReadBatch {
438 pub records: Vec<SequencedRecord>,
441 #[serde(skip_serializing_if = "Option::is_none")]
444 pub tail: Option<StreamPosition>,
445}
446
447impl ReadBatch {
448 pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
449 Self {
450 records: batch
451 .records
452 .into_iter()
453 .map(|record| SequencedRecord::encode(format, record))
454 .collect(),
455 tail: batch.tail.map(Into::into),
456 }
457 }
458}