Skip to main content

s2_api/v1/stream/
mod.rs

1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod json;
5pub mod proto;
6pub mod s2s;
7pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14    encryption::EncryptionKey,
15    record,
16    types::{
17        self,
18        stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
19    },
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use super::config::{EncryptionAlgorithm, StreamConfig};
25use crate::{data::Format, mime::JsonOrProto};
26
27#[rustfmt::skip]
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
30pub struct StreamInfo {
31    /// Stream name.
32    pub name: StreamName,
33    /// Creation time in RFC 3339 format.
34    #[serde(with = "time::serde::rfc3339")]
35    pub created_at: OffsetDateTime,
36    /// Deletion time in RFC 3339 format, if the stream is being deleted.
37    #[serde(with = "time::serde::rfc3339::option")]
38    pub deleted_at: Option<OffsetDateTime>,
39    /// Encryption algorithm for this stream, if encryption is enabled.
40    pub cipher: Option<EncryptionAlgorithm>,
41}
42
43impl From<types::stream::StreamInfo> for StreamInfo {
44    fn from(value: types::stream::StreamInfo) -> Self {
45        Self {
46            name: value.name,
47            created_at: value.created_at,
48            deleted_at: value.deleted_at,
49            cipher: value.cipher.map(Into::into),
50        }
51    }
52}
53
54#[rustfmt::skip]
55#[derive(Debug, Clone, Serialize, Deserialize)]
56#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
57#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
58pub struct ListStreamsRequest {
59    /// Filter to streams whose names begin with this prefix.
60    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
61    pub prefix: Option<StreamNamePrefix>,
62    /// Filter to streams whose names lexicographically start after this string.
63    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
64    pub start_after: Option<StreamNameStartAfter>,
65    /// Number of results, up to a maximum of 1000.
66    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
67    pub limit: Option<usize>,
68}
69
70super::impl_list_request_conversions!(
71    ListStreamsRequest,
72    types::stream::StreamNamePrefix,
73    types::stream::StreamNameStartAfter
74);
75
76#[rustfmt::skip]
77#[derive(Debug, Clone, Serialize, Deserialize)]
78#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
79pub struct ListStreamsResponse {
80    /// Matching streams.
81    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
82    pub streams: Vec<StreamInfo>,
83    /// Indicates that there are more results that match the criteria.
84    pub has_more: bool,
85}
86
87#[rustfmt::skip]
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
90pub struct CreateStreamRequest {
91    /// Stream name that is unique to the basin.
92    /// It can be between 1 and 512 bytes in length.
93    pub stream: StreamName,
94    /// Stream configuration.
95    pub config: Option<StreamConfig>,
96}
97
98#[rustfmt::skip]
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
101/// Position of a record in a stream.
102pub struct StreamPosition {
103    /// Sequence number assigned by the service.
104    pub seq_num: record::SeqNum,
105    /// Timestamp, which may be client-specified or assigned by the service.
106    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
107    pub timestamp: record::Timestamp,
108}
109
110impl From<record::StreamPosition> for StreamPosition {
111    fn from(pos: record::StreamPosition) -> Self {
112        Self {
113            seq_num: pos.seq_num,
114            timestamp: pos.timestamp,
115        }
116    }
117}
118
119impl From<StreamPosition> for record::StreamPosition {
120    fn from(pos: StreamPosition) -> Self {
121        Self {
122            seq_num: pos.seq_num,
123            timestamp: pos.timestamp,
124        }
125    }
126}
127
128#[rustfmt::skip]
129#[derive(Debug, Clone, Serialize, Deserialize)]
130#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
131pub struct TailResponse {
132    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
133    pub tail: StreamPosition,
134}
135
136#[rustfmt::skip]
137#[derive(Debug, Clone, Serialize, Deserialize)]
138#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
139#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
140pub struct ReadStart {
141    /// Start from a sequence number.
142    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
143    pub seq_num: Option<record::SeqNum>,
144    /// Start from a timestamp.
145    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
146    pub timestamp: Option<record::Timestamp>,
147    /// Start from number of records before the next sequence number.
148    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
149    pub tail_offset: Option<u64>,
150    /// Start reading from the tail if the requested position is beyond it.
151    /// Otherwise, a `416 Range Not Satisfiable` response is returned.
152    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
153    pub clamp: Option<bool>,
154}
155
156impl TryFrom<ReadStart> for types::stream::ReadStart {
157    type Error = types::ValidationError;
158
159    fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
160        let from = match (value.seq_num, value.timestamp, value.tail_offset) {
161            (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
162            (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
163            (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
164            (None, None, None) => types::stream::ReadFrom::TailOffset(0),
165            _ => {
166                return Err(types::ValidationError(
167                    "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
168                ));
169            }
170        };
171        let clamp = value.clamp.unwrap_or(false);
172        Ok(Self { from, clamp })
173    }
174}
175
176#[rustfmt::skip]
177#[derive(Debug, Clone, Serialize, Deserialize)]
178#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
179#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
180pub struct ReadEnd {
181    /// Record count limit.
182    /// Non-streaming reads are capped by the default limit of 1000 records.
183    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
184    pub count: Option<usize>,
185    /// Metered bytes limit.
186    /// Non-streaming reads are capped by the default limit of 1 MiB.
187    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
188    pub bytes: Option<usize>,
189    /// Exclusive timestamp to read until.
190    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
191    pub until: Option<record::Timestamp>,
192    /// Duration in seconds to wait for new records.
193    /// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
194    /// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
195    /// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
196    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
197    pub wait: Option<u32>,
198}
199
200impl From<ReadEnd> for types::stream::ReadEnd {
201    fn from(value: ReadEnd) -> Self {
202        Self {
203            limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
204                value.count,
205                value.bytes,
206            ),
207            until: value.until.into(),
208            wait: value.wait.map(|w| Duration::from_secs(w as u64)),
209        }
210    }
211}
212
213#[derive(Debug, Clone)]
214pub enum ReadRequest {
215    /// Unary
216    Unary {
217        encryption_key: Option<EncryptionKey>,
218        format: Format,
219        response_mime: JsonOrProto,
220    },
221    /// Server-Sent Events streaming response
222    EventStream {
223        encryption_key: Option<EncryptionKey>,
224        format: Format,
225        last_event_id: Option<sse::LastEventId>,
226    },
227    /// S2S streaming response
228    S2s {
229        encryption_key: Option<EncryptionKey>,
230        response_compression: s2s::CompressionAlgorithm,
231    },
232}
233
234pub enum AppendRequest {
235    /// Unary
236    Unary {
237        encryption_key: Option<EncryptionKey>,
238        input: types::stream::AppendInput,
239        response_mime: JsonOrProto,
240    },
241    /// S2S bi-directional streaming
242    S2s {
243        encryption_key: Option<EncryptionKey>,
244        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
245        response_compression: s2s::CompressionAlgorithm,
246    },
247}
248
249impl std::fmt::Debug for AppendRequest {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        match self {
252            AppendRequest::Unary {
253                encryption_key,
254                input,
255                response_mime: response,
256            } => f
257                .debug_struct("AppendRequest::Unary")
258                .field("encryption_key", encryption_key)
259                .field("input", input)
260                .field("response", response)
261                .finish(),
262            AppendRequest::S2s {
263                encryption_key,
264                response_compression,
265                ..
266            } => f
267                .debug_struct("AppendRequest::S2s")
268                .field("encryption_key", encryption_key)
269                .field("response_compression", response_compression)
270                .finish(),
271        }
272    }
273}
274
275#[derive(Debug, thiserror::Error)]
276pub enum AppendInputStreamError {
277    #[error("Failed to decode S2S frame: {0}")]
278    FrameDecode(#[from] std::io::Error),
279    #[error(transparent)]
280    Validation(#[from] types::ValidationError),
281}
282
283#[derive(Debug, Clone, Serialize, Deserialize)]
284#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
285pub struct Header(pub String, pub String);
286
287#[rustfmt::skip]
288/// Record that is durably sequenced on a stream.
289#[derive(Debug, Clone, Serialize)]
290#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
291pub struct SequencedRecord {
292    /// Sequence number assigned by the service.
293    pub seq_num: record::SeqNum,
294    /// Timestamp for this record.
295    pub timestamp: record::Timestamp,
296    /// Series of name-value pairs for this record.
297    #[serde(default, skip_serializing_if = "Vec::is_empty")]
298    #[cfg_attr(feature = "utoipa", schema(required = false))]
299    pub headers: Vec<Header>,
300    /// Body of the record.
301    #[serde(default, skip_serializing_if = "String::is_empty")]
302    #[cfg_attr(feature = "utoipa", schema(required = false))]
303    pub body: String,
304}
305
306impl SequencedRecord {
307    pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
308        let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
309        let (headers, body) = record.into_parts();
310        Self {
311            seq_num,
312            timestamp,
313            headers: headers
314                .into_iter()
315                .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
316                .collect(),
317            body: format.encode(&body),
318        }
319    }
320}
321
322#[rustfmt::skip]
323/// Record to be appended to a stream.
324#[derive(Debug, Clone, Serialize, Deserialize)]
325#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
326pub struct AppendRecord {
327    /// Timestamp for this record.
328    /// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
329    /// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required, and whether it will be capped at the arrival time.
330    pub timestamp: Option<record::Timestamp>,
331    /// Series of name-value pairs for this record.
332    #[serde(default, skip_serializing_if = "Vec::is_empty")]
333    #[cfg_attr(feature = "utoipa", schema(required = false))]
334    pub headers: Vec<Header>,
335    /// Body of the record.
336    #[serde(default, skip_serializing_if = "String::is_empty")]
337    #[cfg_attr(feature = "utoipa", schema(required = false))]
338    pub body: String,
339}
340
341impl AppendRecord {
342    pub fn decode(
343        self,
344        format: Format,
345    ) -> Result<types::stream::AppendRecord, types::ValidationError> {
346        let headers = self
347            .headers
348            .into_iter()
349            .map(|Header(name, value)| {
350                Ok::<record::Header, types::ValidationError>(record::Header {
351                    name: format.decode(name)?,
352                    value: format.decode(value)?,
353                })
354            })
355            .try_collect()?;
356
357        let body = format.decode(self.body)?;
358
359        let record = record::Record::try_from_parts(headers, body)
360            .map_err(|e| e.to_string())?
361            .into();
362
363        let parts = types::stream::AppendRecordParts {
364            timestamp: self.timestamp,
365            record,
366        };
367
368        types::stream::AppendRecord::try_from(parts)
369            .map_err(|e| types::ValidationError(e.to_string()))
370    }
371}
372
373#[rustfmt::skip]
374/// Payload of an `append` request.
375#[derive(Debug, Clone, Serialize, Deserialize)]
376#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
377pub struct AppendInput {
378    /// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
379    /// The total size of a batch of records may not exceed 1 MiB of metered bytes.
380    pub records: Vec<AppendRecord>,
381    /// Enforce that the sequence number assigned to the first record matches.
382    pub match_seq_num: Option<record::SeqNum>,
383    /// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
384    pub fencing_token: Option<record::FencingToken>,
385}
386
387impl AppendInput {
388    pub fn decode(
389        self,
390        format: Format,
391    ) -> Result<types::stream::AppendInput, types::ValidationError> {
392        let records: Vec<types::stream::AppendRecord> = self
393            .records
394            .into_iter()
395            .map(|record| record.decode(format))
396            .try_collect()?;
397
398        Ok(types::stream::AppendInput {
399            records: types::stream::AppendRecordBatch::try_from(records)?,
400            match_seq_num: self.match_seq_num,
401            fencing_token: self.fencing_token,
402        })
403    }
404}
405
406#[rustfmt::skip]
407/// Success response to an `append` request.
408#[derive(Debug, Clone, Serialize)]
409#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
410pub struct AppendAck {
411    /// Sequence number and timestamp of the first record that was appended.
412    pub start: StreamPosition,
413    /// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
414    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
415    pub end: StreamPosition,
416    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
417    /// This can be greater than the `end` position in case of concurrent appends.
418    pub tail: StreamPosition,
419}
420
421impl From<types::stream::AppendAck> for AppendAck {
422    fn from(ack: types::stream::AppendAck) -> Self {
423        Self {
424            start: ack.start.into(),
425            end: ack.end.into(),
426            tail: ack.tail.into(),
427        }
428    }
429}
430
431#[rustfmt::skip]
432/// Aborted due to a failed condition.
433#[derive(Debug, Clone, Serialize, Deserialize)]
434#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
435#[serde(rename_all = "snake_case")]
436pub enum AppendConditionFailed {
437    /// Fencing token did not match.
438    /// The expected fencing token is returned.
439    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
440    FencingTokenMismatch(record::FencingToken),
441    /// Sequence number did not match the tail of the stream.
442    /// The expected next sequence number is returned.
443    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
444    SeqNumMismatch(record::SeqNum),
445}
446
447#[rustfmt::skip]
448#[derive(Debug, Clone, Serialize)]
449#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
450pub struct ReadBatch {
451    /// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
452    /// This can only be empty in response to a unary read (i.e. not SSE), if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
453    pub records: Vec<SequencedRecord>,
454    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
455    /// This will only be present when reading recent records.
456    #[serde(skip_serializing_if = "Option::is_none")]
457    pub tail: Option<StreamPosition>,
458}
459
460impl ReadBatch {
461    pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
462        Self {
463            records: batch
464                .records
465                .into_iter()
466                .map(|record| SequencedRecord::encode(format, record))
467                .collect(),
468            tail: batch.tail.map(Into::into),
469        }
470    }
471}