//! API-facing stream types: list, create, tail, read, and append payloads,
//! together with their conversions to and from the `s2_common` types.

// Compiled only when the `axum` feature is enabled.
#[cfg(feature = "axum")]
pub mod extract;

// `s2s` and `sse` supply types referenced by the request enums in this file
// (`s2s::CompressionAlgorithm`, `sse::LastEventId`).
pub mod json;
pub mod proto;
pub mod s2s;
pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14 encryption::EncryptionKey,
15 record,
16 types::{
17 self,
18 stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
19 },
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use super::config::{EncryptionAlgorithm, StreamConfig};
25use crate::{data::Format, mime::JsonOrProto};
26
/// API representation of a stream's metadata.
///
/// Constructed from [`types::stream::StreamInfo`] through the `From` impl in this module.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time, (de)serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time, if set, (de)serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
    /// Encryption algorithm recorded for the stream, if any.
    pub cipher: Option<EncryptionAlgorithm>,
}
42
43impl From<types::stream::StreamInfo> for StreamInfo {
44 fn from(value: types::stream::StreamInfo) -> Self {
45 Self {
46 name: value.name,
47 created_at: value.created_at,
48 deleted_at: value.deleted_at,
49 cipher: value.cipher.map(Into::into),
50 }
51 }
52}
53
/// Query parameters for listing streams.
///
/// Every field is optional. With the `utoipa` feature enabled, the fields are
/// documented as query parameters.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
    /// Stream name prefix; the OpenAPI annotation documents an empty-string default.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<StreamNamePrefix>,
    /// Start-after marker for stream names; the OpenAPI annotation documents an
    /// empty-string default.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<StreamNameStartAfter>,
    /// Result limit; the OpenAPI annotation documents a default and maximum of 1000.
    // NOTE(review): the bound is only declared in the schema here — confirm where it is enforced.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}
70
// Expands the parent module's `impl_list_request_conversions!` macro for
// `ListStreamsRequest`, parameterized by the prefix and start-after types.
// NOTE(review): the generated impls are defined by the macro, which is not
// visible in this file — presumably conversions to the internal list request.
super::impl_list_request_conversions!(
    ListStreamsRequest,
    types::stream::StreamNamePrefix,
    types::stream::StreamNameStartAfter
);
76
/// Response body for a stream listing.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
    /// Listed streams; the OpenAPI schema documents at most 1000 items.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub streams: Vec<StreamInfo>,
    /// Flag reported alongside the page of results.
    // NOTE(review): presumably `true` when further matching streams remain — confirm against the handler.
    pub has_more: bool,
}
87
/// Request body for creating a stream.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
    /// Name of the stream.
    pub stream: StreamName,
    /// Optional stream configuration.
    // NOTE(review): behavior when omitted is decided by the handler, not visible here.
    pub config: Option<StreamConfig>,
}
98
/// API representation of a position in a stream: a sequence number paired
/// with a timestamp.
///
/// Converts losslessly to and from [`record::StreamPosition`] via the `From`
/// impls in this module.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: record::SeqNum,
    /// Timestamp.
    pub timestamp: record::Timestamp,
}
110
111impl From<record::StreamPosition> for StreamPosition {
112 fn from(pos: record::StreamPosition) -> Self {
113 Self {
114 seq_num: pos.seq_num,
115 timestamp: pos.timestamp,
116 }
117 }
118}
119
120impl From<StreamPosition> for record::StreamPosition {
121 fn from(pos: StreamPosition) -> Self {
122 Self {
123 seq_num: pos.seq_num,
124 timestamp: pos.timestamp,
125 }
126 }
127}
128
/// Response body carrying a stream's tail position.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
    /// Tail position of the stream.
    pub tail: StreamPosition,
}
136
/// Query parameters selecting where a read starts.
///
/// At most one of `seq_num`, `timestamp` and `tail_offset` may be supplied;
/// the `TryFrom` conversion to `types::stream::ReadStart` rejects any other
/// combination and treats "none supplied" as a tail offset of `0`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
    /// Start from this sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
    pub seq_num: Option<record::SeqNum>,
    /// Start from this timestamp.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub timestamp: Option<record::Timestamp>,
    /// Start at this offset relative to the tail.
    // NOTE(review): presumably a number of records before the tail — confirm.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub tail_offset: Option<u64>,
    /// Clamp flag; treated as `false` when omitted.
    // NOTE(review): clamping semantics live in `types::stream::ReadStart` consumers — confirm.
    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
    pub clamp: Option<bool>,
}
156
157impl TryFrom<ReadStart> for types::stream::ReadStart {
158 type Error = types::ValidationError;
159
160 fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
161 let from = match (value.seq_num, value.timestamp, value.tail_offset) {
162 (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
163 (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
164 (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
165 (None, None, None) => types::stream::ReadFrom::TailOffset(0),
166 _ => {
167 return Err(types::ValidationError(
168 "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
169 ));
170 }
171 };
172 let clamp = value.clamp.unwrap_or(false);
173 Ok(Self { from, clamp })
174 }
175}
176
/// Query parameters bounding where a read ends.
///
/// Converted to `types::stream::ReadEnd` by the `From` impl in this module:
/// `count` and `bytes` together form the read limit, `until` is passed through,
/// and `wait` is interpreted as a number of seconds.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadEnd {
    /// Record-count component of the read limit.
    // NOTE(review): documented as `u64` in the schema while the field (and `bytes`) is `usize` — confirm this is intentional.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub count: Option<usize>,
    /// Byte-size component of the read limit.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
    pub bytes: Option<usize>,
    /// Timestamp bound for the read.
    // NOTE(review): presumably exclusive — confirm against `types::stream::ReadEnd`.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub until: Option<record::Timestamp>,
    /// Wait duration in seconds.
    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
    pub wait: Option<u32>,
}
200
201impl From<ReadEnd> for types::stream::ReadEnd {
202 fn from(value: ReadEnd) -> Self {
203 Self {
204 limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
205 value.count,
206 value.bytes,
207 ),
208 until: value.until.into(),
209 wait: value.wait.map(|w| Duration::from_secs(w as u64)),
210 }
211 }
212}
213
/// Per-request options of a read, one variant per supported request flavor.
///
/// Every variant carries an optional encryption key.
// NOTE(review): `Debug` is derived, so key redaction depends on `EncryptionKey`'s own `Debug` impl — confirm.
#[derive(Debug, Clone)]
pub enum ReadRequest {
    /// Unary read with a JSON or protobuf response.
    Unary {
        /// Optional encryption key supplied with the request.
        encryption_key: Option<EncryptionKey>,
        /// Data format used when encoding records.
        format: Format,
        /// MIME type of the response (JSON or protobuf).
        response_mime: JsonOrProto,
    },
    /// Read delivered as an event stream (uses the `sse` module's `LastEventId`).
    EventStream {
        /// Optional encryption key supplied with the request.
        encryption_key: Option<EncryptionKey>,
        /// Data format used when encoding records.
        format: Format,
        /// Last event ID supplied by the client, if any.
        last_event_id: Option<sse::LastEventId>,
    },
    /// Read over the `s2s` protocol.
    S2s {
        /// Optional encryption key supplied with the request.
        encryption_key: Option<EncryptionKey>,
        /// Compression algorithm for the response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
234
/// An append request, one variant per supported request flavor.
///
/// `Debug` is implemented by hand elsewhere in this module.
pub enum AppendRequest {
    /// Unary append of a single input with a JSON or protobuf response.
    Unary {
        /// Optional encryption key supplied with the request.
        encryption_key: Option<EncryptionKey>,
        /// The already-validated append input.
        input: types::stream::AppendInput,
        /// MIME type of the response (JSON or protobuf).
        response_mime: JsonOrProto,
    },
    /// Append over the `s2s` protocol, carrying a stream of inputs.
    S2s {
        /// Optional encryption key supplied with the request.
        encryption_key: Option<EncryptionKey>,
        /// Stream of append inputs; each item may instead be an
        /// [`AppendInputStreamError`].
        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
        /// Compression algorithm for the response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
249
250impl std::fmt::Debug for AppendRequest {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 match self {
253 AppendRequest::Unary {
254 encryption_key,
255 input,
256 response_mime: response,
257 } => f
258 .debug_struct("AppendRequest::Unary")
259 .field("encryption_key", encryption_key)
260 .field("input", input)
261 .field("response", response)
262 .finish(),
263 AppendRequest::S2s {
264 encryption_key,
265 response_compression,
266 ..
267 } => f
268 .debug_struct("AppendRequest::S2s")
269 .field("encryption_key", encryption_key)
270 .field("response_compression", response_compression)
271 .finish(),
272 }
273 }
274}
275
/// Error yielded by the input stream of [`AppendRequest::S2s`].
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
    /// An I/O error surfaced while decoding an S2S frame; convertible from
    /// [`std::io::Error`].
    #[error("Failed to decode S2S frame: {0}")]
    FrameDecode(#[from] std::io::Error),
    /// A validation failure, displayed transparently as the wrapped error.
    #[error(transparent)]
    Validation(#[from] types::ValidationError),
}
283
/// A record header as a `(name, value)` pair of strings.
///
/// Both strings are in the request's data format: they are produced by
/// `Format::encode` in [`SequencedRecord::encode`] and consumed by
/// `Format::decode` in [`AppendRecord::decode`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);
287
/// API representation of a record together with its position in the stream.
///
/// Serialize-only; built with [`SequencedRecord::encode`].
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
    /// Sequence number of the record.
    pub seq_num: record::SeqNum,
    /// Timestamp of the record.
    pub timestamp: record::Timestamp,
    /// Headers, encoded in the requested format; omitted from the output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body, encoded in the requested format; omitted from the output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
306
307impl SequencedRecord {
308 pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
309 let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
310 let (headers, body) = record.into_parts();
311 Self {
312 seq_num,
313 timestamp,
314 headers: headers
315 .into_iter()
316 .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
317 .collect(),
318 body: format.encode(&body),
319 }
320 }
321}
322
/// API representation of a record to append.
///
/// Turned into `types::stream::AppendRecord` by [`AppendRecord::decode`].
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
    /// Optional timestamp for the record.
    // NOTE(review): how a missing timestamp is filled in is decided downstream — not visible here.
    pub timestamp: Option<record::Timestamp>,
    /// Headers in the request's data format; defaults to empty when absent and
    /// is omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body in the request's data format; defaults to empty when absent and is
    /// omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
341
342impl AppendRecord {
343 pub fn decode(
344 self,
345 format: Format,
346 ) -> Result<types::stream::AppendRecord, types::ValidationError> {
347 let headers = self
348 .headers
349 .into_iter()
350 .map(|Header(name, value)| {
351 Ok::<record::Header, types::ValidationError>(record::Header {
352 name: format.decode(name)?,
353 value: format.decode(value)?,
354 })
355 })
356 .try_collect()?;
357
358 let body = format.decode(self.body)?;
359
360 let record = record::Record::try_from_parts(headers, body)
361 .map_err(|e| e.to_string())?
362 .into();
363
364 let parts = types::stream::AppendRecordParts {
365 timestamp: self.timestamp,
366 record,
367 };
368
369 types::stream::AppendRecord::try_from(parts)
370 .map_err(|e| types::ValidationError(e.to_string()))
371 }
372}
373
/// API representation of an append: a batch of records plus optional conditions.
///
/// Turned into `types::stream::AppendInput` by [`AppendInput::decode`].
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
    /// Records to append; validated as a batch during decoding.
    pub records: Vec<AppendRecord>,
    /// Optional sequence number condition, passed through unchanged.
    // NOTE(review): presumably the expected next sequence number (see `AppendConditionFailed::SeqNumMismatch`) — confirm.
    pub match_seq_num: Option<record::SeqNum>,
    /// Optional fencing token condition, passed through unchanged.
    // NOTE(review): presumably compared with the stream's fencing token (see `AppendConditionFailed::FencingTokenMismatch`) — confirm.
    pub fencing_token: Option<record::FencingToken>,
}
387
388impl AppendInput {
389 pub fn decode(
390 self,
391 format: Format,
392 ) -> Result<types::stream::AppendInput, types::ValidationError> {
393 let records: Vec<types::stream::AppendRecord> = self
394 .records
395 .into_iter()
396 .map(|record| record.decode(format))
397 .try_collect()?;
398
399 Ok(types::stream::AppendInput {
400 records: types::stream::AppendRecordBatch::try_from(records)?,
401 match_seq_num: self.match_seq_num,
402 fencing_token: self.fencing_token,
403 })
404 }
405}
406
/// API representation of an append acknowledgement.
///
/// Serialize-only; built from `types::stream::AppendAck` via the `From` impl
/// in this module.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
    /// Start position reported by the acknowledgement.
    // NOTE(review): presumably the position of the first appended record — confirm.
    pub start: StreamPosition,
    /// End position reported by the acknowledgement.
    // NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    pub end: StreamPosition,
    /// Tail position reported by the acknowledgement.
    pub tail: StreamPosition,
}
421
422impl From<types::stream::AppendAck> for AppendAck {
423 fn from(ack: types::stream::AppendAck) -> Self {
424 Self {
425 start: ack.start.into(),
426 end: ack.end.into(),
427 tail: ack.tail.into(),
428 }
429 }
430}
431
/// Reason an append condition was not met.
///
/// Serialized with snake_case variant names (`fencing_token_mismatch`,
/// `seq_num_mismatch`).
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
    /// Fencing token mismatch, carrying a fencing token.
    // NOTE(review): presumably the stream's current fencing token — confirm.
    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
    FencingTokenMismatch(record::FencingToken),
    /// Sequence number mismatch, carrying a sequence number.
    // NOTE(review): presumably the sequence number that would have been assigned — confirm.
    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
    SeqNumMismatch(record::SeqNum),
}
447
/// API representation of a batch of records returned by a read.
///
/// Serialize-only; built with [`ReadBatch::encode`].
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
    /// Records in the batch, encoded in the requested format.
    pub records: Vec<SequencedRecord>,
    /// Tail position, when the underlying batch carries one; omitted from the
    /// output otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<StreamPosition>,
}
460
461impl ReadBatch {
462 pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
463 Self {
464 records: batch
465 .records
466 .into_iter()
467 .map(|record| SequencedRecord::encode(format, record))
468 .collect(),
469 tail: batch.tail.map(Into::into),
470 }
471 }
472}