s2_common/types/
stream.rs

1use std::{marker::PhantomData, ops::Deref, str::FromStr, time::Duration};
2
3use compact_str::{CompactString, ToCompactString};
4use time::OffsetDateTime;
5
6use super::{
7    ValidationError,
8    strings::{NameProps, PrefixProps, StartAfterProps, StrProps},
9};
10use crate::{
11    caps,
12    read_extent::{ReadLimit, ReadUntil},
13    record::{
14        FencingToken, Metered, MeteredSize, Record, SeqNum, SequencedRecord, StreamPosition,
15        Timestamp,
16    },
17    types::resources::ListItemsRequest,
18};
19
/// A stream-name string tagged with a [`StrProps`] marker type `T`.
///
/// Field `0` holds the text as a [`CompactString`]; field `1` is a
/// [`PhantomData`] that only carries `T`. Both fields are private, so code
/// outside this module cannot build a value directly.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "rkyv",
    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
pub struct StreamNameStr<T: StrProps>(CompactString, PhantomData<T>);
26
27impl<T: StrProps> StreamNameStr<T> {
28    fn validate_str(name: &str) -> Result<(), ValidationError> {
29        if !T::IS_PREFIX && name.is_empty() {
30            return Err(format!("stream {} must not be empty", T::FIELD_NAME).into());
31        }
32
33        if name.len() > caps::MAX_STREAM_NAME_LEN {
34            return Err(format!(
35                "stream {} must not exceed {} bytes in length",
36                T::FIELD_NAME,
37                caps::MAX_STREAM_NAME_LEN
38            )
39            .into());
40        }
41
42        Ok(())
43    }
44}
45
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::PartialSchema for StreamNameStr<T> {
    /// Builds an OpenAPI string schema with this type's length bounds.
    ///
    /// The maximum length is always `caps::MAX_STREAM_NAME_LEN`; the minimum
    /// length `caps::MIN_STREAM_NAME_LEN` is set only when `T::IS_PREFIX` is
    /// `false`.
    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
        let min_len = if T::IS_PREFIX {
            None
        } else {
            Some(caps::MIN_STREAM_NAME_LEN)
        };

        utoipa::openapi::Object::builder()
            .schema_type(utoipa::openapi::Type::String)
            .min_length(min_len)
            .max_length(Some(caps::MAX_STREAM_NAME_LEN))
            .into()
    }
}
59
// Marker impl with no overridden items; trait defaults are used as-is.
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::ToSchema for StreamNameStr<T> {}
62
63impl<T: StrProps> serde::Serialize for StreamNameStr<T> {
64    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
65    where
66        S: serde::Serializer,
67    {
68        serializer.serialize_str(&self.0)
69    }
70}
71
72impl<'de, T: StrProps> serde::Deserialize<'de> for StreamNameStr<T> {
73    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74    where
75        D: serde::Deserializer<'de>,
76    {
77        let s = CompactString::deserialize(deserializer)?;
78        s.try_into().map_err(serde::de::Error::custom)
79    }
80}
81
82impl<T: StrProps> AsRef<str> for StreamNameStr<T> {
83    fn as_ref(&self) -> &str {
84        &self.0
85    }
86}
87
88impl<T: StrProps> Deref for StreamNameStr<T> {
89    type Target = str;
90
91    fn deref(&self) -> &Self::Target {
92        &self.0
93    }
94}
95
96impl<T: StrProps> TryFrom<CompactString> for StreamNameStr<T> {
97    type Error = ValidationError;
98
99    fn try_from(name: CompactString) -> Result<Self, Self::Error> {
100        Self::validate_str(&name)?;
101        Ok(Self(name, PhantomData))
102    }
103}
104
105impl<T: StrProps> FromStr for StreamNameStr<T> {
106    type Err = ValidationError;
107
108    fn from_str(s: &str) -> Result<Self, Self::Err> {
109        Self::validate_str(s)?;
110        Ok(Self(s.to_compact_string(), PhantomData))
111    }
112}
113
114impl<T: StrProps> std::fmt::Debug for StreamNameStr<T> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.write_str(&self.0)
117    }
118}
119
120impl<T: StrProps> std::fmt::Display for StreamNameStr<T> {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.write_str(&self.0)
123    }
124}
125
126impl<T: StrProps> From<StreamNameStr<T>> for CompactString {
127    fn from(value: StreamNameStr<T>) -> Self {
128        value.0
129    }
130}
131
/// A [`StreamNameStr`] tagged with [`NameProps`].
pub type StreamName = StreamNameStr<NameProps>;
133
/// A [`StreamNameStr`] tagged with [`PrefixProps`].
pub type StreamNamePrefix = StreamNameStr<PrefixProps>;
135
136impl Default for StreamNamePrefix {
137    fn default() -> Self {
138        StreamNameStr(CompactString::default(), PhantomData)
139    }
140}
141
142impl From<StreamName> for StreamNamePrefix {
143    fn from(value: StreamName) -> Self {
144        Self(value.0, PhantomData)
145    }
146}
147
/// A [`StreamNameStr`] tagged with [`StartAfterProps`].
pub type StreamNameStartAfter = StreamNameStr<StartAfterProps>;
149
150impl Default for StreamNameStartAfter {
151    fn default() -> Self {
152        StreamNameStr(CompactString::default(), PhantomData)
153    }
154}
155
156impl From<StreamName> for StreamNameStartAfter {
157    fn from(value: StreamName) -> Self {
158        Self(value.0, PhantomData)
159    }
160}
161
/// Metadata describing a stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// Time the stream was created.
    pub created_at: OffsetDateTime,
    /// Deletion time, if any. NOTE(review): `None` presumably means the
    /// stream has not been deleted — confirm against the producer of this
    /// value.
    pub deleted_at: Option<OffsetDateTime>,
}
168
/// A record to append, wrapping [`AppendRecordParts`].
///
/// The inner field is private; within this file the
/// `TryFrom<AppendRecordParts>` impl is the size-checked way to build one.
#[derive(Debug, Clone)]
pub struct AppendRecord(AppendRecordParts);
171
172impl Deref for AppendRecord {
173    type Target = AppendRecordParts;
174
175    fn deref(&self) -> &Self::Target {
176        let Self(parts) = self;
177        parts
178    }
179}
180
181impl MeteredSize for AppendRecord {
182    fn metered_size(&self) -> usize {
183        self.0.record.metered_size()
184    }
185}
186
/// The public fields that make up an [`AppendRecord`].
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    /// Optional timestamp for the record. NOTE(review): what happens when
    /// this is `None` is not visible in this file — confirm with the append
    /// path.
    pub timestamp: Option<Timestamp>,
    /// The record, wrapped in [`Metered`].
    pub record: Metered<Record>,
}
192
193impl MeteredSize for AppendRecordParts {
194    fn metered_size(&self) -> usize {
195        self.record.metered_size()
196    }
197}
198
199impl From<AppendRecord> for AppendRecordParts {
200    fn from(AppendRecord(parts): AppendRecord) -> Self {
201        parts
202    }
203}
204
205impl TryFrom<AppendRecordParts> for AppendRecord {
206    type Error = &'static str;
207
208    fn try_from(parts: AppendRecordParts) -> Result<Self, Self::Error> {
209        if parts.metered_size() > caps::RECORD_BATCH_MAX.bytes {
210            Err("record must have metered size less than 1 MiB")
211        } else {
212            Ok(Self(parts))
213        }
214    }
215}
216
/// A batch of [`AppendRecord`]s wrapped in [`Metered`].
///
/// The inner field is private; within this file the `TryFrom` impls are the
/// checked way to build a batch.
#[derive(Clone)]
pub struct AppendRecordBatch(Metered<Vec<AppendRecord>>);
219
220impl std::fmt::Debug for AppendRecordBatch {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        f.debug_struct("AppendRecordBatch")
223            .field("num_records", &self.0.len())
224            .field("metered_size", &self.0.metered_size())
225            .finish()
226    }
227}
228
229impl MeteredSize for AppendRecordBatch {
230    fn metered_size(&self) -> usize {
231        self.0.metered_size()
232    }
233}
234
235impl std::ops::Deref for AppendRecordBatch {
236    type Target = [AppendRecord];
237
238    fn deref(&self) -> &Self::Target {
239        &self.0
240    }
241}
242
243impl TryFrom<Metered<Vec<AppendRecord>>> for AppendRecordBatch {
244    type Error = &'static str;
245
246    fn try_from(records: Metered<Vec<AppendRecord>>) -> Result<Self, Self::Error> {
247        if records.is_empty() {
248            return Err("record batch must not be empty");
249        }
250
251        if records.len() > caps::RECORD_BATCH_MAX.count {
252            return Err("record batch must not exceed 1000 records");
253        }
254
255        if records.metered_size() > caps::RECORD_BATCH_MAX.bytes {
256            return Err("record batch must not exceed a metered size of 1 MiB");
257        }
258
259        Ok(Self(records))
260    }
261}
262
263impl TryFrom<Vec<AppendRecord>> for AppendRecordBatch {
264    type Error = &'static str;
265
266    fn try_from(records: Vec<AppendRecord>) -> Result<Self, Self::Error> {
267        let records = Metered::from(records);
268        Self::try_from(records)
269    }
270}
271
272impl IntoIterator for AppendRecordBatch {
273    type Item = AppendRecord;
274    type IntoIter = std::vec::IntoIter<Self::Item>;
275
276    fn into_iter(self) -> Self::IntoIter {
277        self.0.into_iter()
278    }
279}
280
/// Input to an append: a record batch plus two optional conditions.
#[derive(Debug, Clone)]
pub struct AppendInput {
    /// The records to append.
    pub records: AppendRecordBatch,
    /// Optional sequence number to match. NOTE(review): presumably the
    /// sequence number expected for the first appended record; enforcement is
    /// not in this file — confirm.
    pub match_seq_num: Option<SeqNum>,
    /// Optional fencing token. NOTE(review): presumably compared with the
    /// stream's current token; enforcement is not in this file — confirm.
    pub fencing_token: Option<FencingToken>,
}
287
/// Acknowledgement of an append, expressed as three stream positions.
///
/// NOTE(review): the exact meaning of each position (inclusive or exclusive
/// bounds) is not established in this file — confirm with the code that
/// fills it in.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Start position of the acknowledged append.
    pub start: StreamPosition,
    /// End position of the acknowledged append.
    pub end: StreamPosition,
    /// Tail position. Presumably the stream's tail after the append —
    /// TODO confirm.
    pub tail: StreamPosition,
}
294
/// A position given either as a sequence number or as a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadPosition {
    /// Position given as a sequence number.
    SeqNum(SeqNum),
    /// Position given as a timestamp.
    Timestamp(Timestamp),
}
300
/// Where a read starts from.
///
/// The `Default` impl in this file yields `SeqNum(0)`.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from a sequence number.
    SeqNum(SeqNum),
    /// Start from a timestamp.
    Timestamp(Timestamp),
    /// Start from an offset relative to the tail. NOTE(review): presumably a
    /// number of records back from the tail — confirm.
    TailOffset(u64),
}
307
308impl Default for ReadFrom {
309    fn default() -> Self {
310        Self::SeqNum(0)
311    }
312}
313
/// Start condition of a read.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadStart {
    /// Where to start reading from.
    pub from: ReadFrom,
    /// Clamp flag; `false` by default via the derived `Default`.
    /// NOTE(review): presumably clamps a start position past the tail back
    /// to the tail — confirm with the read path.
    pub clamp: bool,
}
319
/// End condition of a read.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadEnd {
    /// Limit on the read; see [`ReadLimit`].
    pub limit: ReadLimit,
    /// Bound at which the read stops; see [`ReadUntil`].
    pub until: ReadUntil,
    /// Optional wait duration. NOTE(review): presumably how long to wait for
    /// new records — confirm with the read path.
    pub wait: Option<Duration>,
}
326
327impl ReadEnd {
328    pub fn may_follow(&self) -> bool {
329        (self.limit.is_unbounded() && self.until.is_unbounded())
330            || self.wait.is_some_and(|d| d > Duration::ZERO)
331    }
332}
333
/// A batch of records returned by a read.
#[derive(Default, Clone)]
pub struct ReadBatch {
    /// The records in this batch, wrapped in [`Metered`].
    pub records: Metered<Vec<SequencedRecord>>,
    /// Optional tail position. NOTE(review): when it is populated is not
    /// visible in this file — confirm with the read path.
    pub tail: Option<StreamPosition>,
}
339
340impl std::fmt::Debug for ReadBatch {
341    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342        f.debug_struct("ReadBatch")
343            .field("num_records", &self.records.len())
344            .field("metered_size", &self.records.metered_size())
345            .field("tail", &self.tail)
346            .finish()
347    }
348}
349
/// One item produced by a read session.
#[derive(Debug, Clone)]
pub enum ReadSessionOutput {
    /// A heartbeat carrying a stream position. NOTE(review): presumably the
    /// current tail, sent when there are no records to deliver — confirm.
    Heartbeat(StreamPosition),
    /// A batch of records.
    Batch(ReadBatch),
}
355
/// A [`ListItemsRequest`] specialized with [`StreamNamePrefix`] and
/// [`StreamNameStartAfter`].
pub type ListStreamsRequest = ListItemsRequest<StreamNamePrefix, StreamNameStartAfter>;