Skip to main content

s2_common/
stream.rs

1use std::{marker::PhantomData, ops::Deref, str::FromStr, time::Duration};
2
3use compact_str::{CompactString, ToCompactString};
4use time::OffsetDateTime;
5
6use super::{
7    ValidationError,
8    strings::{NameProps, PrefixProps, StartAfterProps, StrProps},
9};
10use crate::{
11    caps,
12    encryption::EncryptionAlgorithm,
13    read_extent::{ReadLimit, ReadUntil},
14    record::{
15        FencingToken, Metered, MeteredSize, Record, SeqNum, Sequenced, StreamPosition, Timestamp,
16    },
17    resources::ListItemsRequest,
18};
19
20#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
21#[cfg_attr(
22    feature = "rkyv",
23    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
24)]
25pub struct StreamNameStr<T: StrProps>(CompactString, PhantomData<T>);
26
27impl<T: StrProps> StreamNameStr<T> {
28    fn validate_str(name: &str) -> Result<(), ValidationError> {
29        if !T::IS_PREFIX && name.is_empty() {
30            return Err(format!("stream {} must not be empty", T::FIELD_NAME).into());
31        }
32
33        if !T::IS_PREFIX && (name == "." || name == "..") {
34            return Err(format!("stream {} must not be \".\" or \"..\"", T::FIELD_NAME).into());
35        }
36
37        if name.len() > caps::MAX_STREAM_NAME_LEN {
38            return Err(format!(
39                "stream {} must not exceed {} bytes in length",
40                T::FIELD_NAME,
41                caps::MAX_STREAM_NAME_LEN
42            )
43            .into());
44        }
45
46        Ok(())
47    }
48}
49
50#[cfg(feature = "utoipa")]
51impl<T> utoipa::PartialSchema for StreamNameStr<T>
52where
53    T: StrProps,
54{
55    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
56        utoipa::openapi::Object::builder()
57            .schema_type(utoipa::openapi::Type::String)
58            .min_length((!T::IS_PREFIX).then_some(caps::MIN_STREAM_NAME_LEN))
59            .max_length(Some(caps::MAX_STREAM_NAME_LEN))
60            .into()
61    }
62}
63
64#[cfg(feature = "utoipa")]
65impl<T> utoipa::ToSchema for StreamNameStr<T> where T: StrProps {}
66
67impl<T: StrProps> serde::Serialize for StreamNameStr<T> {
68    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
69    where
70        S: serde::Serializer,
71    {
72        serializer.serialize_str(&self.0)
73    }
74}
75
76impl<'de, T: StrProps> serde::Deserialize<'de> for StreamNameStr<T> {
77    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
78    where
79        D: serde::Deserializer<'de>,
80    {
81        let s = CompactString::deserialize(deserializer)?;
82        s.try_into().map_err(serde::de::Error::custom)
83    }
84}
85
86impl<T: StrProps> AsRef<str> for StreamNameStr<T> {
87    fn as_ref(&self) -> &str {
88        &self.0
89    }
90}
91
92impl<T: StrProps> Deref for StreamNameStr<T> {
93    type Target = str;
94
95    fn deref(&self) -> &Self::Target {
96        &self.0
97    }
98}
99
100impl<T: StrProps> TryFrom<CompactString> for StreamNameStr<T> {
101    type Error = ValidationError;
102
103    fn try_from(name: CompactString) -> Result<Self, Self::Error> {
104        Self::validate_str(&name)?;
105        Ok(Self(name, PhantomData))
106    }
107}
108
109impl<T: StrProps> FromStr for StreamNameStr<T> {
110    type Err = ValidationError;
111
112    fn from_str(s: &str) -> Result<Self, Self::Err> {
113        Self::validate_str(s)?;
114        Ok(Self(s.to_compact_string(), PhantomData))
115    }
116}
117
118impl<T: StrProps> std::fmt::Debug for StreamNameStr<T> {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.write_str(&self.0)
121    }
122}
123
124impl<T: StrProps> std::fmt::Display for StreamNameStr<T> {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.write_str(&self.0)
127    }
128}
129
130impl<T: StrProps> From<StreamNameStr<T>> for CompactString {
131    fn from(value: StreamNameStr<T>) -> Self {
132        value.0
133    }
134}
135
136pub type StreamName = StreamNameStr<NameProps>;
137
138pub type StreamNamePrefix = StreamNameStr<PrefixProps>;
139
140impl Default for StreamNamePrefix {
141    fn default() -> Self {
142        StreamNameStr(CompactString::default(), PhantomData)
143    }
144}
145
146impl From<StreamName> for StreamNamePrefix {
147    fn from(value: StreamName) -> Self {
148        Self(value.0, PhantomData)
149    }
150}
151
152pub type StreamNameStartAfter = StreamNameStr<StartAfterProps>;
153
154impl Default for StreamNameStartAfter {
155    fn default() -> Self {
156        StreamNameStr(CompactString::default(), PhantomData)
157    }
158}
159
160impl From<StreamName> for StreamNameStartAfter {
161    fn from(value: StreamName) -> Self {
162        Self(value.0, PhantomData)
163    }
164}
165
166#[derive(Debug, Clone)]
167pub struct StreamInfo {
168    pub name: StreamName,
169    pub created_at: OffsetDateTime,
170    pub deleted_at: Option<OffsetDateTime>,
171    pub cipher: Option<EncryptionAlgorithm>,
172}
173
174#[derive(Debug, Clone)]
175pub struct AppendRecord<T = Record>(AppendRecordParts<T>);
176
177impl<T> AppendRecord<T> {
178    pub fn parts(&self) -> &AppendRecordParts<T> {
179        let Self(parts) = self;
180        parts
181    }
182
183    pub fn into_parts(self) -> AppendRecordParts<T> {
184        let Self(parts) = self;
185        parts
186    }
187}
188
189impl<T> MeteredSize for AppendRecord<T> {
190    fn metered_size(&self) -> usize {
191        self.0.record.metered_size()
192    }
193}
194
195#[derive(Debug, Clone)]
196pub struct AppendRecordParts<T = Record> {
197    pub timestamp: Option<Timestamp>,
198    pub record: Metered<T>,
199}
200
201impl<T> MeteredSize for AppendRecordParts<T> {
202    fn metered_size(&self) -> usize {
203        self.record.metered_size()
204    }
205}
206
207impl<T> From<AppendRecord<T>> for AppendRecordParts<T> {
208    fn from(record: AppendRecord<T>) -> Self {
209        record.into_parts()
210    }
211}
212
213impl<T> TryFrom<AppendRecordParts<T>> for AppendRecord<T> {
214    type Error = &'static str;
215
216    fn try_from(parts: AppendRecordParts<T>) -> Result<Self, Self::Error> {
217        if parts.metered_size() > caps::RECORD_BATCH_MAX.bytes {
218            Err("record must have metered size less than 1 MiB")
219        } else {
220            Ok(Self(parts))
221        }
222    }
223}
224
225#[derive(Clone)]
226pub struct AppendRecordBatch<T = Record>(Metered<Vec<AppendRecord<T>>>);
227
228impl<T> std::fmt::Debug for AppendRecordBatch<T> {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        f.debug_struct("AppendRecordBatch")
231            .field("num_records", &self.0.len())
232            .field("metered_size", &self.0.metered_size())
233            .finish()
234    }
235}
236
237impl<T> MeteredSize for AppendRecordBatch<T> {
238    fn metered_size(&self) -> usize {
239        self.0.metered_size()
240    }
241}
242
243impl<T> std::ops::Deref for AppendRecordBatch<T> {
244    type Target = [AppendRecord<T>];
245
246    fn deref(&self) -> &Self::Target {
247        &self.0
248    }
249}
250
251impl<T> TryFrom<Metered<Vec<AppendRecord<T>>>> for AppendRecordBatch<T> {
252    type Error = &'static str;
253
254    fn try_from(records: Metered<Vec<AppendRecord<T>>>) -> Result<Self, Self::Error> {
255        if records.is_empty() {
256            return Err("record batch must not be empty");
257        }
258
259        if records.len() > caps::RECORD_BATCH_MAX.count {
260            return Err("record batch must not exceed 1000 records");
261        }
262
263        if records.metered_size() > caps::RECORD_BATCH_MAX.bytes {
264            return Err("record batch must not exceed a metered size of 1 MiB");
265        }
266
267        Ok(Self(records))
268    }
269}
270
271impl<T> TryFrom<Vec<AppendRecord<T>>> for AppendRecordBatch<T> {
272    type Error = &'static str;
273
274    fn try_from(records: Vec<AppendRecord<T>>) -> Result<Self, Self::Error> {
275        let records = Metered::from(records);
276        Self::try_from(records)
277    }
278}
279
280impl<T> IntoIterator for AppendRecordBatch<T> {
281    type Item = AppendRecord<T>;
282    type IntoIter = std::vec::IntoIter<Self::Item>;
283
284    fn into_iter(self) -> Self::IntoIter {
285        self.0.into_iter()
286    }
287}
288
289#[derive(Debug, Clone)]
290pub struct AppendInput<T = Record> {
291    pub records: AppendRecordBatch<T>,
292    pub match_seq_num: Option<SeqNum>,
293    pub fencing_token: Option<FencingToken>,
294}
295
296#[derive(Debug, Clone)]
297pub struct AppendAck {
298    pub start: StreamPosition,
299    pub end: StreamPosition,
300    pub tail: StreamPosition,
301}
302
303#[derive(Debug, Clone, Copy, PartialEq, Eq)]
304pub enum ReadPosition {
305    SeqNum(SeqNum),
306    Timestamp(Timestamp),
307}
308
309#[derive(Debug, Clone, Copy)]
310pub enum ReadFrom {
311    SeqNum(SeqNum),
312    Timestamp(Timestamp),
313    TailOffset(u64),
314}
315
316impl Default for ReadFrom {
317    fn default() -> Self {
318        Self::SeqNum(0)
319    }
320}
321
322#[derive(Debug, Default, Clone, Copy)]
323pub struct ReadStart {
324    pub from: ReadFrom,
325    pub clamp: bool,
326}
327
328#[derive(Debug, Default, Clone, Copy)]
329pub struct ReadEnd {
330    pub limit: ReadLimit,
331    pub until: ReadUntil,
332    pub wait: Option<Duration>,
333}
334
335impl ReadEnd {
336    pub fn may_follow(&self) -> bool {
337        (self.limit.is_unbounded() && self.until.is_unbounded())
338            || self.wait.is_some_and(|d| d > Duration::ZERO)
339    }
340}
341
342#[derive(Clone)]
343pub struct ReadBatch<T = Record> {
344    pub records: Metered<Vec<Sequenced<T>>>,
345    pub tail: Option<StreamPosition>,
346}
347
348impl<T> Default for ReadBatch<T>
349where
350    T: MeteredSize,
351{
352    fn default() -> Self {
353        Self {
354            records: Metered::default(),
355            tail: None,
356        }
357    }
358}
359
360impl<T> std::fmt::Debug for ReadBatch<T> {
361    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362        f.debug_struct("ReadBatch")
363            .field("num_records", &self.records.len())
364            .field("metered_size", &self.records.metered_size())
365            .field("tail", &self.tail)
366            .finish()
367    }
368}
369
370#[derive(Debug, Clone)]
371pub enum ReadSessionOutput<T = Record> {
372    Heartbeat(StreamPosition),
373    Batch(ReadBatch<T>),
374}
375
376pub type ListStreamsRequest = ListItemsRequest<StreamNamePrefix, StreamNameStartAfter>;
377
378#[cfg(test)]
379mod test {
380    use rstest::rstest;
381
382    use super::{
383        super::strings::{NameProps, PrefixProps, StartAfterProps},
384        *,
385    };
386
387    #[rstest]
388    #[case::normal("my-stream".to_owned())]
389    #[case::max_len("a".repeat(crate::caps::MAX_STREAM_NAME_LEN))]
390    fn validate_name_ok(#[case] name: String) {
391        assert_eq!(StreamNameStr::<NameProps>::validate_str(&name), Ok(()));
392    }
393
394    #[rstest]
395    #[case::empty("".to_owned())]
396    #[case::dot(".".to_owned())]
397    #[case::dot_dot("..".to_owned())]
398    #[case::too_long("a".repeat(crate::caps::MAX_STREAM_NAME_LEN + 1))]
399    fn validate_name_err(#[case] name: String) {
400        StreamNameStr::<NameProps>::validate_str(&name).expect_err("expected validation error");
401    }
402
403    #[rstest]
404    #[case::empty("".to_owned())]
405    #[case::dot(".".to_owned())]
406    #[case::dot_dot("..".to_owned())]
407    #[case::max_len("a".repeat(crate::caps::MAX_STREAM_NAME_LEN))]
408    fn validate_prefix_ok(#[case] prefix: String) {
409        assert_eq!(StreamNameStr::<PrefixProps>::validate_str(&prefix), Ok(()));
410    }
411
412    #[rstest]
413    #[case::too_long("a".repeat(crate::caps::MAX_STREAM_NAME_LEN + 1))]
414    fn validate_prefix_err(#[case] prefix: String) {
415        StreamNameStr::<PrefixProps>::validate_str(&prefix).expect_err("expected validation error");
416    }
417
418    #[rstest]
419    #[case::empty("".to_owned())]
420    #[case::dot(".".to_owned())]
421    #[case::dot_dot("..".to_owned())]
422    #[case::max_len("a".repeat(crate::caps::MAX_STREAM_NAME_LEN))]
423    fn validate_start_after_ok(#[case] start_after: String) {
424        assert_eq!(
425            StreamNameStr::<StartAfterProps>::validate_str(&start_after),
426            Ok(())
427        );
428    }
429
430    #[rstest]
431    #[case::too_long("a".repeat(crate::caps::MAX_STREAM_NAME_LEN + 1))]
432    fn validate_start_after_err(#[case] start_after: String) {
433        StreamNameStr::<StartAfterProps>::validate_str(&start_after)
434            .expect_err("expected validation error");
435    }
436
437    #[test]
438    fn append_record_batch_rejects_empty_batches() {
439        let empty_batch: Result<AppendRecordBatch, _> = Vec::<AppendRecord>::new().try_into();
440
441        assert_eq!(empty_batch.unwrap_err(), "record batch must not be empty");
442    }
443}