1use std::{marker::PhantomData, ops::Deref, str::FromStr, time::Duration};
2
3use compact_str::{CompactString, ToCompactString};
4use time::OffsetDateTime;
5
6use super::{
7 ValidationError,
8 strings::{NameProps, PrefixProps, StartAfterProps, StrProps},
9};
10use crate::{
11 caps,
12 encryption::EncryptionAlgorithm,
13 read_extent::{ReadLimit, ReadUntil},
14 record::{
15 FencingToken, Metered, MeteredSize, Record, SeqNum, Sequenced, StreamPosition, Timestamp,
16 },
17 resources::ListItemsRequest,
18};
19
20#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
21#[cfg_attr(
22 feature = "rkyv",
23 derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
24)]
25pub struct StreamNameStr<T: StrProps>(CompactString, PhantomData<T>);
26
27impl<T: StrProps> StreamNameStr<T> {
28 fn validate_str(name: &str) -> Result<(), ValidationError> {
29 if !T::IS_PREFIX && name.is_empty() {
30 return Err(format!("stream {} must not be empty", T::FIELD_NAME).into());
31 }
32
33 if !T::IS_PREFIX && (name == "." || name == "..") {
34 return Err(format!("stream {} must not be \".\" or \"..\"", T::FIELD_NAME).into());
35 }
36
37 if name.len() > caps::MAX_STREAM_NAME_LEN {
38 return Err(format!(
39 "stream {} must not exceed {} bytes in length",
40 T::FIELD_NAME,
41 caps::MAX_STREAM_NAME_LEN
42 )
43 .into());
44 }
45
46 Ok(())
47 }
48}
49
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::PartialSchema for StreamNameStr<T> {
    /// Describes the type as an OpenAPI string schema.
    ///
    /// The maximum length is always `caps::MAX_STREAM_NAME_LEN`; the minimum
    /// length (`caps::MIN_STREAM_NAME_LEN`) is only set when `T::IS_PREFIX`
    /// is `false`.
    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
        let min_len = if T::IS_PREFIX {
            None
        } else {
            Some(caps::MIN_STREAM_NAME_LEN)
        };

        utoipa::openapi::Object::builder()
            .schema_type(utoipa::openapi::Type::String)
            .min_length(min_len)
            .max_length(Some(caps::MAX_STREAM_NAME_LEN))
            .into()
    }
}
63
// Marker impl; relies entirely on the trait's provided items.
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::ToSchema for StreamNameStr<T> {}
66
67impl<T: StrProps> serde::Serialize for StreamNameStr<T> {
68 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
69 where
70 S: serde::Serializer,
71 {
72 serializer.serialize_str(&self.0)
73 }
74}
75
76impl<'de, T: StrProps> serde::Deserialize<'de> for StreamNameStr<T> {
77 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
78 where
79 D: serde::Deserializer<'de>,
80 {
81 let s = CompactString::deserialize(deserializer)?;
82 s.try_into().map_err(serde::de::Error::custom)
83 }
84}
85
86impl<T: StrProps> AsRef<str> for StreamNameStr<T> {
87 fn as_ref(&self) -> &str {
88 &self.0
89 }
90}
91
92impl<T: StrProps> Deref for StreamNameStr<T> {
93 type Target = str;
94
95 fn deref(&self) -> &Self::Target {
96 &self.0
97 }
98}
99
100impl<T: StrProps> TryFrom<CompactString> for StreamNameStr<T> {
101 type Error = ValidationError;
102
103 fn try_from(name: CompactString) -> Result<Self, Self::Error> {
104 Self::validate_str(&name)?;
105 Ok(Self(name, PhantomData))
106 }
107}
108
109impl<T: StrProps> FromStr for StreamNameStr<T> {
110 type Err = ValidationError;
111
112 fn from_str(s: &str) -> Result<Self, Self::Err> {
113 Self::validate_str(s)?;
114 Ok(Self(s.to_compact_string(), PhantomData))
115 }
116}
117
118impl<T: StrProps> std::fmt::Debug for StreamNameStr<T> {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.write_str(&self.0)
121 }
122}
123
124impl<T: StrProps> std::fmt::Display for StreamNameStr<T> {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.write_str(&self.0)
127 }
128}
129
130impl<T: StrProps> From<StreamNameStr<T>> for CompactString {
131 fn from(value: StreamNameStr<T>) -> Self {
132 value.0
133 }
134}
135
136pub type StreamName = StreamNameStr<NameProps>;
137
138pub type StreamNamePrefix = StreamNameStr<PrefixProps>;
139
140impl Default for StreamNamePrefix {
141 fn default() -> Self {
142 StreamNameStr(CompactString::default(), PhantomData)
143 }
144}
145
146impl From<StreamName> for StreamNamePrefix {
147 fn from(value: StreamName) -> Self {
148 Self(value.0, PhantomData)
149 }
150}
151
152pub type StreamNameStartAfter = StreamNameStr<StartAfterProps>;
153
154impl Default for StreamNameStartAfter {
155 fn default() -> Self {
156 StreamNameStr(CompactString::default(), PhantomData)
157 }
158}
159
160impl From<StreamName> for StreamNameStartAfter {
161 fn from(value: StreamName) -> Self {
162 Self(value.0, PhantomData)
163 }
164}
165
166#[derive(Debug, Clone)]
167pub struct StreamInfo {
168 pub name: StreamName,
169 pub created_at: OffsetDateTime,
170 pub deleted_at: Option<OffsetDateTime>,
171 pub cipher: Option<EncryptionAlgorithm>,
172}
173
174#[derive(Debug, Clone)]
175pub struct AppendRecord<T = Record>(AppendRecordParts<T>);
176
177impl<T> AppendRecord<T> {
178 pub fn parts(&self) -> &AppendRecordParts<T> {
179 let Self(parts) = self;
180 parts
181 }
182
183 pub fn into_parts(self) -> AppendRecordParts<T> {
184 let Self(parts) = self;
185 parts
186 }
187}
188
189impl<T> MeteredSize for AppendRecord<T> {
190 fn metered_size(&self) -> usize {
191 self.0.record.metered_size()
192 }
193}
194
195#[derive(Debug, Clone)]
196pub struct AppendRecordParts<T = Record> {
197 pub timestamp: Option<Timestamp>,
198 pub record: Metered<T>,
199}
200
201impl<T> MeteredSize for AppendRecordParts<T> {
202 fn metered_size(&self) -> usize {
203 self.record.metered_size()
204 }
205}
206
207impl<T> From<AppendRecord<T>> for AppendRecordParts<T> {
208 fn from(record: AppendRecord<T>) -> Self {
209 record.into_parts()
210 }
211}
212
213impl<T> TryFrom<AppendRecordParts<T>> for AppendRecord<T> {
214 type Error = &'static str;
215
216 fn try_from(parts: AppendRecordParts<T>) -> Result<Self, Self::Error> {
217 if parts.metered_size() > caps::RECORD_BATCH_MAX.bytes {
218 Err("record must have metered size less than 1 MiB")
219 } else {
220 Ok(Self(parts))
221 }
222 }
223}
224
225#[derive(Clone)]
226pub struct AppendRecordBatch<T = Record>(Metered<Vec<AppendRecord<T>>>);
227
228impl<T> std::fmt::Debug for AppendRecordBatch<T> {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 f.debug_struct("AppendRecordBatch")
231 .field("num_records", &self.0.len())
232 .field("metered_size", &self.0.metered_size())
233 .finish()
234 }
235}
236
237impl<T> MeteredSize for AppendRecordBatch<T> {
238 fn metered_size(&self) -> usize {
239 self.0.metered_size()
240 }
241}
242
243impl<T> std::ops::Deref for AppendRecordBatch<T> {
244 type Target = [AppendRecord<T>];
245
246 fn deref(&self) -> &Self::Target {
247 &self.0
248 }
249}
250
251impl<T> TryFrom<Metered<Vec<AppendRecord<T>>>> for AppendRecordBatch<T> {
252 type Error = &'static str;
253
254 fn try_from(records: Metered<Vec<AppendRecord<T>>>) -> Result<Self, Self::Error> {
255 if records.is_empty() {
256 return Err("record batch must not be empty");
257 }
258
259 if records.len() > caps::RECORD_BATCH_MAX.count {
260 return Err("record batch must not exceed 1000 records");
261 }
262
263 if records.metered_size() > caps::RECORD_BATCH_MAX.bytes {
264 return Err("record batch must not exceed a metered size of 1 MiB");
265 }
266
267 Ok(Self(records))
268 }
269}
270
271impl<T> TryFrom<Vec<AppendRecord<T>>> for AppendRecordBatch<T> {
272 type Error = &'static str;
273
274 fn try_from(records: Vec<AppendRecord<T>>) -> Result<Self, Self::Error> {
275 let records = Metered::from(records);
276 Self::try_from(records)
277 }
278}
279
280impl<T> IntoIterator for AppendRecordBatch<T> {
281 type Item = AppendRecord<T>;
282 type IntoIter = std::vec::IntoIter<Self::Item>;
283
284 fn into_iter(self) -> Self::IntoIter {
285 self.0.into_iter()
286 }
287}
288
289#[derive(Debug, Clone)]
290pub struct AppendInput<T = Record> {
291 pub records: AppendRecordBatch<T>,
292 pub match_seq_num: Option<SeqNum>,
293 pub fencing_token: Option<FencingToken>,
294}
295
296#[derive(Debug, Clone)]
297pub struct AppendAck {
298 pub start: StreamPosition,
299 pub end: StreamPosition,
300 pub tail: StreamPosition,
301}
302
303#[derive(Debug, Clone, Copy, PartialEq, Eq)]
304pub enum ReadPosition {
305 SeqNum(SeqNum),
306 Timestamp(Timestamp),
307}
308
309#[derive(Debug, Clone, Copy)]
310pub enum ReadFrom {
311 SeqNum(SeqNum),
312 Timestamp(Timestamp),
313 TailOffset(u64),
314}
315
316impl Default for ReadFrom {
317 fn default() -> Self {
318 Self::SeqNum(0)
319 }
320}
321
322#[derive(Debug, Default, Clone, Copy)]
323pub struct ReadStart {
324 pub from: ReadFrom,
325 pub clamp: bool,
326}
327
328#[derive(Debug, Default, Clone, Copy)]
329pub struct ReadEnd {
330 pub limit: ReadLimit,
331 pub until: ReadUntil,
332 pub wait: Option<Duration>,
333}
334
335impl ReadEnd {
336 pub fn may_follow(&self) -> bool {
337 (self.limit.is_unbounded() && self.until.is_unbounded())
338 || self.wait.is_some_and(|d| d > Duration::ZERO)
339 }
340}
341
342#[derive(Clone)]
343pub struct ReadBatch<T = Record> {
344 pub records: Metered<Vec<Sequenced<T>>>,
345 pub tail: Option<StreamPosition>,
346}
347
348impl<T> Default for ReadBatch<T>
349where
350 T: MeteredSize,
351{
352 fn default() -> Self {
353 Self {
354 records: Metered::default(),
355 tail: None,
356 }
357 }
358}
359
360impl<T> std::fmt::Debug for ReadBatch<T> {
361 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362 f.debug_struct("ReadBatch")
363 .field("num_records", &self.records.len())
364 .field("metered_size", &self.records.metered_size())
365 .field("tail", &self.tail)
366 .finish()
367 }
368}
369
370#[derive(Debug, Clone)]
371pub enum ReadSessionOutput<T = Record> {
372 Heartbeat(StreamPosition),
373 Batch(ReadBatch<T>),
374}
375
376pub type ListStreamsRequest = ListItemsRequest<StreamNamePrefix, StreamNameStartAfter>;
377
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::{
        super::strings::{NameProps, PrefixProps, StartAfterProps},
        *,
    };
    use crate::caps::MAX_STREAM_NAME_LEN;

    // Full names: non-empty, not "." / "..", and within the length cap.

    #[rstest]
    #[case::normal(String::from("my-stream"))]
    #[case::max_len("a".repeat(MAX_STREAM_NAME_LEN))]
    fn validate_name_ok(#[case] name: String) {
        let outcome = StreamNameStr::<NameProps>::validate_str(&name);
        assert_eq!(outcome, Ok(()));
    }

    #[rstest]
    #[case::empty(String::new())]
    #[case::dot(String::from("."))]
    #[case::dot_dot(String::from(".."))]
    #[case::too_long("a".repeat(MAX_STREAM_NAME_LEN + 1))]
    fn validate_name_err(#[case] name: String) {
        let outcome = StreamNameStr::<NameProps>::validate_str(&name);
        outcome.expect_err("expected validation error");
    }

    // Prefixes: only the length cap applies.

    #[rstest]
    #[case::empty(String::new())]
    #[case::dot(String::from("."))]
    #[case::dot_dot(String::from(".."))]
    #[case::max_len("a".repeat(MAX_STREAM_NAME_LEN))]
    fn validate_prefix_ok(#[case] prefix: String) {
        let outcome = StreamNameStr::<PrefixProps>::validate_str(&prefix);
        assert_eq!(outcome, Ok(()));
    }

    #[rstest]
    #[case::too_long("a".repeat(MAX_STREAM_NAME_LEN + 1))]
    fn validate_prefix_err(#[case] prefix: String) {
        let outcome = StreamNameStr::<PrefixProps>::validate_str(&prefix);
        outcome.expect_err("expected validation error");
    }

    // Start-after values: only the length cap applies.

    #[rstest]
    #[case::empty(String::new())]
    #[case::dot(String::from("."))]
    #[case::dot_dot(String::from(".."))]
    #[case::max_len("a".repeat(MAX_STREAM_NAME_LEN))]
    fn validate_start_after_ok(#[case] start_after: String) {
        let outcome = StreamNameStr::<StartAfterProps>::validate_str(&start_after);
        assert_eq!(outcome, Ok(()));
    }

    #[rstest]
    #[case::too_long("a".repeat(MAX_STREAM_NAME_LEN + 1))]
    fn validate_start_after_err(#[case] start_after: String) {
        let outcome = StreamNameStr::<StartAfterProps>::validate_str(&start_after);
        outcome.expect_err("expected validation error");
    }

    #[test]
    fn append_record_batch_rejects_empty_batches() {
        let no_records = Vec::<AppendRecord>::new();
        let attempt: Result<AppendRecordBatch, _> = no_records.try_into();

        let message = attempt.unwrap_err();
        assert_eq!(message, "record batch must not be empty");
    }
}