//! `s2_api/v1/stream/mod.rs`

1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod proto;
5pub mod s2s;
6pub mod sse;
7
8use std::time::Duration;
9
10use futures::stream::BoxStream;
11use itertools::Itertools as _;
12use s2_common::{
13    record,
14    types::{
15        self,
16        stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
17    },
18};
19use serde::{Deserialize, Serialize};
20use time::OffsetDateTime;
21
22use super::config::StreamConfig;
23use crate::{data::Format, mime::JsonOrProto};
24
#[rustfmt::skip]
/// Summary information about a stream, as returned to API clients.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time in RFC 3339 format.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time in RFC 3339 format, if the stream is being deleted.
    #[serde(with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
}
38
39impl From<types::stream::StreamInfo> for StreamInfo {
40    fn from(value: types::stream::StreamInfo) -> Self {
41        Self {
42            name: value.name,
43            created_at: value.created_at,
44            deleted_at: value.deleted_at,
45        }
46    }
47}
48
#[rustfmt::skip]
/// Query parameters of a `list streams` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
    /// Filter to streams whose name begins with this prefix.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<StreamNamePrefix>,
    /// Only return streams whose name lexicographically starts after this name.
    /// It must be greater than or equal to the `prefix` if specified.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<StreamNameStartAfter>,
    /// Number of results, up to a maximum of 1000.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}
65
// Generates conversions between `ListStreamsRequest` and the internal list
// request representation — presumably `From`/`TryFrom` impls keyed on the
// prefix and start-after types; see the macro definition in the parent module.
super::impl_list_request_conversions!(
    ListStreamsRequest,
    types::stream::StreamNamePrefix,
    types::stream::StreamNameStartAfter
);
71
#[rustfmt::skip]
/// Success response to a `list streams` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
    /// Matching streams.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub streams: Vec<StreamInfo>,
    /// Indicates that there are more results that match the criteria.
    pub has_more: bool,
}
82
#[rustfmt::skip]
/// Payload of a `create stream` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
    /// Stream name that is unique to the basin.
    /// It can be between 1 and 512 bytes in length.
    pub stream: StreamName,
    /// Stream configuration.
    /// Defaults apply when `None` — confirm against service behavior.
    pub config: Option<StreamConfig>,
}
93
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
/// Position of a record in a stream.
///
/// Used for the `start`/`end`/`tail` positions reported in responses.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: record::SeqNum,
    /// Timestamp, which may be client-specified or assigned by the service.
    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
    pub timestamp: record::Timestamp,
}
105
106impl From<record::StreamPosition> for StreamPosition {
107    fn from(pos: record::StreamPosition) -> Self {
108        Self {
109            seq_num: pos.seq_num,
110            timestamp: pos.timestamp,
111        }
112    }
113}
114
115impl From<StreamPosition> for record::StreamPosition {
116    fn from(pos: StreamPosition) -> Self {
117        Self {
118            seq_num: pos.seq_num,
119            timestamp: pos.timestamp,
120        }
121    }
122}
123
#[rustfmt::skip]
/// Response carrying the current tail position of a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
    pub tail: StreamPosition,
}
131
#[rustfmt::skip]
/// Query parameters selecting where a read should start.
///
/// At most one of `seq_num`, `timestamp`, or `tail_offset` may be provided;
/// when none is, the read starts at the tail (equivalent to a tail offset of 0).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
    /// Start from a sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
    pub seq_num: Option<record::SeqNum>,
    /// Start from a timestamp.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub timestamp: Option<record::Timestamp>,
    /// Start from number of records before the next sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub tail_offset: Option<u64>,
    /// Start reading from the tail if the requested position is beyond it.
    /// Otherwise, a `416 Range Not Satisfiable` response is returned.
    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
    pub clamp: Option<bool>,
}
151
152impl TryFrom<ReadStart> for types::stream::ReadStart {
153    type Error = types::ValidationError;
154
155    fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
156        let from = match (value.seq_num, value.timestamp, value.tail_offset) {
157            (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
158            (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
159            (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
160            (None, None, None) => types::stream::ReadFrom::TailOffset(0),
161            _ => {
162                return Err(types::ValidationError(
163                    "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
164                ));
165            }
166        };
167        let clamp = value.clamp.unwrap_or(false);
168        Ok(Self { from, clamp })
169    }
170}
171
172#[rustfmt::skip]
173#[derive(Debug, Clone, Serialize, Deserialize)]
174#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
175#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
176pub struct ReadEnd {
177    /// Record count limit.
178    /// Non-streaming reads are capped by the default limit of 1000 records.
179    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
180    pub count: Option<usize>,
181    /// Metered bytes limit.
182    /// Non-streaming reads are capped by the default limit of 1 MiB.
183    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
184    pub bytes: Option<usize>,
185    /// Exclusive timestamp to read until.
186    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
187    pub until: Option<record::Timestamp>,
188    /// Duration in seconds to wait for new records.
189    /// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
190    /// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
191    /// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
192    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
193    pub wait: Option<u32>,
194}
195
196impl From<ReadEnd> for types::stream::ReadEnd {
197    fn from(value: ReadEnd) -> Self {
198        Self {
199            limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
200                value.count,
201                value.bytes,
202            ),
203            until: value.until.into(),
204            wait: value.wait.map(|w| Duration::from_secs(w as u64)),
205        }
206    }
207}
208
/// Shape of a `read` request, determined by the negotiated transport.
#[derive(Debug, Clone, Copy)]
pub enum ReadRequest {
    /// Unary
    Unary {
        /// Encoding for record headers/bodies in the response.
        format: Format,
        /// Whether the response is serialized as JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// Server-Sent Events streaming response
    EventStream {
        /// Encoding for record headers/bodies in the response.
        format: Format,
        /// Resume position from the SSE `Last-Event-ID` header, if any.
        last_event_id: Option<sse::LastEventId>,
    },
    /// S2S streaming response
    S2s {
        /// Compression to apply to response frames.
        response_compression: s2s::CompressionAlgorithm,
    },
}
226
/// Shape of an `append` request, determined by the negotiated transport.
pub enum AppendRequest {
    /// Unary
    Unary {
        /// Single validated batch to append.
        input: types::stream::AppendInput,
        /// Whether the response is serialized as JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// S2S bi-directional streaming
    S2s {
        /// Stream of validated append inputs; each item may fail with a
        /// decode or validation error.
        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
        /// Compression to apply to response frames.
        response_compression: s2s::CompressionAlgorithm,
    },
}
239
240impl std::fmt::Debug for AppendRequest {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        match self {
243            AppendRequest::Unary {
244                input,
245                response_mime: response,
246            } => f
247                .debug_struct("AppendRequest::Unary")
248                .field("input", input)
249                .field("response", response)
250                .finish(),
251            AppendRequest::S2s { .. } => f.debug_struct("AppendRequest::S2s").finish(),
252        }
253    }
254}
255
/// Error produced while consuming a client's stream of append inputs.
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
    /// An S2S frame could not be decoded from the wire.
    #[error("Failed to decode S2S frame: {0}")]
    FrameDecode(#[from] std::io::Error),
    /// A decoded input failed validation.
    #[error(transparent)]
    Validation(#[from] types::ValidationError),
}
263
/// Headers add structured information to a record as name-value pairs.
///
/// The name cannot be empty, with the exception of an S2 command record.
///
/// As a tuple struct, serde represents this as a two-element `[name, value]`
/// array rather than an object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);
270
#[rustfmt::skip]
/// Record that is durably sequenced on a stream.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
    /// Sequence number assigned by the service.
    pub seq_num: record::SeqNum,
    /// Timestamp for this record.
    pub timestamp: record::Timestamp,
    /// Series of name-value pairs for this record.
    // NOTE(review): `serde(default)` only affects deserialization, which this
    // type does not derive — harmless, but confirm whether it was intended.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body of the record, encoded per the requested `Format`.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
289
290impl SequencedRecord {
291    pub fn encode(
292        format: Format,
293        record::SequencedRecord {
294            position: record::StreamPosition { seq_num, timestamp },
295            record,
296        }: record::SequencedRecord,
297    ) -> Self {
298        let (headers, body) = record.into_parts();
299        Self {
300            seq_num,
301            timestamp,
302            headers: headers
303                .into_iter()
304                .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
305                .collect(),
306            body: format.encode(&body),
307        }
308    }
309}
310
#[rustfmt::skip]
/// Record to be appended to a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
    /// Timestamp for this record.
    /// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
    /// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required, and whether it will be capped at the arrival time.
    pub timestamp: Option<record::Timestamp>,
    /// Series of name-value pairs for this record.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body of the record, encoded per the requested `Format`.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
329
impl AppendRecord {
    /// Decode this wire-format record into the internal, validated
    /// `AppendRecord` type, interpreting header names/values and the body
    /// according to `format`.
    ///
    /// Returns a `ValidationError` if any field fails to decode or the
    /// assembled record violates record-level constraints.
    pub fn decode(
        self,
        format: Format,
    ) -> Result<types::stream::AppendRecord, types::ValidationError> {
        // Decode each header pair, short-circuiting on the first failure.
        let headers = self
            .headers
            .into_iter()
            .map(|Header(name, value)| {
                Ok::<record::Header, types::ValidationError>(record::Header {
                    name: format.decode(name)?,
                    value: format.decode(value)?,
                })
            })
            .try_collect()?;

        let body = format.decode(self.body)?;

        // NOTE(review): the parts error is stringified and then converted by
        // `?` — relies on a `From<String>`-style conversion for
        // `ValidationError`; confirm that impl exists.
        let record = record::Record::try_from_parts(headers, body)
            .map_err(|e| e.to_string())?
            .into();

        let parts = types::stream::AppendRecordParts {
            timestamp: self.timestamp,
            record,
        };

        types::stream::AppendRecord::try_from(parts)
            .map_err(|e| types::ValidationError(e.to_string()))
    }
}
361
#[rustfmt::skip]
/// Payload of an `append` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
    /// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
    /// The total size of a batch of records may not exceed 1 MiB of metered bytes.
    pub records: Vec<AppendRecord>,
    /// Enforce that the sequence number assigned to the first record matches.
    pub match_seq_num: Option<record::SeqNum>,
    /// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
    pub fencing_token: Option<record::FencingToken>,
}
375
376impl AppendInput {
377    pub fn decode(
378        self,
379        format: Format,
380    ) -> Result<types::stream::AppendInput, types::ValidationError> {
381        let records: Vec<types::stream::AppendRecord> = self
382            .records
383            .into_iter()
384            .map(|record| record.decode(format))
385            .try_collect()?;
386
387        Ok(types::stream::AppendInput {
388            records: types::stream::AppendRecordBatch::try_from(records)?,
389            match_seq_num: self.match_seq_num,
390            fencing_token: self.fencing_token,
391        })
392    }
393}
394
#[rustfmt::skip]
/// Success response to an `append` request.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
409
410impl From<types::stream::AppendAck> for AppendAck {
411    fn from(ack: types::stream::AppendAck) -> Self {
412        Self {
413            start: ack.start.into(),
414            end: ack.end.into(),
415            tail: ack.tail.into(),
416        }
417    }
418}
419
#[rustfmt::skip]
/// Aborted due to a failed condition.
///
/// Externally tagged with snake_case variant names, i.e.
/// `fencing_token_mismatch` or `seq_num_mismatch`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
    /// Fencing token did not match.
    /// The expected fencing token is returned.
    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
    FencingTokenMismatch(record::FencingToken),
    /// Sequence number did not match the tail of the stream.
    /// The expected next sequence number is returned.
    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
    SeqNumMismatch(record::SeqNum),
}
435
#[rustfmt::skip]
/// Batch of records returned by a `read` request.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
    /// This can only be empty in response to a unary read (i.e. not SSE), if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
    /// This will only be present when reading recent records.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<StreamPosition>,
}
448
449impl ReadBatch {
450    pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
451        Self {
452            records: batch
453                .records
454                .into_iter()
455                .map(|record| SequencedRecord::encode(format, record))
456                .collect(),
457            tail: batch.tail.map(Into::into),
458        }
459    }
460}