//! Stream types for `s2_common/types` (`stream.rs`): validated stream-name
//! strings and the request/response types for append and read operations.
1use std::{marker::PhantomData, ops::Deref, str::FromStr, time::Duration};
2
3use compact_str::{CompactString, ToCompactString};
4use time::OffsetDateTime;
5
6use super::{
7    ValidationError,
8    strings::{NameProps, PrefixProps, StartAfterProps, StrProps},
9};
10use crate::{
11    caps,
12    read_extent::{ReadLimit, ReadUntil},
13    record::{
14        FencingToken, Metered, MeteredSize, Record, SeqNum, SequencedRecord, StreamPosition,
15        Timestamp,
16    },
17    types::resources::ListItemsRequest,
18};
19
/// A validated stream-name string, parameterized by a marker type `T` that
/// selects the validation rules (full name vs. prefix vs. start-after —
/// see `validate_str`).
///
/// `PhantomData<T>` carries the marker type without storing any data for it.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "rkyv",
    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
pub struct StreamNameStr<T: StrProps>(CompactString, PhantomData<T>);
26
27impl<T: StrProps> StreamNameStr<T> {
28    fn validate_str(name: &str) -> Result<(), ValidationError> {
29        if !T::IS_PREFIX && name.is_empty() {
30            return Err(format!("stream {} must not be empty", T::FIELD_NAME).into());
31        }
32
33        if !T::IS_PREFIX && (name == "." || name == "..") {
34            return Err(format!("stream {} must not be \".\" or \"..\"", T::FIELD_NAME).into());
35        }
36
37        if name.len() > caps::MAX_STREAM_NAME_LEN {
38            return Err(format!(
39                "stream {} must not exceed {} bytes in length",
40                T::FIELD_NAME,
41                caps::MAX_STREAM_NAME_LEN
42            )
43            .into());
44        }
45
46        Ok(())
47    }
48}
49
#[cfg(feature = "utoipa")]
impl<T> utoipa::PartialSchema for StreamNameStr<T>
where
    T: StrProps,
{
    /// Builds the OpenAPI schema: a string with a maximum length, and a
    /// minimum length only for full names (prefixes may be empty).
    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
        let min_length = if T::IS_PREFIX {
            None
        } else {
            Some(caps::MIN_STREAM_NAME_LEN)
        };
        let object = utoipa::openapi::Object::builder()
            .schema_type(utoipa::openapi::Type::String)
            .min_length(min_length)
            .max_length(Some(caps::MAX_STREAM_NAME_LEN));
        object.into()
    }
}
63
// Marker impl: the schema itself is fully provided by `PartialSchema` above.
#[cfg(feature = "utoipa")]
impl<T> utoipa::ToSchema for StreamNameStr<T> where T: StrProps {}
66
67impl<T: StrProps> serde::Serialize for StreamNameStr<T> {
68    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
69    where
70        S: serde::Serializer,
71    {
72        serializer.serialize_str(&self.0)
73    }
74}
75
76impl<'de, T: StrProps> serde::Deserialize<'de> for StreamNameStr<T> {
77    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
78    where
79        D: serde::Deserializer<'de>,
80    {
81        let s = CompactString::deserialize(deserializer)?;
82        s.try_into().map_err(serde::de::Error::custom)
83    }
84}
85
86impl<T: StrProps> AsRef<str> for StreamNameStr<T> {
87    fn as_ref(&self) -> &str {
88        &self.0
89    }
90}
91
92impl<T: StrProps> Deref for StreamNameStr<T> {
93    type Target = str;
94
95    fn deref(&self) -> &Self::Target {
96        &self.0
97    }
98}
99
100impl<T: StrProps> TryFrom<CompactString> for StreamNameStr<T> {
101    type Error = ValidationError;
102
103    fn try_from(name: CompactString) -> Result<Self, Self::Error> {
104        Self::validate_str(&name)?;
105        Ok(Self(name, PhantomData))
106    }
107}
108
109impl<T: StrProps> FromStr for StreamNameStr<T> {
110    type Err = ValidationError;
111
112    fn from_str(s: &str) -> Result<Self, Self::Err> {
113        Self::validate_str(s)?;
114        Ok(Self(s.to_compact_string(), PhantomData))
115    }
116}
117
118impl<T: StrProps> std::fmt::Debug for StreamNameStr<T> {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.write_str(&self.0)
121    }
122}
123
124impl<T: StrProps> std::fmt::Display for StreamNameStr<T> {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.write_str(&self.0)
127    }
128}
129
130impl<T: StrProps> From<StreamNameStr<T>> for CompactString {
131    fn from(value: StreamNameStr<T>) -> Self {
132        value.0
133    }
134}
135
/// A fully-validated stream name: non-empty and not `"."` or `".."`.
pub type StreamName = StreamNameStr<NameProps>;

/// A stream-name prefix used when listing streams; may be empty.
pub type StreamNamePrefix = StreamNameStr<PrefixProps>;
139
140impl Default for StreamNamePrefix {
141    fn default() -> Self {
142        StreamNameStr(CompactString::default(), PhantomData)
143    }
144}
145
146impl From<StreamName> for StreamNamePrefix {
147    fn from(value: StreamName) -> Self {
148        Self(value.0, PhantomData)
149    }
150}
151
/// A `start_after` bound for listing streams; may be empty.
pub type StreamNameStartAfter = StreamNameStr<StartAfterProps>;
153
154impl Default for StreamNameStartAfter {
155    fn default() -> Self {
156        StreamNameStr(CompactString::default(), PhantomData)
157    }
158}
159
160impl From<StreamName> for StreamNameStartAfter {
161    fn from(value: StreamName) -> Self {
162        Self(value.0, PhantomData)
163    }
164}
165
/// Metadata for a single stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// The stream's validated name.
    pub name: StreamName,
    /// When the stream was created.
    pub created_at: OffsetDateTime,
    /// When the stream was deleted, if it has been.
    pub deleted_at: Option<OffsetDateTime>,
}
172
/// Record parts that have been checked against the metered-size cap
/// (construction goes through `TryFrom<AppendRecordParts>`).
#[derive(Debug, Clone)]
pub struct AppendRecord(AppendRecordParts);
175
176impl Deref for AppendRecord {
177    type Target = AppendRecordParts;
178
179    fn deref(&self) -> &Self::Target {
180        let Self(parts) = self;
181        parts
182    }
183}
184
185impl MeteredSize for AppendRecord {
186    fn metered_size(&self) -> usize {
187        self.0.record.metered_size()
188    }
189}
190
/// The constituent pieces of a record to append, before size validation.
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    /// Optional timestamp for the record.
    pub timestamp: Option<Timestamp>,
    /// The record payload, wrapped with metered-size accounting.
    pub record: Metered<Record>,
}
196
impl MeteredSize for AppendRecordParts {
    // Only the record contributes to metered size; the timestamp does not.
    fn metered_size(&self) -> usize {
        self.record.metered_size()
    }
}
202
203impl From<AppendRecord> for AppendRecordParts {
204    fn from(AppendRecord(parts): AppendRecord) -> Self {
205        parts
206    }
207}
208
209impl TryFrom<AppendRecordParts> for AppendRecord {
210    type Error = &'static str;
211
212    fn try_from(parts: AppendRecordParts) -> Result<Self, Self::Error> {
213        if parts.metered_size() > caps::RECORD_BATCH_MAX.bytes {
214            Err("record must have metered size less than 1 MiB")
215        } else {
216            Ok(Self(parts))
217        }
218    }
219}
220
/// A non-empty batch of validated records, capped in record count and in
/// total metered bytes (see the `TryFrom` impls below for the checks).
#[derive(Clone)]
pub struct AppendRecordBatch(Metered<Vec<AppendRecord>>);
223
224impl std::fmt::Debug for AppendRecordBatch {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        f.debug_struct("AppendRecordBatch")
227            .field("num_records", &self.0.len())
228            .field("metered_size", &self.0.metered_size())
229            .finish()
230    }
231}
232
233impl MeteredSize for AppendRecordBatch {
234    fn metered_size(&self) -> usize {
235        self.0.metered_size()
236    }
237}
238
239impl std::ops::Deref for AppendRecordBatch {
240    type Target = [AppendRecord];
241
242    fn deref(&self) -> &Self::Target {
243        &self.0
244    }
245}
246
247impl TryFrom<Metered<Vec<AppendRecord>>> for AppendRecordBatch {
248    type Error = &'static str;
249
250    fn try_from(records: Metered<Vec<AppendRecord>>) -> Result<Self, Self::Error> {
251        if records.is_empty() {
252            return Err("record batch must not be empty");
253        }
254
255        if records.len() > caps::RECORD_BATCH_MAX.count {
256            return Err("record batch must not exceed 1000 records");
257        }
258
259        if records.metered_size() > caps::RECORD_BATCH_MAX.bytes {
260            return Err("record batch must not exceed a metered size of 1 MiB");
261        }
262
263        Ok(Self(records))
264    }
265}
266
267impl TryFrom<Vec<AppendRecord>> for AppendRecordBatch {
268    type Error = &'static str;
269
270    fn try_from(records: Vec<AppendRecord>) -> Result<Self, Self::Error> {
271        let records = Metered::from(records);
272        Self::try_from(records)
273    }
274}
275
276impl IntoIterator for AppendRecordBatch {
277    type Item = AppendRecord;
278    type IntoIter = std::vec::IntoIter<Self::Item>;
279
280    fn into_iter(self) -> Self::IntoIter {
281        self.0.into_iter()
282    }
283}
284
/// Input for an append operation.
#[derive(Debug, Clone)]
pub struct AppendInput {
    /// The validated batch of records to append.
    pub records: AppendRecordBatch,
    /// Optional expected sequence number — presumably for conditional
    /// appends; confirm the mismatch behavior in the service layer.
    pub match_seq_num: Option<SeqNum>,
    /// Optional fencing token for the append.
    pub fencing_token: Option<FencingToken>,
}
291
/// Acknowledgement of an append, expressed as stream positions.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Start position of the acknowledged range.
    pub start: StreamPosition,
    /// End position of the acknowledged range.
    pub end: StreamPosition,
    /// The stream's tail position.
    pub tail: StreamPosition,
}
298
/// A position in a stream, addressed by sequence number or by timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadPosition {
    SeqNum(SeqNum),
    Timestamp(Timestamp),
}
304
/// Where a read begins.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start at an absolute sequence number.
    SeqNum(SeqNum),
    /// Start at a timestamp.
    Timestamp(Timestamp),
    /// Offset relative to the stream tail — presumably counting backwards
    /// from the tail; confirm against the read implementation.
    TailOffset(u64),
}
311
312impl Default for ReadFrom {
313    fn default() -> Self {
314        Self::SeqNum(0)
315    }
316}
317
/// The starting point for a read.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadStart {
    /// Where to begin reading (defaults to sequence number zero).
    pub from: ReadFrom,
    /// Whether to clamp the start position — presumably to the stream's
    /// valid range; confirm against the read implementation.
    pub clamp: bool,
}
323
/// The stopping conditions for a read.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadEnd {
    /// Cap on how much to read.
    pub limit: ReadLimit,
    /// Exclusive-style bound on how far to read.
    pub until: ReadUntil,
    /// How long to wait for more records; `None` means no waiting.
    pub wait: Option<Duration>,
}
330
331impl ReadEnd {
332    pub fn may_follow(&self) -> bool {
333        (self.limit.is_unbounded() && self.until.is_unbounded())
334            || self.wait.is_some_and(|d| d > Duration::ZERO)
335    }
336}
337
/// A batch of sequenced records returned from a read.
#[derive(Default, Clone)]
pub struct ReadBatch {
    /// The records read, with metered-size accounting.
    pub records: Metered<Vec<SequencedRecord>>,
    /// The stream's tail position at read time, when known.
    pub tail: Option<StreamPosition>,
}
343
344impl std::fmt::Debug for ReadBatch {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        f.debug_struct("ReadBatch")
347            .field("num_records", &self.records.len())
348            .field("metered_size", &self.records.metered_size())
349            .field("tail", &self.tail)
350            .finish()
351    }
352}
353
/// A single message emitted by a read session.
#[derive(Debug, Clone)]
pub enum ReadSessionOutput {
    /// Keep-alive carrying the current tail position.
    Heartbeat(StreamPosition),
    /// A batch of records.
    Batch(ReadBatch),
}
359
/// List-streams request: a prefix filter plus a start-after cursor.
pub type ListStreamsRequest = ListItemsRequest<StreamNamePrefix, StreamNameStartAfter>;
361
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::{
        super::strings::{NameProps, PrefixProps, StartAfterProps},
        StreamNameStr,
    };

    // Full names: ordinary strings up to the byte cap are accepted.
    #[rstest]
    #[case::normal("my-stream".to_owned())]
    #[case::max_len("a".repeat(crate::caps::MAX_STREAM_NAME_LEN))]
    fn validate_name_ok(#[case] name: String) {
        assert_eq!(StreamNameStr::<NameProps>::validate_str(&name), Ok(()));
    }

    // Full names: empty, ".", "..", and over-length strings are rejected.
    #[rstest]
    #[case::empty("".to_owned())]
    #[case::dot(".".to_owned())]
    #[case::dot_dot("..".to_owned())]
    #[case::too_long("a".repeat(crate::caps::MAX_STREAM_NAME_LEN + 1))]
    fn validate_name_err(#[case] name: String) {
        StreamNameStr::<NameProps>::validate_str(&name).expect_err("expected validation error");
    }

    // Prefixes: empty and dot strings are allowed; only length is enforced.
    #[rstest]
    #[case::empty("".to_owned())]
    #[case::dot(".".to_owned())]
    #[case::dot_dot("..".to_owned())]
    #[case::max_len("a".repeat(crate::caps::MAX_STREAM_NAME_LEN))]
    fn validate_prefix_ok(#[case] prefix: String) {
        assert_eq!(StreamNameStr::<PrefixProps>::validate_str(&prefix), Ok(()));
    }

    #[rstest]
    #[case::too_long("a".repeat(crate::caps::MAX_STREAM_NAME_LEN + 1))]
    fn validate_prefix_err(#[case] prefix: String) {
        StreamNameStr::<PrefixProps>::validate_str(&prefix).expect_err("expected validation error");
    }

    // Start-after bounds follow the same relaxed rules as prefixes.
    #[rstest]
    #[case::empty("".to_owned())]
    #[case::dot(".".to_owned())]
    #[case::dot_dot("..".to_owned())]
    #[case::max_len("a".repeat(crate::caps::MAX_STREAM_NAME_LEN))]
    fn validate_start_after_ok(#[case] start_after: String) {
        assert_eq!(
            StreamNameStr::<StartAfterProps>::validate_str(&start_after),
            Ok(())
        );
    }

    #[rstest]
    #[case::too_long("a".repeat(crate::caps::MAX_STREAM_NAME_LEN + 1))]
    fn validate_start_after_err(#[case] start_after: String) {
        StreamNameStr::<StartAfterProps>::validate_str(&start_after)
            .expect_err("expected validation error");
    }
}