1use std::{marker::PhantomData, ops::Deref, str::FromStr, time::Duration};
2
3use compact_str::{CompactString, ToCompactString};
4use time::OffsetDateTime;
5
6use super::{
7 ValidationError,
8 strings::{NameProps, PrefixProps, StartAfterProps, StrProps},
9};
10use crate::{
11 caps,
12 read_extent::{ReadLimit, ReadUntil},
13 record::{
14 FencingToken, Metered, MeteredSize, Record, SeqNum, SequencedRecord, StreamPosition,
15 Timestamp,
16 },
17 types::resources::ListItemsRequest,
18};
19
/// A validated stream-name-like string.
///
/// The marker type `T` (a [`StrProps`] implementor) selects which validation
/// rules apply and which field name appears in error messages; it is carried
/// only as `PhantomData` and stores no data of its own.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "rkyv",
    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
pub struct StreamNameStr<T: StrProps>(CompactString, PhantomData<T>);
26
27impl<T: StrProps> StreamNameStr<T> {
28 fn validate_str(name: &str) -> Result<(), ValidationError> {
29 if !T::IS_PREFIX && name.is_empty() {
30 return Err(format!("stream {} must not be empty", T::FIELD_NAME).into());
31 }
32
33 if !T::IS_PREFIX && (name == "." || name == "..") {
34 return Err(format!("stream {} must not be \".\" or \"..\"", T::FIELD_NAME).into());
35 }
36
37 if name.len() > caps::MAX_STREAM_NAME_LEN {
38 return Err(format!(
39 "stream {} must not exceed {} bytes in length",
40 T::FIELD_NAME,
41 caps::MAX_STREAM_NAME_LEN
42 )
43 .into());
44 }
45
46 Ok(())
47 }
48}
49
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::PartialSchema for StreamNameStr<T> {
    /// Describes the value as an OpenAPI string with length bounds.
    ///
    /// The maximum length is always `caps::MAX_STREAM_NAME_LEN`; a minimum
    /// length of `caps::MIN_STREAM_NAME_LEN` is emitted only when
    /// `T::IS_PREFIX` is `false`.
    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
        use utoipa::openapi::{Object, Type};

        let min_len = if T::IS_PREFIX {
            None
        } else {
            Some(caps::MIN_STREAM_NAME_LEN)
        };

        Object::builder()
            .schema_type(Type::String)
            .min_length(min_len)
            .max_length(Some(caps::MAX_STREAM_NAME_LEN))
            .into()
    }
}
63
// Marker impl with no overridden items; the schema itself comes from the
// `utoipa::PartialSchema` impl for this type.
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::ToSchema for StreamNameStr<T> {}
66
67impl<T: StrProps> serde::Serialize for StreamNameStr<T> {
68 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
69 where
70 S: serde::Serializer,
71 {
72 serializer.serialize_str(&self.0)
73 }
74}
75
76impl<'de, T: StrProps> serde::Deserialize<'de> for StreamNameStr<T> {
77 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
78 where
79 D: serde::Deserializer<'de>,
80 {
81 let s = CompactString::deserialize(deserializer)?;
82 s.try_into().map_err(serde::de::Error::custom)
83 }
84}
85
86impl<T: StrProps> AsRef<str> for StreamNameStr<T> {
87 fn as_ref(&self) -> &str {
88 &self.0
89 }
90}
91
92impl<T: StrProps> Deref for StreamNameStr<T> {
93 type Target = str;
94
95 fn deref(&self) -> &Self::Target {
96 &self.0
97 }
98}
99
100impl<T: StrProps> TryFrom<CompactString> for StreamNameStr<T> {
101 type Error = ValidationError;
102
103 fn try_from(name: CompactString) -> Result<Self, Self::Error> {
104 Self::validate_str(&name)?;
105 Ok(Self(name, PhantomData))
106 }
107}
108
109impl<T: StrProps> FromStr for StreamNameStr<T> {
110 type Err = ValidationError;
111
112 fn from_str(s: &str) -> Result<Self, Self::Err> {
113 Self::validate_str(s)?;
114 Ok(Self(s.to_compact_string(), PhantomData))
115 }
116}
117
118impl<T: StrProps> std::fmt::Debug for StreamNameStr<T> {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.write_str(&self.0)
121 }
122}
123
124impl<T: StrProps> std::fmt::Display for StreamNameStr<T> {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.write_str(&self.0)
127 }
128}
129
130impl<T: StrProps> From<StreamNameStr<T>> for CompactString {
131 fn from(value: StreamNameStr<T>) -> Self {
132 value.0
133 }
134}
135
/// A [`StreamNameStr`] validated with the [`NameProps`] rules.
pub type StreamName = StreamNameStr<NameProps>;
137
138pub type StreamNamePrefix = StreamNameStr<PrefixProps>;
139
140impl Default for StreamNamePrefix {
141 fn default() -> Self {
142 StreamNameStr(CompactString::default(), PhantomData)
143 }
144}
145
146impl From<StreamName> for StreamNamePrefix {
147 fn from(value: StreamName) -> Self {
148 Self(value.0, PhantomData)
149 }
150}
151
152pub type StreamNameStartAfter = StreamNameStr<StartAfterProps>;
153
154impl Default for StreamNameStartAfter {
155 fn default() -> Self {
156 StreamNameStr(CompactString::default(), PhantomData)
157 }
158}
159
160impl From<StreamName> for StreamNameStartAfter {
161 fn from(value: StreamName) -> Self {
162 Self(value.0, PhantomData)
163 }
164}
165
/// Descriptive information about a stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Validated name of the stream.
    pub name: StreamName,
    /// Creation time of the stream.
    pub created_at: OffsetDateTime,
    /// Deletion time, if any.
    // NOTE(review): presumably `None` while the stream is live — confirm against the producer.
    pub deleted_at: Option<OffsetDateTime>,
}
172
/// A size-checked wrapper around [`AppendRecordParts`].
///
/// The inner field is private; a value is obtained through the
/// `TryFrom<AppendRecordParts>` conversion, which enforces the size cap.
#[derive(Debug, Clone)]
pub struct AppendRecord(AppendRecordParts);
175
176impl Deref for AppendRecord {
177 type Target = AppendRecordParts;
178
179 fn deref(&self) -> &Self::Target {
180 let Self(parts) = self;
181 parts
182 }
183}
184
185impl MeteredSize for AppendRecord {
186 fn metered_size(&self) -> usize {
187 self.0.record.metered_size()
188 }
189}
190
/// The unchecked components of a record to append.
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    /// Optional timestamp for the record.
    // NOTE(review): presumably assigned elsewhere when `None` — confirm.
    pub timestamp: Option<Timestamp>,
    /// The record, wrapped in `Metered`; its `metered_size()` is what this
    /// type reports as its own metered size.
    pub record: Metered<Record>,
}
196
197impl MeteredSize for AppendRecordParts {
198 fn metered_size(&self) -> usize {
199 self.record.metered_size()
200 }
201}
202
203impl From<AppendRecord> for AppendRecordParts {
204 fn from(AppendRecord(parts): AppendRecord) -> Self {
205 parts
206 }
207}
208
209impl TryFrom<AppendRecordParts> for AppendRecord {
210 type Error = &'static str;
211
212 fn try_from(parts: AppendRecordParts) -> Result<Self, Self::Error> {
213 if parts.metered_size() > caps::RECORD_BATCH_MAX.bytes {
214 Err("record must have metered size less than 1 MiB")
215 } else {
216 Ok(Self(parts))
217 }
218 }
219}
220
/// A batch of [`AppendRecord`]s with a private inner field.
///
/// The `TryFrom` conversions into this type reject empty batches and batches
/// over the `caps::RECORD_BATCH_MAX` count or byte limits.
#[derive(Clone)]
pub struct AppendRecordBatch(Metered<Vec<AppendRecord>>);
223
224impl std::fmt::Debug for AppendRecordBatch {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 f.debug_struct("AppendRecordBatch")
227 .field("num_records", &self.0.len())
228 .field("metered_size", &self.0.metered_size())
229 .finish()
230 }
231}
232
233impl MeteredSize for AppendRecordBatch {
234 fn metered_size(&self) -> usize {
235 self.0.metered_size()
236 }
237}
238
239impl std::ops::Deref for AppendRecordBatch {
240 type Target = [AppendRecord];
241
242 fn deref(&self) -> &Self::Target {
243 &self.0
244 }
245}
246
247impl TryFrom<Metered<Vec<AppendRecord>>> for AppendRecordBatch {
248 type Error = &'static str;
249
250 fn try_from(records: Metered<Vec<AppendRecord>>) -> Result<Self, Self::Error> {
251 if records.is_empty() {
252 return Err("record batch must not be empty");
253 }
254
255 if records.len() > caps::RECORD_BATCH_MAX.count {
256 return Err("record batch must not exceed 1000 records");
257 }
258
259 if records.metered_size() > caps::RECORD_BATCH_MAX.bytes {
260 return Err("record batch must not exceed a metered size of 1 MiB");
261 }
262
263 Ok(Self(records))
264 }
265}
266
267impl TryFrom<Vec<AppendRecord>> for AppendRecordBatch {
268 type Error = &'static str;
269
270 fn try_from(records: Vec<AppendRecord>) -> Result<Self, Self::Error> {
271 let records = Metered::from(records);
272 Self::try_from(records)
273 }
274}
275
276impl IntoIterator for AppendRecordBatch {
277 type Item = AppendRecord;
278 type IntoIter = std::vec::IntoIter<Self::Item>;
279
280 fn into_iter(self) -> Self::IntoIter {
281 self.0.into_iter()
282 }
283}
284
/// Input for an append: a validated batch plus optional conditions.
#[derive(Debug, Clone)]
pub struct AppendInput {
    /// The records to append.
    pub records: AppendRecordBatch,
    /// Optional sequence number to match.
    // NOTE(review): presumably the append is rejected on mismatch — confirm against the handler.
    pub match_seq_num: Option<SeqNum>,
    /// Optional fencing token.
    // NOTE(review): enforcement semantics are not visible in this file — confirm.
    pub fencing_token: Option<FencingToken>,
}
291
/// Acknowledgement of an append, expressed as three stream positions.
// NOTE(review): whether `end` is inclusive or exclusive is not visible here — confirm.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Start position for the acknowledged append.
    pub start: StreamPosition,
    /// End position for the acknowledged append.
    pub end: StreamPosition,
    /// Tail position of the stream.
    pub tail: StreamPosition,
}
298
/// A position in a stream, given as either a sequence number or a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadPosition {
    /// Position identified by sequence number.
    SeqNum(SeqNum),
    /// Position identified by timestamp.
    Timestamp(Timestamp),
}
304
/// Where a read should begin.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Begin at a sequence number.
    SeqNum(SeqNum),
    /// Begin at a timestamp.
    Timestamp(Timestamp),
    /// Begin at an offset relative to the tail.
    // NOTE(review): presumably a count of records before the tail — confirm.
    TailOffset(u64),
}
311
312impl Default for ReadFrom {
313 fn default() -> Self {
314 Self::SeqNum(0)
315 }
316}
317
/// Start parameters for a read.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadStart {
    /// Where the read begins.
    pub from: ReadFrom,
    /// Clamping flag.
    // NOTE(review): presumably clamps an out-of-range start to the tail — confirm.
    pub clamp: bool,
}
323
/// End parameters for a read.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadEnd {
    /// Limit on how much is read.
    pub limit: ReadLimit,
    /// Bound at which the read stops.
    pub until: ReadUntil,
    /// Optional wait duration; `may_follow` treats a non-zero value as
    /// allowing the read to follow.
    pub wait: Option<Duration>,
}
330
331impl ReadEnd {
332 pub fn may_follow(&self) -> bool {
333 (self.limit.is_unbounded() && self.until.is_unbounded())
334 || self.wait.is_some_and(|d| d > Duration::ZERO)
335 }
336}
337
/// A batch of records returned by a read.
#[derive(Default, Clone)]
pub struct ReadBatch {
    /// The sequenced records, wrapped in `Metered`.
    pub records: Metered<Vec<SequencedRecord>>,
    /// Tail position of the stream, if present.
    // NOTE(review): the conditions under which this is `None` are not visible here — confirm.
    pub tail: Option<StreamPosition>,
}
343
344impl std::fmt::Debug for ReadBatch {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 f.debug_struct("ReadBatch")
347 .field("num_records", &self.records.len())
348 .field("metered_size", &self.records.metered_size())
349 .field("tail", &self.tail)
350 .finish()
351 }
352}
353
/// An item produced by a read session.
#[derive(Debug, Clone)]
pub enum ReadSessionOutput {
    /// A heartbeat carrying a stream position.
    // NOTE(review): presumably the current tail — confirm against the session code.
    Heartbeat(StreamPosition),
    /// A batch of records.
    Batch(ReadBatch),
}
359
/// [`ListItemsRequest`] specialised with [`StreamNamePrefix`] and
/// [`StreamNameStartAfter`] for listing streams.
pub type ListStreamsRequest = ListItemsRequest<StreamNamePrefix, StreamNameStartAfter>;
361
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::{
        super::strings::{NameProps, PrefixProps, StartAfterProps},
        StreamNameStr,
    };
    use crate::caps::MAX_STREAM_NAME_LEN;

    /// Full names: ordinary names and names exactly at the byte cap are accepted.
    #[rstest]
    #[case::normal(String::from("my-stream"))]
    #[case::max_len("a".repeat(MAX_STREAM_NAME_LEN))]
    fn validate_name_ok(#[case] name: String) {
        let outcome = StreamNameStr::<NameProps>::validate_str(&name);
        assert_eq!(outcome, Ok(()));
    }

    /// Full names: empty, dot-only, and over-long inputs are rejected.
    #[rstest]
    #[case::empty(String::new())]
    #[case::dot(String::from("."))]
    #[case::dot_dot(String::from(".."))]
    #[case::too_long("a".repeat(MAX_STREAM_NAME_LEN + 1))]
    fn validate_name_err(#[case] name: String) {
        let outcome = StreamNameStr::<NameProps>::validate_str(&name);
        outcome.expect_err("expected validation error");
    }

    /// Prefixes: empty and dot-only inputs are allowed, up to the byte cap.
    #[rstest]
    #[case::empty(String::new())]
    #[case::dot(String::from("."))]
    #[case::dot_dot(String::from(".."))]
    #[case::max_len("a".repeat(MAX_STREAM_NAME_LEN))]
    fn validate_prefix_ok(#[case] prefix: String) {
        let outcome = StreamNameStr::<PrefixProps>::validate_str(&prefix);
        assert_eq!(outcome, Ok(()));
    }

    /// Prefixes: only the length cap applies.
    #[rstest]
    #[case::too_long("a".repeat(MAX_STREAM_NAME_LEN + 1))]
    fn validate_prefix_err(#[case] prefix: String) {
        let outcome = StreamNameStr::<PrefixProps>::validate_str(&prefix);
        outcome.expect_err("expected validation error");
    }

    /// Start-after values: empty and dot-only inputs are allowed, up to the byte cap.
    #[rstest]
    #[case::empty(String::new())]
    #[case::dot(String::from("."))]
    #[case::dot_dot(String::from(".."))]
    #[case::max_len("a".repeat(MAX_STREAM_NAME_LEN))]
    fn validate_start_after_ok(#[case] start_after: String) {
        let outcome = StreamNameStr::<StartAfterProps>::validate_str(&start_after);
        assert_eq!(outcome, Ok(()));
    }

    /// Start-after values: only the length cap applies.
    #[rstest]
    #[case::too_long("a".repeat(MAX_STREAM_NAME_LEN + 1))]
    fn validate_start_after_err(#[case] start_after: String) {
        let outcome = StreamNameStr::<StartAfterProps>::validate_str(&start_after);
        outcome.expect_err("expected validation error");
    }
}