1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod json;
5pub mod proto;
6pub mod s2s;
7pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14 encryption::EncryptionSpec,
15 record,
16 types::{
17 self,
18 stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
19 },
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use super::config::StreamConfig;
25use crate::{data::Format, mime::JsonOrProto};
26
27#[rustfmt::skip]
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
30pub struct StreamInfo {
31 pub name: StreamName,
33 #[serde(with = "time::serde::rfc3339")]
35 pub created_at: OffsetDateTime,
36 #[serde(with = "time::serde::rfc3339::option")]
38 pub deleted_at: Option<OffsetDateTime>,
39}
40
41impl From<types::stream::StreamInfo> for StreamInfo {
42 fn from(value: types::stream::StreamInfo) -> Self {
43 Self {
44 name: value.name,
45 created_at: value.created_at,
46 deleted_at: value.deleted_at,
47 }
48 }
49}
50
51#[rustfmt::skip]
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
54#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
55pub struct ListStreamsRequest {
56 #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
58 pub prefix: Option<StreamNamePrefix>,
59 #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
62 pub start_after: Option<StreamNameStartAfter>,
63 #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
65 pub limit: Option<usize>,
66}
67
68super::impl_list_request_conversions!(
69 ListStreamsRequest,
70 types::stream::StreamNamePrefix,
71 types::stream::StreamNameStartAfter
72);
73
74#[rustfmt::skip]
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
77pub struct ListStreamsResponse {
78 #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
80 pub streams: Vec<StreamInfo>,
81 pub has_more: bool,
83}
84
85#[rustfmt::skip]
86#[derive(Debug, Clone, Serialize, Deserialize)]
87#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
88pub struct CreateStreamRequest {
89 pub stream: StreamName,
92 pub config: Option<StreamConfig>,
94}
95
96#[rustfmt::skip]
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
99pub struct StreamPosition {
101 pub seq_num: record::SeqNum,
103 pub timestamp: record::Timestamp,
106}
107
108impl From<record::StreamPosition> for StreamPosition {
109 fn from(pos: record::StreamPosition) -> Self {
110 Self {
111 seq_num: pos.seq_num,
112 timestamp: pos.timestamp,
113 }
114 }
115}
116
117impl From<StreamPosition> for record::StreamPosition {
118 fn from(pos: StreamPosition) -> Self {
119 Self {
120 seq_num: pos.seq_num,
121 timestamp: pos.timestamp,
122 }
123 }
124}
125
126#[rustfmt::skip]
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
129pub struct TailResponse {
130 pub tail: StreamPosition,
132}
133
134#[rustfmt::skip]
135#[derive(Debug, Clone, Serialize, Deserialize)]
136#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
137#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
138pub struct ReadStart {
139 #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
141 pub seq_num: Option<record::SeqNum>,
142 #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
144 pub timestamp: Option<record::Timestamp>,
145 #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
147 pub tail_offset: Option<u64>,
148 #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
151 pub clamp: Option<bool>,
152}
153
154impl TryFrom<ReadStart> for types::stream::ReadStart {
155 type Error = types::ValidationError;
156
157 fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
158 let from = match (value.seq_num, value.timestamp, value.tail_offset) {
159 (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
160 (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
161 (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
162 (None, None, None) => types::stream::ReadFrom::TailOffset(0),
163 _ => {
164 return Err(types::ValidationError(
165 "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
166 ));
167 }
168 };
169 let clamp = value.clamp.unwrap_or(false);
170 Ok(Self { from, clamp })
171 }
172}
173
174#[rustfmt::skip]
175#[derive(Debug, Clone, Serialize, Deserialize)]
176#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
177#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
178pub struct ReadEnd {
179 #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
182 pub count: Option<usize>,
183 #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
186 pub bytes: Option<usize>,
187 #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
189 pub until: Option<record::Timestamp>,
190 #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
195 pub wait: Option<u32>,
196}
197
198impl From<ReadEnd> for types::stream::ReadEnd {
199 fn from(value: ReadEnd) -> Self {
200 Self {
201 limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
202 value.count,
203 value.bytes,
204 ),
205 until: value.until.into(),
206 wait: value.wait.map(|w| Duration::from_secs(w as u64)),
207 }
208 }
209}
210
211#[derive(Debug, Clone)]
212pub enum ReadRequest {
213 Unary {
215 encryption: EncryptionSpec,
216 format: Format,
217 response_mime: JsonOrProto,
218 },
219 EventStream {
221 encryption: EncryptionSpec,
222 format: Format,
223 last_event_id: Option<sse::LastEventId>,
224 },
225 S2s {
227 encryption: EncryptionSpec,
228 response_compression: s2s::CompressionAlgorithm,
229 },
230}
231
232pub enum AppendRequest {
233 Unary {
235 encryption: EncryptionSpec,
236 input: types::stream::AppendInput,
237 response_mime: JsonOrProto,
238 },
239 S2s {
241 encryption: EncryptionSpec,
242 inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
243 response_compression: s2s::CompressionAlgorithm,
244 },
245}
246
247impl std::fmt::Debug for AppendRequest {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 match self {
250 AppendRequest::Unary {
251 encryption,
252 input,
253 response_mime: response,
254 } => f
255 .debug_struct("AppendRequest::Unary")
256 .field("encryption", encryption)
257 .field("input", input)
258 .field("response", response)
259 .finish(),
260 AppendRequest::S2s {
261 encryption,
262 response_compression,
263 ..
264 } => f
265 .debug_struct("AppendRequest::S2s")
266 .field("encryption", encryption)
267 .field("response_compression", response_compression)
268 .finish(),
269 }
270 }
271}
272
273#[derive(Debug, thiserror::Error)]
274pub enum AppendInputStreamError {
275 #[error("Failed to decode S2S frame: {0}")]
276 FrameDecode(#[from] std::io::Error),
277 #[error(transparent)]
278 Validation(#[from] types::ValidationError),
279}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
282#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
283pub struct Header(pub String, pub String);
284
285#[rustfmt::skip]
286#[derive(Debug, Clone, Serialize)]
288#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
289pub struct SequencedRecord {
290 pub seq_num: record::SeqNum,
292 pub timestamp: record::Timestamp,
294 #[serde(default, skip_serializing_if = "Vec::is_empty")]
296 #[cfg_attr(feature = "utoipa", schema(required = false))]
297 pub headers: Vec<Header>,
298 #[serde(default, skip_serializing_if = "String::is_empty")]
300 #[cfg_attr(feature = "utoipa", schema(required = false))]
301 pub body: String,
302}
303
304impl SequencedRecord {
305 pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
306 let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
307 let (headers, body) = record.into_parts();
308 Self {
309 seq_num,
310 timestamp,
311 headers: headers
312 .into_iter()
313 .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
314 .collect(),
315 body: format.encode(&body),
316 }
317 }
318}
319
320#[rustfmt::skip]
321#[derive(Debug, Clone, Serialize, Deserialize)]
323#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
324pub struct AppendRecord {
325 pub timestamp: Option<record::Timestamp>,
329 #[serde(default, skip_serializing_if = "Vec::is_empty")]
331 #[cfg_attr(feature = "utoipa", schema(required = false))]
332 pub headers: Vec<Header>,
333 #[serde(default, skip_serializing_if = "String::is_empty")]
335 #[cfg_attr(feature = "utoipa", schema(required = false))]
336 pub body: String,
337}
338
339impl AppendRecord {
340 pub fn decode(
341 self,
342 format: Format,
343 ) -> Result<types::stream::AppendRecord, types::ValidationError> {
344 let headers = self
345 .headers
346 .into_iter()
347 .map(|Header(name, value)| {
348 Ok::<record::Header, types::ValidationError>(record::Header {
349 name: format.decode(name)?,
350 value: format.decode(value)?,
351 })
352 })
353 .try_collect()?;
354
355 let body = format.decode(self.body)?;
356
357 let record = record::Record::try_from_parts(headers, body)
358 .map_err(|e| e.to_string())?
359 .into();
360
361 let parts = types::stream::AppendRecordParts {
362 timestamp: self.timestamp,
363 record,
364 };
365
366 types::stream::AppendRecord::try_from(parts)
367 .map_err(|e| types::ValidationError(e.to_string()))
368 }
369}
370
371#[rustfmt::skip]
372#[derive(Debug, Clone, Serialize, Deserialize)]
374#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
375pub struct AppendInput {
376 pub records: Vec<AppendRecord>,
379 pub match_seq_num: Option<record::SeqNum>,
381 pub fencing_token: Option<record::FencingToken>,
383}
384
385impl AppendInput {
386 pub fn decode(
387 self,
388 format: Format,
389 ) -> Result<types::stream::AppendInput, types::ValidationError> {
390 let records: Vec<types::stream::AppendRecord> = self
391 .records
392 .into_iter()
393 .map(|record| record.decode(format))
394 .try_collect()?;
395
396 Ok(types::stream::AppendInput {
397 records: types::stream::AppendRecordBatch::try_from(records)?,
398 match_seq_num: self.match_seq_num,
399 fencing_token: self.fencing_token,
400 })
401 }
402}
403
404#[rustfmt::skip]
405#[derive(Debug, Clone, Serialize)]
407#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
408pub struct AppendAck {
409 pub start: StreamPosition,
411 pub end: StreamPosition,
414 pub tail: StreamPosition,
417}
418
419impl From<types::stream::AppendAck> for AppendAck {
420 fn from(ack: types::stream::AppendAck) -> Self {
421 Self {
422 start: ack.start.into(),
423 end: ack.end.into(),
424 tail: ack.tail.into(),
425 }
426 }
427}
428
429#[rustfmt::skip]
430#[derive(Debug, Clone, Serialize, Deserialize)]
432#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
433#[serde(rename_all = "snake_case")]
434pub enum AppendConditionFailed {
435 #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
438 FencingTokenMismatch(record::FencingToken),
439 #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
442 SeqNumMismatch(record::SeqNum),
443}
444
445#[rustfmt::skip]
446#[derive(Debug, Clone, Serialize)]
447#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
448pub struct ReadBatch {
449 pub records: Vec<SequencedRecord>,
452 #[serde(skip_serializing_if = "Option::is_none")]
455 pub tail: Option<StreamPosition>,
456}
457
458impl ReadBatch {
459 pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
460 Self {
461 records: batch
462 .records
463 .into_iter()
464 .map(|record| SequencedRecord::encode(format, record))
465 .collect(),
466 tail: batch.tail.map(Into::into),
467 }
468 }
469}