//! `s2_api/v1/stream/mod.rs`
#[cfg(feature = "axum")]
pub mod extract;

pub mod json;
pub mod proto;
pub mod s2s;
pub mod sse;

use std::time::Duration;

use futures::stream::BoxStream;
use itertools::Itertools as _;
use s2_common::{
    encryption::EncryptionSpec,
    record,
    types::{
        self,
        stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
    },
};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

use super::config::StreamConfig;
use crate::{data::Format, mime::JsonOrProto};

#[rustfmt::skip]
/// Summary information about a stream, as returned by the API.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time in RFC 3339 format.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time in RFC 3339 format, if the stream is being deleted.
    #[serde(with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
}

41impl From<types::stream::StreamInfo> for StreamInfo {
42    fn from(value: types::stream::StreamInfo) -> Self {
43        Self {
44            name: value.name,
45            created_at: value.created_at,
46            deleted_at: value.deleted_at,
47        }
48    }
49}
50
#[rustfmt::skip]
/// Query parameters for listing streams in a basin.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
    /// Filter to streams whose names begin with this prefix.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<StreamNamePrefix>,
    /// Filter to streams whose names lexicographically start after this string.
    /// It must be greater than or equal to the `prefix` if specified.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<StreamNameStartAfter>,
    /// Number of results, up to a maximum of 1000.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}

// Generates the conversions between `ListStreamsRequest` and the internal
// list-request types; see `impl_list_request_conversions!` in the parent
// module for the exact impls produced.
super::impl_list_request_conversions!(
    ListStreamsRequest,
    types::stream::StreamNamePrefix,
    types::stream::StreamNameStartAfter
);

#[rustfmt::skip]
/// Response to a stream listing request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
    /// Matching streams.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub streams: Vec<StreamInfo>,
    /// Indicates that there are more results that match the criteria.
    pub has_more: bool,
}

#[rustfmt::skip]
/// Payload of a `create_stream` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
    /// Stream name that is unique to the basin.
    /// It can be between 1 and 512 bytes in length.
    pub stream: StreamName,
    /// Stream configuration.
    pub config: Option<StreamConfig>,
}

#[rustfmt::skip]
/// Position of a record in a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: record::SeqNum,
    /// Timestamp, which may be client-specified or assigned by the service.
    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
    pub timestamp: record::Timestamp,
}

108impl From<record::StreamPosition> for StreamPosition {
109    fn from(pos: record::StreamPosition) -> Self {
110        Self {
111            seq_num: pos.seq_num,
112            timestamp: pos.timestamp,
113        }
114    }
115}
116
117impl From<StreamPosition> for record::StreamPosition {
118    fn from(pos: StreamPosition) -> Self {
119        Self {
120            seq_num: pos.seq_num,
121            timestamp: pos.timestamp,
122        }
123    }
124}
125
#[rustfmt::skip]
/// Response carrying the current tail position of a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
    pub tail: StreamPosition,
}

#[rustfmt::skip]
/// Query parameters selecting where a read should start.
///
/// At most one of `seq_num`, `timestamp`, or `tail_offset` may be supplied;
/// when none is, reading starts at the tail (equivalent to `tail_offset = 0`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
    /// Start from a sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
    pub seq_num: Option<record::SeqNum>,
    /// Start from a timestamp.
    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
    pub timestamp: Option<record::Timestamp>,
    /// Start from number of records before the next sequence number.
    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
    pub tail_offset: Option<u64>,
    /// Start reading from the tail if the requested position is beyond it.
    /// Otherwise, a `416 Range Not Satisfiable` response is returned.
    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
    pub clamp: Option<bool>,
}

154impl TryFrom<ReadStart> for types::stream::ReadStart {
155    type Error = types::ValidationError;
156
157    fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
158        let from = match (value.seq_num, value.timestamp, value.tail_offset) {
159            (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
160            (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
161            (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
162            (None, None, None) => types::stream::ReadFrom::TailOffset(0),
163            _ => {
164                return Err(types::ValidationError(
165                    "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
166                ));
167            }
168        };
169        let clamp = value.clamp.unwrap_or(false);
170        Ok(Self { from, clamp })
171    }
172}
173
174#[rustfmt::skip]
175#[derive(Debug, Clone, Serialize, Deserialize)]
176#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
177#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
178pub struct ReadEnd {
179    /// Record count limit.
180    /// Non-streaming reads are capped by the default limit of 1000 records.
181    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
182    pub count: Option<usize>,
183    /// Metered bytes limit.
184    /// Non-streaming reads are capped by the default limit of 1 MiB.
185    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
186    pub bytes: Option<usize>,
187    /// Exclusive timestamp to read until.
188    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
189    pub until: Option<record::Timestamp>,
190    /// Duration in seconds to wait for new records.
191    /// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
192    /// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
193    /// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
194    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
195    pub wait: Option<u32>,
196}
197
198impl From<ReadEnd> for types::stream::ReadEnd {
199    fn from(value: ReadEnd) -> Self {
200        Self {
201            limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
202                value.count,
203                value.bytes,
204            ),
205            until: value.until.into(),
206            wait: value.wait.map(|w| Duration::from_secs(w as u64)),
207        }
208    }
209}
210
#[derive(Debug, Clone)]
/// A `read` request, in one of the supported response-transport flavors.
pub enum ReadRequest {
    /// Unary
    Unary {
        encryption: EncryptionSpec,
        // Encoding applied to record headers and bodies in the response.
        format: Format,
        // Whether the response body is JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// Server-Sent Events streaming response
    EventStream {
        encryption: EncryptionSpec,
        // Encoding applied to record headers and bodies in the response.
        format: Format,
        // Last event id from a prior SSE session, presumably used to resume —
        // see `sse::LastEventId` for the exact semantics.
        last_event_id: Option<sse::LastEventId>,
    },
    /// S2S streaming response
    S2s {
        encryption: EncryptionSpec,
        // Compression algorithm for the S2S response frames.
        response_compression: s2s::CompressionAlgorithm,
    },
}

/// An `append` request, in unary or streaming form.
// No `#[derive(Debug)]`: the `S2s` `inputs` stream has no `Debug` impl,
// so `Debug` is implemented manually below with that field elided.
pub enum AppendRequest {
    /// Unary
    Unary {
        encryption: EncryptionSpec,
        // Validated batch of records to append.
        input: types::stream::AppendInput,
        // Whether the response body is JSON or protobuf.
        response_mime: JsonOrProto,
    },
    /// S2S bi-directional streaming
    S2s {
        encryption: EncryptionSpec,
        // Stream of append inputs; each item may have failed frame decoding
        // or validation upstream.
        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
        // Compression algorithm for the S2S response frames.
        response_compression: s2s::CompressionAlgorithm,
    },
}

247impl std::fmt::Debug for AppendRequest {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        match self {
250            AppendRequest::Unary {
251                encryption,
252                input,
253                response_mime: response,
254            } => f
255                .debug_struct("AppendRequest::Unary")
256                .field("encryption", encryption)
257                .field("input", input)
258                .field("response", response)
259                .finish(),
260            AppendRequest::S2s {
261                encryption,
262                response_compression,
263                ..
264            } => f
265                .debug_struct("AppendRequest::S2s")
266                .field("encryption", encryption)
267                .field("response_compression", response_compression)
268                .finish(),
269        }
270    }
271}
272
/// Failure while pulling append inputs from an S2S request stream.
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
    /// The raw S2S frame could not be decoded.
    #[error("Failed to decode S2S frame: {0}")]
    FrameDecode(#[from] std::io::Error),
    /// The decoded input failed validation.
    #[error(transparent)]
    Validation(#[from] types::ValidationError),
}

/// Name-value pair attached to a record.
/// Both components are strings encoded per the effective data [`Format`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);

#[rustfmt::skip]
/// Record that is durably sequenced on a stream.
/// Header names/values and the body are strings encoded per the requested [`Format`].
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
    /// Sequence number assigned by the service.
    pub seq_num: record::SeqNum,
    /// Timestamp for this record.
    pub timestamp: record::Timestamp,
    /// Series of name-value pairs for this record.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body of the record.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}

304impl SequencedRecord {
305    pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
306        let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
307        let (headers, body) = record.into_parts();
308        Self {
309            seq_num,
310            timestamp,
311            headers: headers
312                .into_iter()
313                .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
314                .collect(),
315            body: format.encode(&body),
316        }
317    }
318}
319
#[rustfmt::skip]
/// Record to be appended to a stream.
/// Header names/values and the body are strings encoded per the requested [`Format`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
    /// Timestamp for this record.
    /// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
    /// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required, and whether it will be capped at the arrival time.
    pub timestamp: Option<record::Timestamp>,
    /// Series of name-value pairs for this record.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub headers: Vec<Header>,
    /// Body of the record.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub body: String,
}

339impl AppendRecord {
340    pub fn decode(
341        self,
342        format: Format,
343    ) -> Result<types::stream::AppendRecord, types::ValidationError> {
344        let headers = self
345            .headers
346            .into_iter()
347            .map(|Header(name, value)| {
348                Ok::<record::Header, types::ValidationError>(record::Header {
349                    name: format.decode(name)?,
350                    value: format.decode(value)?,
351                })
352            })
353            .try_collect()?;
354
355        let body = format.decode(self.body)?;
356
357        let record = record::Record::try_from_parts(headers, body)
358            .map_err(|e| e.to_string())?
359            .into();
360
361        let parts = types::stream::AppendRecordParts {
362            timestamp: self.timestamp,
363            record,
364        };
365
366        types::stream::AppendRecord::try_from(parts)
367            .map_err(|e| types::ValidationError(e.to_string()))
368    }
369}
370
#[rustfmt::skip]
/// Payload of an `append` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
    /// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
    /// The total size of a batch of records may not exceed 1 MiB of metered bytes.
    /// See [`AppendRecord`] for per-record encoding.
    pub records: Vec<AppendRecord>,
    /// Enforce that the sequence number assigned to the first record matches.
    pub match_seq_num: Option<record::SeqNum>,
    /// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
    pub fencing_token: Option<record::FencingToken>,
}

385impl AppendInput {
386    pub fn decode(
387        self,
388        format: Format,
389    ) -> Result<types::stream::AppendInput, types::ValidationError> {
390        let records: Vec<types::stream::AppendRecord> = self
391            .records
392            .into_iter()
393            .map(|record| record.decode(format))
394            .try_collect()?;
395
396        Ok(types::stream::AppendInput {
397            records: types::stream::AppendRecordBatch::try_from(records)?,
398            match_seq_num: self.match_seq_num,
399            fencing_token: self.fencing_token,
400        })
401    }
402}
403
#[rustfmt::skip]
/// Success response to an `append` request.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}

419impl From<types::stream::AppendAck> for AppendAck {
420    fn from(ack: types::stream::AppendAck) -> Self {
421        Self {
422            start: ack.start.into(),
423            end: ack.end.into(),
424            tail: ack.tail.into(),
425        }
426    }
427}
428
#[rustfmt::skip]
/// Aborted due to a failed condition.
// Serialized with serde's default externally tagged representation and
// snake_case variant keys, e.g. `{"seq_num_mismatch": 42}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
    /// Fencing token did not match.
    /// The expected fencing token is returned.
    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
    FencingTokenMismatch(record::FencingToken),
    /// Sequence number did not match the tail of the stream.
    /// The expected next sequence number is returned.
    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
    SeqNumMismatch(record::SeqNum),
}

#[rustfmt::skip]
/// Batch of records returned by a `read` request.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
    /// This can only be empty in response to a unary read (i.e. not SSE), if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
    /// See [`SequencedRecord`] for per-record encoding.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
    /// This will only be present when reading recent records.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<StreamPosition>,
}

458impl ReadBatch {
459    pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
460        Self {
461            records: batch
462                .records
463                .into_iter()
464                .map(|record| SequencedRecord::encode(format, record))
465                .collect(),
466            tail: batch.tail.map(Into::into),
467        }
468    }
469}