// File: s2_api/v1/stream/mod.rs
1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod json;
5pub mod proto;
6pub mod s2s;
7pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14    record,
15    types::{
16        self,
17        stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
18    },
19};
20use serde::{Deserialize, Serialize};
21use time::OffsetDateTime;
22
23use super::config::StreamConfig;
24use crate::{data::Format, mime::JsonOrProto};
25
#[rustfmt::skip]
/// Information about a stream, as returned by the stream listing and metadata endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time in RFC 3339 format.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time in RFC 3339 format, if the stream is being deleted.
    #[serde(with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
}
39
40impl From<types::stream::StreamInfo> for StreamInfo {
41    fn from(value: types::stream::StreamInfo) -> Self {
42        Self {
43            name: value.name,
44            created_at: value.created_at,
45            deleted_at: value.deleted_at,
46        }
47    }
48}
49
#[rustfmt::skip]
/// Query parameters for listing streams in a basin.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
    /// Filter to streams whose names begin with this prefix.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<StreamNamePrefix>,
    /// Filter to streams whose names lexicographically start after this string.
    /// It must be greater than or equal to the `prefix` if specified.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<StreamNameStartAfter>,
    /// Number of results, up to a maximum of 1000.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}
66
// Generates the conversions between `ListStreamsRequest` and the internal
// list-request types — presumably `TryFrom`/`From` impls; the macro is defined
// in the parent module (see `super`).
super::impl_list_request_conversions!(
    ListStreamsRequest,
    types::stream::StreamNamePrefix,
    types::stream::StreamNameStartAfter
);
72
#[rustfmt::skip]
/// Success response to a list streams request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
    /// Matching streams.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub streams: Vec<StreamInfo>,
    /// Indicates that there are more results that match the criteria.
    pub has_more: bool,
}
83
#[rustfmt::skip]
/// Payload of a create stream request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
    /// Stream name that is unique to the basin.
    /// It can be between 1 and 512 bytes in length.
    pub stream: StreamName,
    /// Stream configuration.
    /// Optional; presumably service defaults apply when omitted — confirm with handler.
    pub config: Option<StreamConfig>,
}
94
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
/// Position of a record in a stream.
/// Pairs the record's sequence number with its timestamp.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: record::SeqNum,
    /// Timestamp, which may be client-specified or assigned by the service.
    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
    pub timestamp: record::Timestamp,
}
106
107impl From<record::StreamPosition> for StreamPosition {
108    fn from(pos: record::StreamPosition) -> Self {
109        Self {
110            seq_num: pos.seq_num,
111            timestamp: pos.timestamp,
112        }
113    }
114}
115
116impl From<StreamPosition> for record::StreamPosition {
117    fn from(pos: StreamPosition) -> Self {
118        Self {
119            seq_num: pos.seq_num,
120            timestamp: pos.timestamp,
121        }
122    }
123}
124
#[rustfmt::skip]
/// Success response to a check tail request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
    pub tail: StreamPosition,
}
132
#[rustfmt::skip]
/// Query parameters selecting where a read starts.
/// At most one of `seq_num`, `timestamp`, or `tail_offset` may be provided;
/// if none is, the read starts at the tail.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
    /// Start from a sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
    pub seq_num: Option<record::SeqNum>,
    /// Start from a timestamp.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub timestamp: Option<record::Timestamp>,
    /// Start from number of records before the next sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub tail_offset: Option<u64>,
    /// Start reading from the tail if the requested position is beyond it.
    /// Otherwise, a `416 Range Not Satisfiable` response is returned.
    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
    pub clamp: Option<bool>,
}
152
153impl TryFrom<ReadStart> for types::stream::ReadStart {
154    type Error = types::ValidationError;
155
156    fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
157        let from = match (value.seq_num, value.timestamp, value.tail_offset) {
158            (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
159            (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
160            (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
161            (None, None, None) => types::stream::ReadFrom::TailOffset(0),
162            _ => {
163                return Err(types::ValidationError(
164                    "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
165                ));
166            }
167        };
168        let clamp = value.clamp.unwrap_or(false);
169        Ok(Self { from, clamp })
170    }
171}
172
173#[rustfmt::skip]
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
176#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
177pub struct ReadEnd {
178    /// Record count limit.
179    /// Non-streaming reads are capped by the default limit of 1000 records.
180    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
181    pub count: Option<usize>,
182    /// Metered bytes limit.
183    /// Non-streaming reads are capped by the default limit of 1 MiB.
184    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
185    pub bytes: Option<usize>,
186    /// Exclusive timestamp to read until.
187    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
188    pub until: Option<record::Timestamp>,
189    /// Duration in seconds to wait for new records.
190    /// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
191    /// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
192    /// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
193    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
194    pub wait: Option<u32>,
195}
196
197impl From<ReadEnd> for types::stream::ReadEnd {
198    fn from(value: ReadEnd) -> Self {
199        Self {
200            limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
201                value.count,
202                value.bytes,
203            ),
204            until: value.until.into(),
205            wait: value.wait.map(|w| Duration::from_secs(w as u64)),
206        }
207    }
208}
209
#[derive(Debug, Clone, Copy)]
/// Transport flavor of a read request, determining how records are delivered.
pub enum ReadRequest {
    /// Unary
    Unary {
        // Payload encoding applied to the returned records.
        format: Format,
        // Response body encoding: JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// Server-Sent Events streaming response
    EventStream {
        // Payload encoding applied to the returned records.
        format: Format,
        // Resume position — presumably parsed from the SSE `Last-Event-ID`
        // header; confirm in the `sse` module.
        last_event_id: Option<sse::LastEventId>,
    },
    /// S2S streaming response
    S2s {
        // Compression algorithm for the S2S response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
227
/// Transport flavor of an append request.
pub enum AppendRequest {
    /// Unary
    Unary {
        // Single batch of records to append.
        input: types::stream::AppendInput,
        // Response body encoding: JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// S2S bi-directional streaming
    S2s {
        // Stream of append inputs; each item may independently fail to decode
        // or validate (see `AppendInputStreamError`).
        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
        // Compression algorithm for the S2S response.
        response_compression: s2s::CompressionAlgorithm,
    },
}
240
241impl std::fmt::Debug for AppendRequest {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            AppendRequest::Unary {
245                input,
246                response_mime: response,
247            } => f
248                .debug_struct("AppendRequest::Unary")
249                .field("input", input)
250                .field("response", response)
251                .finish(),
252            AppendRequest::S2s { .. } => f.debug_struct("AppendRequest::S2s").finish(),
253        }
254    }
255}
256
/// Errors surfaced while consuming a streamed sequence of append inputs.
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
    /// An S2S frame could not be decoded from the byte stream.
    #[error("Failed to decode S2S frame: {0}")]
    FrameDecode(#[from] std::io::Error),
    /// A decoded input failed validation.
    #[error(transparent)]
    Validation(#[from] types::ValidationError),
}
264
/// Name-value pair attached to a record, as `(name, value)`.
/// As a tuple struct it serializes as a two-element array.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);
268
#[rustfmt::skip]
/// Record that is durably sequenced on a stream.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
    /// Sequence number assigned by the service.
    pub seq_num: record::SeqNum,
    /// Timestamp for this record.
    pub timestamp: record::Timestamp,
    /// Series of name-value pairs for this record.
    // NOTE(review): `serde(default)` only affects `Deserialize`, which this
    // type does not derive — presumably kept for symmetry with `AppendRecord`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body of the record, encoded per the requested payload format
    /// (see `SequencedRecord::encode`).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
287
288impl SequencedRecord {
289    pub fn encode(
290        format: Format,
291        record::SequencedRecord {
292            position: record::StreamPosition { seq_num, timestamp },
293            record,
294        }: record::SequencedRecord,
295    ) -> Self {
296        let (headers, body) = record.into_parts();
297        Self {
298            seq_num,
299            timestamp,
300            headers: headers
301                .into_iter()
302                .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
303                .collect(),
304            body: format.encode(&body),
305        }
306    }
307}
308
#[rustfmt::skip]
/// Record to be appended to a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
    /// Timestamp for this record.
    /// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
    /// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required, and whether it will be capped at the arrival time.
    pub timestamp: Option<record::Timestamp>,
    /// Series of name-value pairs for this record.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body of the record, encoded per the requested payload format
    /// (see `AppendRecord::decode`).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}
327
328impl AppendRecord {
329    pub fn decode(
330        self,
331        format: Format,
332    ) -> Result<types::stream::AppendRecord, types::ValidationError> {
333        let headers = self
334            .headers
335            .into_iter()
336            .map(|Header(name, value)| {
337                Ok::<record::Header, types::ValidationError>(record::Header {
338                    name: format.decode(name)?,
339                    value: format.decode(value)?,
340                })
341            })
342            .try_collect()?;
343
344        let body = format.decode(self.body)?;
345
346        let record = record::Record::try_from_parts(headers, body)
347            .map_err(|e| e.to_string())?
348            .into();
349
350        let parts = types::stream::AppendRecordParts {
351            timestamp: self.timestamp,
352            record,
353        };
354
355        types::stream::AppendRecord::try_from(parts)
356            .map_err(|e| types::ValidationError(e.to_string()))
357    }
358}
359
#[rustfmt::skip]
/// Payload of an `append` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
    /// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
    /// The total size of a batch of records may not exceed 1 MiB of metered bytes.
    pub records: Vec<AppendRecord>,
    /// Enforce that the sequence number assigned to the first record matches.
    /// Optional concurrency control; the append is aborted on mismatch.
    pub match_seq_num: Option<record::SeqNum>,
    /// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
    pub fencing_token: Option<record::FencingToken>,
}
373
374impl AppendInput {
375    pub fn decode(
376        self,
377        format: Format,
378    ) -> Result<types::stream::AppendInput, types::ValidationError> {
379        let records: Vec<types::stream::AppendRecord> = self
380            .records
381            .into_iter()
382            .map(|record| record.decode(format))
383            .try_collect()?;
384
385        Ok(types::stream::AppendInput {
386            records: types::stream::AppendRecordBatch::try_from(records)?,
387            match_seq_num: self.match_seq_num,
388            fencing_token: self.fencing_token,
389        })
390    }
391}
392
#[rustfmt::skip]
/// Success response to an `append` request.
/// Positions are ordered by sequence number: `start <= end <= tail`.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
407
408impl From<types::stream::AppendAck> for AppendAck {
409    fn from(ack: types::stream::AppendAck) -> Self {
410        Self {
411            start: ack.start.into(),
412            end: ack.end.into(),
413            tail: ack.tail.into(),
414        }
415    }
416}
417
#[rustfmt::skip]
/// Aborted due to a failed condition.
// Variants serialize as `fencing_token_mismatch` / `seq_num_mismatch` per the
// `snake_case` rename below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
    /// Fencing token did not match.
    /// The expected fencing token is returned.
    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
    FencingTokenMismatch(record::FencingToken),
    /// Sequence number did not match the tail of the stream.
    /// The expected next sequence number is returned.
    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
    SeqNumMismatch(record::SeqNum),
}
433
#[rustfmt::skip]
/// Batch of records returned in response to a read.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
    /// This can only be empty in response to a unary read (i.e. not SSE), if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
    /// This will only be present when reading recent records.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<StreamPosition>,
}
446
447impl ReadBatch {
448    pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
449        Self {
450            records: batch
451                .records
452                .into_iter()
453                .map(|record| SequencedRecord::encode(format, record))
454                .collect(),
455            tail: batch.tail.map(Into::into),
456        }
457    }
458}