1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod json;
5pub mod proto;
6pub mod s2s;
7pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14 encryption::EncryptionKey,
15 record,
16 types::{
17 self,
18 stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
19 },
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use super::config::{EncryptionAlgorithm, StreamConfig};
25use crate::{data::Format, mime::JsonOrProto};
26
/// API representation of a stream's metadata.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time, (de)serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time, (de)serialized as an RFC 3339 timestamp when present.
    #[serde(with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
    /// Encryption algorithm recorded for the stream, if any.
    pub cipher: Option<EncryptionAlgorithm>,
}
42
43impl From<types::stream::StreamInfo> for StreamInfo {
44 fn from(value: types::stream::StreamInfo) -> Self {
45 Self {
46 name: value.name,
47 created_at: value.created_at,
48 deleted_at: value.deleted_at,
49 cipher: value.cipher.map(Into::into),
50 }
51 }
52}
53
/// Query parameters for listing streams.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
    /// Stream name prefix. Optional; the schema documents an empty-string default.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<StreamNamePrefix>,
    /// Stream name to start listing after. Optional; the schema documents an empty-string default.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<StreamNameStartAfter>,
    /// Number of results. Optional; the schema documents a maximum and default of 1000.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}
69
// Expands the parent module's `impl_list_request_conversions!` macro for `ListStreamsRequest`,
// passing the internal prefix and start-after types. The macro body is defined outside this
// file; presumably it generates the conversions into the internal list-request type —
// confirm against the macro definition.
super::impl_list_request_conversions!(
    ListStreamsRequest,
    types::stream::StreamNamePrefix,
    types::stream::StreamNameStartAfter
);
75
/// Response body for listing streams.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
    /// Listed streams; the schema caps this at 1000 items.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub streams: Vec<StreamInfo>,
    /// Whether more streams are available than were returned.
    pub has_more: bool,
}
86
/// Request body for creating a stream.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
    /// Stream name.
    pub stream: StreamName,
    /// Optional stream configuration.
    pub config: Option<StreamConfig>,
}
97
/// A position in a stream, expressed as a sequence number together with a timestamp.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: record::SeqNum,
    /// Timestamp.
    pub timestamp: record::Timestamp,
}
109
110impl From<record::StreamPosition> for StreamPosition {
111 fn from(pos: record::StreamPosition) -> Self {
112 Self {
113 seq_num: pos.seq_num,
114 timestamp: pos.timestamp,
115 }
116 }
117}
118
119impl From<StreamPosition> for record::StreamPosition {
120 fn from(pos: StreamPosition) -> Self {
121 Self {
122 seq_num: pos.seq_num,
123 timestamp: pos.timestamp,
124 }
125 }
126}
127
/// Response body carrying a stream's tail position.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
    /// Tail position.
    pub tail: StreamPosition,
}
135
/// Query parameters selecting where a read starts.
///
/// At most one of `seq_num`, `timestamp` and `tail_offset` may be set; the conversion into
/// `types::stream::ReadStart` rejects any combination of two or more.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
    /// Start from this sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
    pub seq_num: Option<record::SeqNum>,
    /// Start from this timestamp.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub timestamp: Option<record::Timestamp>,
    /// Start from this tail offset. A tail offset of 0 is used when no start field is set.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub tail_offset: Option<u64>,
    /// Clamp flag for the start position; treated as `false` when omitted.
    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
    pub clamp: Option<bool>,
}
155
156impl TryFrom<ReadStart> for types::stream::ReadStart {
157 type Error = types::ValidationError;
158
159 fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
160 let from = match (value.seq_num, value.timestamp, value.tail_offset) {
161 (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
162 (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
163 (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
164 (None, None, None) => types::stream::ReadFrom::TailOffset(0),
165 _ => {
166 return Err(types::ValidationError(
167 "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
168 ));
169 }
170 };
171 let clamp = value.clamp.unwrap_or(false);
172 Ok(Self { from, clamp })
173 }
174}
175
/// Query parameters selecting where a read ends.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadEnd {
    /// Record count limit.
    // NOTE(review): the schema declares `u64` while the field is `usize` (and `bytes` declares
    // `usize`) — confirm the difference is intentional.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub count: Option<usize>,
    /// Bytes limit.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
    pub bytes: Option<usize>,
    /// Timestamp bound for the read.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub until: Option<record::Timestamp>,
    /// Wait duration, in seconds.
    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
    pub wait: Option<u32>,
}
199
200impl From<ReadEnd> for types::stream::ReadEnd {
201 fn from(value: ReadEnd) -> Self {
202 Self {
203 limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
204 value.count,
205 value.bytes,
206 ),
207 until: value.until.into(),
208 wait: value.wait.map(|w| Duration::from_secs(w as u64)),
209 }
210 }
211}
212
/// Transport-level options of a read request, one variant per supported response mode.
#[derive(Debug, Clone)]
pub enum ReadRequest {
    /// Unary (single response) read.
    Unary {
        /// Optional encryption key.
        encryption_key: Option<EncryptionKey>,
        /// Record data format.
        format: Format,
        /// Whether the response is sent as JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// Read delivered as an event stream (see the `sse` module).
    EventStream {
        /// Optional encryption key.
        encryption_key: Option<EncryptionKey>,
        /// Record data format.
        format: Format,
        /// Last event id, if one was provided.
        last_event_id: Option<sse::LastEventId>,
    },
    /// Read delivered over S2S (see the `s2s` module).
    S2s {
        /// Optional encryption key.
        encryption_key: Option<EncryptionKey>,
        /// Compression algorithm for the response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
233
/// An append request, one variant per supported request mode.
///
/// `Debug` is implemented by hand for this type rather than derived.
pub enum AppendRequest {
    /// Unary append carrying a single input.
    Unary {
        /// Optional encryption key.
        encryption_key: Option<EncryptionKey>,
        /// The input to append.
        input: types::stream::AppendInput,
        /// Whether the response is sent as JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// Append over S2S (see the `s2s` module), carrying a stream of inputs.
    S2s {
        /// Optional encryption key.
        encryption_key: Option<EncryptionKey>,
        /// Stream of inputs; each item is either an input or an [`AppendInputStreamError`].
        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
        /// Compression algorithm for the response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
248
249impl std::fmt::Debug for AppendRequest {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 match self {
252 AppendRequest::Unary {
253 encryption_key,
254 input,
255 response_mime: response,
256 } => f
257 .debug_struct("AppendRequest::Unary")
258 .field("encryption_key", encryption_key)
259 .field("input", input)
260 .field("response", response)
261 .finish(),
262 AppendRequest::S2s {
263 encryption_key,
264 response_compression,
265 ..
266 } => f
267 .debug_struct("AppendRequest::S2s")
268 .field("encryption_key", encryption_key)
269 .field("response_compression", response_compression)
270 .finish(),
271 }
272 }
273}
274
/// Error item type of the `inputs` stream in `AppendRequest::S2s`.
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
    /// An S2S frame could not be decoded; wraps the underlying I/O error.
    #[error("Failed to decode S2S frame: {0}")]
    FrameDecode(#[from] std::io::Error),
    /// A validation error; its `Display` and `source` are forwarded unchanged.
    #[error(transparent)]
    Validation(#[from] types::ValidationError),
}
282
/// A record header as a `(name, value)` pair of strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);
286
/// A record together with its position in the stream, as returned by reads.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
    /// Sequence number of the record.
    pub seq_num: record::SeqNum,
    /// Timestamp of the record.
    pub timestamp: record::Timestamp,
    /// Record headers; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Record body; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
305
306impl SequencedRecord {
307 pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
308 let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
309 let (headers, body) = record.into_parts();
310 Self {
311 seq_num,
312 timestamp,
313 headers: headers
314 .into_iter()
315 .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
316 .collect(),
317 body: format.encode(&body),
318 }
319 }
320}
321
/// A record to be appended.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
    /// Optional timestamp for the record.
    pub timestamp: Option<record::Timestamp>,
    /// Record headers; defaults to empty when absent and is omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Record body; defaults to empty when absent and is omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
340
341impl AppendRecord {
342 pub fn decode(
343 self,
344 format: Format,
345 ) -> Result<types::stream::AppendRecord, types::ValidationError> {
346 let headers = self
347 .headers
348 .into_iter()
349 .map(|Header(name, value)| {
350 Ok::<record::Header, types::ValidationError>(record::Header {
351 name: format.decode(name)?,
352 value: format.decode(value)?,
353 })
354 })
355 .try_collect()?;
356
357 let body = format.decode(self.body)?;
358
359 let record = record::Record::try_from_parts(headers, body)
360 .map_err(|e| e.to_string())?
361 .into();
362
363 let parts = types::stream::AppendRecordParts {
364 timestamp: self.timestamp,
365 record,
366 };
367
368 types::stream::AppendRecord::try_from(parts)
369 .map_err(|e| types::ValidationError(e.to_string()))
370 }
371}
372
/// Payload of an append request.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
    /// Records to append.
    pub records: Vec<AppendRecord>,
    /// Optional sequence number to match.
    pub match_seq_num: Option<record::SeqNum>,
    /// Optional fencing token.
    pub fencing_token: Option<record::FencingToken>,
}
386
387impl AppendInput {
388 pub fn decode(
389 self,
390 format: Format,
391 ) -> Result<types::stream::AppendInput, types::ValidationError> {
392 let records: Vec<types::stream::AppendRecord> = self
393 .records
394 .into_iter()
395 .map(|record| record.decode(format))
396 .try_collect()?;
397
398 Ok(types::stream::AppendInput {
399 records: types::stream::AppendRecordBatch::try_from(records)?,
400 match_seq_num: self.match_seq_num,
401 fencing_token: self.fencing_token,
402 })
403 }
404}
405
/// Acknowledgement of an append.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
    /// Start position reported by the acknowledgement.
    pub start: StreamPosition,
    /// End position reported by the acknowledgement.
    pub end: StreamPosition,
    /// Tail position reported by the acknowledgement.
    pub tail: StreamPosition,
}
420
421impl From<types::stream::AppendAck> for AppendAck {
422 fn from(ack: types::stream::AppendAck) -> Self {
423 Self {
424 start: ack.start.into(),
425 end: ack.end.into(),
426 tail: ack.tail.into(),
427 }
428 }
429}
430
/// Reason an append condition failed.
///
/// Variant names are serialized in `snake_case` (`fencing_token_mismatch`,
/// `seq_num_mismatch`).
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
    /// Fencing token mismatch; carries a fencing token.
    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
    FencingTokenMismatch(record::FencingToken),
    /// Sequence number mismatch; carries a sequence number.
    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
    SeqNumMismatch(record::SeqNum),
}
446
/// A batch of records returned by a read.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
    /// Records in the batch.
    pub records: Vec<SequencedRecord>,
    /// Tail position, when available; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<StreamPosition>,
}
459
460impl ReadBatch {
461 pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
462 Self {
463 records: batch
464 .records
465 .into_iter()
466 .map(|record| SequencedRecord::encode(format, record))
467 .collect(),
468 tail: batch.tail.map(Into::into),
469 }
470 }
471}