Skip to main content

s2_api/v1/stream/
mod.rs

1#[cfg(feature = "axum")]
2pub mod extract;
3
4pub mod json;
5pub mod proto;
6pub mod s2s;
7pub mod sse;
8
9use std::time::Duration;
10
11use futures::stream::BoxStream;
12use itertools::Itertools as _;
13use s2_common::{
14    encryption::EncryptionKey,
15    record,
16    types::{
17        self,
18        stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
19    },
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use super::config::{EncryptionAlgorithm, StreamConfig};
25use crate::{data::Format, mime::JsonOrProto};
26
27#[rustfmt::skip]
/// Stream metadata as exposed by the API: its name, creation time and, when
/// present, its deletion time and encryption algorithm.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
30pub struct StreamInfo {
31    /// Stream name.
32    pub name: StreamName,
33    /// Creation time in RFC 3339 format.
34    #[serde(with = "time::serde::rfc3339")]
35    pub created_at: OffsetDateTime,
36    /// Deletion time in RFC 3339 format, if the stream is being deleted.
    // NOTE(review): there is no `#[serde(default)]` next to the `with` module, so the
    // `deleted_at` key is presumably required (possibly as `null`) when deserializing — confirm.
37    #[serde(with = "time::serde::rfc3339::option")]
38    pub deleted_at: Option<OffsetDateTime>,
39    /// Encryption algorithm for this stream, if encryption is enabled.
40    pub cipher: Option<EncryptionAlgorithm>,
41}
42
43impl From<types::stream::StreamInfo> for StreamInfo {
44    fn from(value: types::stream::StreamInfo) -> Self {
45        Self {
46            name: value.name,
47            created_at: value.created_at,
48            deleted_at: value.deleted_at,
49            cipher: value.cipher.map(Into::into),
50        }
51    }
52}
53
54#[rustfmt::skip]
/// Parameters of a list-streams request.
/// With the `utoipa` feature enabled they are documented as query parameters.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
57#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
58pub struct ListStreamsRequest {
59    /// Filter to streams whose names begin with this prefix.
60    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
61    pub prefix: Option<StreamNamePrefix>,
62    /// Filter to streams whose names lexicographically start after this string.
63    /// It must be greater than or equal to the `prefix` if specified.
64    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
65    pub start_after: Option<StreamNameStartAfter>,
66    /// Number of results, up to a maximum of 1000.
    // NOTE(review): the 1000 cap and default appear only in the schema annotation here;
    // enforcement presumably happens in the request conversion — confirm.
67    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
68    pub limit: Option<usize>,
69}
70
// Invokes the parent module's `impl_list_request_conversions!` macro for
// `ListStreamsRequest` with its prefix and start-after types.
// NOTE(review): the macro body is not visible in this file; presumably it generates the
// conversions between this request and the corresponding `types` list request — confirm.
71super::impl_list_request_conversions!(
72    ListStreamsRequest,
73    types::stream::StreamNamePrefix,
74    types::stream::StreamNameStartAfter
75);
76
77#[rustfmt::skip]
/// Response to a list-streams request: one page of matching streams plus a
/// flag telling whether more results exist.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
80pub struct ListStreamsResponse {
81    /// Matching streams.
    // The schema documents at most 1000 items, matching the documented request limit.
82    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
83    pub streams: Vec<StreamInfo>,
84    /// Indicates that there are more results that match the criteria.
85    pub has_more: bool,
86}
87
88#[rustfmt::skip]
/// Payload of a create-stream request: the stream name and an optional configuration.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
91pub struct CreateStreamRequest {
92    /// Stream name that is unique to the basin.
93    /// It can be between 1 and 512 bytes in length.
    // NOTE(review): the length bounds are presumably enforced by `StreamName` itself — confirm.
94    pub stream: StreamName,
95    /// Stream configuration.
    /// May be omitted.
96    pub config: Option<StreamConfig>,
97}
98
99#[rustfmt::skip]
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
102/// Position of a record in a stream.
// API-level counterpart of `record::StreamPosition`, carrying the same two fields;
// conversions in both directions are implemented in this module.
103pub struct StreamPosition {
104    /// Sequence number assigned by the service.
105    pub seq_num: record::SeqNum,
106    /// Timestamp, which may be client-specified or assigned by the service.
107    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
108    pub timestamp: record::Timestamp,
109}
110
111impl From<record::StreamPosition> for StreamPosition {
112    fn from(pos: record::StreamPosition) -> Self {
113        Self {
114            seq_num: pos.seq_num,
115            timestamp: pos.timestamp,
116        }
117    }
118}
119
120impl From<StreamPosition> for record::StreamPosition {
121    fn from(pos: StreamPosition) -> Self {
122        Self {
123            seq_num: pos.seq_num,
124            timestamp: pos.timestamp,
125        }
126    }
127}
128
129#[rustfmt::skip]
/// Response reporting the tail position of a stream.
130#[derive(Debug, Clone, Serialize, Deserialize)]
131#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
132pub struct TailResponse {
133    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
134    pub tail: StreamPosition,
135}
136
137#[rustfmt::skip]
/// Parameters selecting where a read starts.
/// With the `utoipa` feature enabled they are documented as query parameters.
// At most one of `seq_num`, `timestamp` and `tail_offset` may be set: the `TryFrom`
// conversion in this module rejects any combination of two or more, and treats
// "none set" as a tail offset of 0.
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
140#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
141pub struct ReadStart {
142    /// Start from a sequence number.
143    #[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
144    pub seq_num: Option<record::SeqNum>,
145    /// Start from a timestamp.
146    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
147    pub timestamp: Option<record::Timestamp>,
148    /// Start from number of records before the next sequence number.
149    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
150    pub tail_offset: Option<u64>,
151    /// Start reading from the tail if the requested position is beyond it.
152    /// Otherwise, a `416 Range Not Satisfiable` response is returned.
    // Treated as `false` when omitted (see the `TryFrom` conversion in this module).
153    #[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
154    pub clamp: Option<bool>,
155}
156
157impl TryFrom<ReadStart> for types::stream::ReadStart {
158    type Error = types::ValidationError;
159
160    fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
161        let from = match (value.seq_num, value.timestamp, value.tail_offset) {
162            (Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
163            (None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
164            (None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
165            (None, None, None) => types::stream::ReadFrom::TailOffset(0),
166            _ => {
167                return Err(types::ValidationError(
168                    "only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
169                ));
170            }
171        };
172        let clamp = value.clamp.unwrap_or(false);
173        Ok(Self { from, clamp })
174    }
175}
176
177#[rustfmt::skip]
/// Parameters bounding a read: record count, metered bytes, an exclusive end timestamp, and a wait duration.
/// With the `utoipa` feature enabled they are documented as query parameters.
178#[derive(Debug, Clone, Serialize, Deserialize)]
179#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
180#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
181pub struct ReadEnd {
182    /// Record count limit.
183    /// Non-streaming reads are capped by the default limit of 1000 records.
    // NOTE(review): the schema declares `u64` while the field is `usize` (and `bytes`
    // below is declared as `usize`) — confirm the mismatch is intentional.
184    #[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
185    pub count: Option<usize>,
186    /// Metered bytes limit.
187    /// Non-streaming reads are capped by the default limit of 1 MiB.
188    #[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
189    pub bytes: Option<usize>,
190    /// Exclusive timestamp to read until.
191    #[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
192    pub until: Option<record::Timestamp>,
193    /// Duration in seconds to wait for new records.
194    /// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
195    /// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
196    /// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
    // Converted to a `Duration` of whole seconds by the `From<ReadEnd>` impl in this module.
197    #[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
198    pub wait: Option<u32>,
199}
200
201impl From<ReadEnd> for types::stream::ReadEnd {
202    fn from(value: ReadEnd) -> Self {
203        Self {
204            limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
205                value.count,
206                value.bytes,
207            ),
208            until: value.until.into(),
209            wait: value.wait.map(|w| Duration::from_secs(w as u64)),
210        }
211    }
212}
213
214#[derive(Debug, Clone)]
/// A read request, with one variant per response mode: unary, Server-Sent Events, or S2S.
/// Every variant carries an optional encryption key.
215pub enum ReadRequest {
216    /// Unary
    /// read: carries the data `format` and the response MIME type (JSON or proto).
217    Unary {
218        encryption_key: Option<EncryptionKey>,
219        format: Format,
220        response_mime: JsonOrProto,
221    },
222    /// Server-Sent Events streaming response
    /// Carries the data `format` and an optional `sse::LastEventId`.
    // NOTE(review): `last_event_id` is presumably the SSE resumption point — confirm with the handler.
223    EventStream {
224        encryption_key: Option<EncryptionKey>,
225        format: Format,
226        last_event_id: Option<sse::LastEventId>,
227    },
228    /// S2S streaming response
    /// Carries the compression algorithm to use for the response.
229    S2s {
230        encryption_key: Option<EncryptionKey>,
231        response_compression: s2s::CompressionAlgorithm,
232    },
233}
234
/// An append request, either a single unary input or an S2S stream of inputs.
/// Every variant carries an optional encryption key.
// `Debug` is implemented by hand in this module and leaves the `inputs` stream out of the output.
235pub enum AppendRequest {
236    /// Unary
    /// append: one already-validated `AppendInput` and the response MIME type (JSON or proto).
237    Unary {
238        encryption_key: Option<EncryptionKey>,
239        input: types::stream::AppendInput,
240        response_mime: JsonOrProto,
241    },
242    /// S2S bi-directional streaming
    /// Each item of `inputs` is either an `AppendInput` or an `AppendInputStreamError`.
243    S2s {
244        encryption_key: Option<EncryptionKey>,
245        inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
246        response_compression: s2s::CompressionAlgorithm,
247    },
248}
249
250impl std::fmt::Debug for AppendRequest {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        match self {
253            AppendRequest::Unary {
254                encryption_key,
255                input,
256                response_mime: response,
257            } => f
258                .debug_struct("AppendRequest::Unary")
259                .field("encryption_key", encryption_key)
260                .field("input", input)
261                .field("response", response)
262                .finish(),
263            AppendRequest::S2s {
264                encryption_key,
265                response_compression,
266                ..
267            } => f
268                .debug_struct("AppendRequest::S2s")
269                .field("encryption_key", encryption_key)
270                .field("response_compression", response_compression)
271                .finish(),
272        }
273    }
274}
275
276#[derive(Debug, thiserror::Error)]
/// Error type of the items yielded by the `inputs` stream of `AppendRequest::S2s`.
277pub enum AppendInputStreamError {
    /// An I/O error reported while decoding an S2S frame.
278    #[error("Failed to decode S2S frame: {0}")]
279    FrameDecode(#[from] std::io::Error),
    /// A validation failure; `transparent` forwards the inner error's message and source.
280    #[error(transparent)]
281    Validation(#[from] types::ValidationError),
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
285#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
/// Name-value pair attached to a record: field `0` is the name and field `1` is the value.
// As a two-field tuple struct with derived serde impls, this presumably appears as a
// two-element array in JSON — confirm against the API documentation.
286pub struct Header(pub String, pub String);
287
288#[rustfmt::skip]
289/// Record that is durably sequenced on a stream.
// Serialize-only. `headers` and `body` are left out of the output when empty, and hold
// strings produced by `Format::encode` (see `SequencedRecord::encode`).
290#[derive(Debug, Clone, Serialize)]
291#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
292pub struct SequencedRecord {
293    /// Sequence number assigned by the service.
294    pub seq_num: record::SeqNum,
295    /// Timestamp for this record.
296    pub timestamp: record::Timestamp,
297    /// Series of name-value pairs for this record.
298    #[serde(default, skip_serializing_if = "Vec::is_empty")]
299    #[cfg_attr(feature = "utoipa", schema(required = false))]
300    pub headers: Vec<Header>,
301    /// Body of the record.
302    #[serde(default, skip_serializing_if = "String::is_empty")]
303    #[cfg_attr(feature = "utoipa", schema(required = false))]
304    pub body: String,
305}
306
307impl SequencedRecord {
308    pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
309        let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
310        let (headers, body) = record.into_parts();
311        Self {
312            seq_num,
313            timestamp,
314            headers: headers
315                .into_iter()
316                .map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
317                .collect(),
318            body: format.encode(&body),
319        }
320    }
321}
322
323#[rustfmt::skip]
324/// Record to be appended to a stream.
// `headers` and `body` default to empty when absent from the input, and are decoded
// according to the request's `Format` by `AppendRecord::decode`.
325#[derive(Debug, Clone, Serialize, Deserialize)]
326#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
327pub struct AppendRecord {
328    /// Timestamp for this record.
329    /// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
330    /// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required, and whether it will be capped at the arrival time.
331    pub timestamp: Option<record::Timestamp>,
332    /// Series of name-value pairs for this record.
333    #[serde(default, skip_serializing_if = "Vec::is_empty")]
334    #[cfg_attr(feature = "utoipa", schema(required = false))]
335    pub headers: Vec<Header>,
336    /// Body of the record.
337    #[serde(default, skip_serializing_if = "String::is_empty")]
338    #[cfg_attr(feature = "utoipa", schema(required = false))]
339    pub body: String,
340}
341
342impl AppendRecord {
343    pub fn decode(
344        self,
345        format: Format,
346    ) -> Result<types::stream::AppendRecord, types::ValidationError> {
347        let headers = self
348            .headers
349            .into_iter()
350            .map(|Header(name, value)| {
351                Ok::<record::Header, types::ValidationError>(record::Header {
352                    name: format.decode(name)?,
353                    value: format.decode(value)?,
354                })
355            })
356            .try_collect()?;
357
358        let body = format.decode(self.body)?;
359
360        let record = record::Record::try_from_parts(headers, body)
361            .map_err(|e| e.to_string())?
362            .into();
363
364        let parts = types::stream::AppendRecordParts {
365            timestamp: self.timestamp,
366            record,
367        };
368
369        types::stream::AppendRecord::try_from(parts)
370            .map_err(|e| types::ValidationError(e.to_string()))
371    }
372}
373
374#[rustfmt::skip]
375/// Payload of an `append` request.
// Converted into the validated `types::stream::AppendInput` by `AppendInput::decode`.
376#[derive(Debug, Clone, Serialize, Deserialize)]
377#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
378pub struct AppendInput {
379    /// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
380    /// The total size of a batch of records may not exceed 1 MiB of metered bytes.
    // NOTE(review): these limits are presumably enforced by `AppendRecordBatch::try_from`
    // during decoding — confirm.
381    pub records: Vec<AppendRecord>,
382    /// Enforce that the sequence number assigned to the first record matches.
383    pub match_seq_num: Option<record::SeqNum>,
384    /// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
385    pub fencing_token: Option<record::FencingToken>,
386}
387
388impl AppendInput {
389    pub fn decode(
390        self,
391        format: Format,
392    ) -> Result<types::stream::AppendInput, types::ValidationError> {
393        let records: Vec<types::stream::AppendRecord> = self
394            .records
395            .into_iter()
396            .map(|record| record.decode(format))
397            .try_collect()?;
398
399        Ok(types::stream::AppendInput {
400            records: types::stream::AppendRecordBatch::try_from(records)?,
401            match_seq_num: self.match_seq_num,
402            fencing_token: self.fencing_token,
403        })
404    }
405}
406
407#[rustfmt::skip]
408/// Success response to an `append` request.
// Serialize-only; built from `types::stream::AppendAck` by converting each of its three positions.
409#[derive(Debug, Clone, Serialize)]
410#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
411pub struct AppendAck {
412    /// Sequence number and timestamp of the first record that was appended.
413    pub start: StreamPosition,
414    /// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
415    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
416    pub end: StreamPosition,
417    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
418    /// This can be greater than the `end` position in case of concurrent appends.
419    pub tail: StreamPosition,
420}
421
422impl From<types::stream::AppendAck> for AppendAck {
423    fn from(ack: types::stream::AppendAck) -> Self {
424        Self {
425            start: ack.start.into(),
426            end: ack.end.into(),
427            tail: ack.tail.into(),
428        }
429    }
430}
431
432#[rustfmt::skip]
433/// Aborted due to a failed condition.
// `rename_all = "snake_case"` makes the variants serialize under the keys
// `fencing_token_mismatch` and `seq_num_mismatch`.
434#[derive(Debug, Clone, Serialize, Deserialize)]
435#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
436#[serde(rename_all = "snake_case")]
437pub enum AppendConditionFailed {
438    /// Fencing token did not match.
439    /// The expected fencing token is returned.
440    #[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
441    FencingTokenMismatch(record::FencingToken),
442    /// Sequence number did not match the tail of the stream.
443    /// The expected next sequence number is returned.
444    #[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
445    SeqNumMismatch(record::SeqNum),
446}
447
448#[rustfmt::skip]
/// Batch of records returned by a read, optionally accompanied by the stream's tail position.
// Serialize-only; built from `types::stream::ReadBatch` by `ReadBatch::encode`.
449#[derive(Debug, Clone, Serialize)]
450#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
451pub struct ReadBatch {
452    /// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
453    /// This can only be empty in response to a unary read (i.e. not SSE), if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
454    pub records: Vec<SequencedRecord>,
455    /// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
456    /// This will only be present when reading recent records.
    // Left out of the serialized output entirely when `None`.
457    #[serde(skip_serializing_if = "Option::is_none")]
458    pub tail: Option<StreamPosition>,
459}
460
461impl ReadBatch {
462    pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
463        Self {
464            records: batch
465                .records
466                .into_iter()
467                .map(|record| SequencedRecord::encode(format, record))
468                .collect(),
469            tail: batch.tail.map(Into::into),
470        }
471    }
472}