1use std::{marker::PhantomData, ops::Deref, str::FromStr, time::Duration};
2
3use compact_str::{CompactString, ToCompactString};
4use time::OffsetDateTime;
5
6use super::{
7 ValidationError,
8 strings::{NameProps, PrefixProps, StartAfterProps, StrProps},
9};
10use crate::{
11 caps,
12 read_extent::{ReadLimit, ReadUntil},
13 record::{
14 FencingToken, Metered, MeteredSize, Record, SeqNum, SequencedRecord, StreamPosition,
15 Timestamp,
16 },
17 types::resources::ListItemsRequest,
18};
19
/// A stream-name-like string whose validation rules are selected by the marker type `T`.
///
/// Wraps a [`CompactString`]; the `PhantomData<T>` field stores no data and only ties the
/// value to a [`StrProps`] implementation. Values created through `TryFrom<CompactString>`,
/// `FromStr` or deserialization are checked by `validate_str` first.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "rkyv",
    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
pub struct StreamNameStr<T: StrProps>(CompactString, PhantomData<T>);
26
27impl<T: StrProps> StreamNameStr<T> {
28 fn validate_str(name: &str) -> Result<(), ValidationError> {
29 if !T::IS_PREFIX && name.is_empty() {
30 return Err(format!("stream {} must not be empty", T::FIELD_NAME).into());
31 }
32
33 if name.len() > caps::MAX_STREAM_NAME_LEN {
34 return Err(format!(
35 "stream {} must not exceed {} bytes in length",
36 T::FIELD_NAME,
37 caps::MAX_STREAM_NAME_LEN
38 )
39 .into());
40 }
41
42 Ok(())
43 }
44}
45
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::PartialSchema for StreamNameStr<T> {
    /// Builds the OpenAPI schema: a string with a maximum length of
    /// `caps::MAX_STREAM_NAME_LEN`, plus a minimum length of
    /// `caps::MIN_STREAM_NAME_LEN` unless `T::IS_PREFIX` is set.
    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
        // Prefix-like kinds may be empty, so they get no lower bound.
        let min_len = if T::IS_PREFIX {
            None
        } else {
            Some(caps::MIN_STREAM_NAME_LEN)
        };
        let max_len = Some(caps::MAX_STREAM_NAME_LEN);

        utoipa::openapi::Object::builder()
            .schema_type(utoipa::openapi::Type::String)
            .min_length(min_len)
            .max_length(max_len)
            .into()
    }
}
59
// Marker impl with no overrides; the schema itself comes from the `PartialSchema` impl.
#[cfg(feature = "utoipa")]
impl<T: StrProps> utoipa::ToSchema for StreamNameStr<T> {}
62
63impl<T: StrProps> serde::Serialize for StreamNameStr<T> {
64 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
65 where
66 S: serde::Serializer,
67 {
68 serializer.serialize_str(&self.0)
69 }
70}
71
72impl<'de, T: StrProps> serde::Deserialize<'de> for StreamNameStr<T> {
73 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74 where
75 D: serde::Deserializer<'de>,
76 {
77 let s = CompactString::deserialize(deserializer)?;
78 s.try_into().map_err(serde::de::Error::custom)
79 }
80}
81
82impl<T: StrProps> AsRef<str> for StreamNameStr<T> {
83 fn as_ref(&self) -> &str {
84 &self.0
85 }
86}
87
88impl<T: StrProps> Deref for StreamNameStr<T> {
89 type Target = str;
90
91 fn deref(&self) -> &Self::Target {
92 &self.0
93 }
94}
95
96impl<T: StrProps> TryFrom<CompactString> for StreamNameStr<T> {
97 type Error = ValidationError;
98
99 fn try_from(name: CompactString) -> Result<Self, Self::Error> {
100 Self::validate_str(&name)?;
101 Ok(Self(name, PhantomData))
102 }
103}
104
105impl<T: StrProps> FromStr for StreamNameStr<T> {
106 type Err = ValidationError;
107
108 fn from_str(s: &str) -> Result<Self, Self::Err> {
109 Self::validate_str(s)?;
110 Ok(Self(s.to_compact_string(), PhantomData))
111 }
112}
113
114impl<T: StrProps> std::fmt::Debug for StreamNameStr<T> {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.write_str(&self.0)
117 }
118}
119
120impl<T: StrProps> std::fmt::Display for StreamNameStr<T> {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.write_str(&self.0)
123 }
124}
125
126impl<T: StrProps> From<StreamNameStr<T>> for CompactString {
127 fn from(value: StreamNameStr<T>) -> Self {
128 value.0
129 }
130}
131
/// A stream name validated with the [`NameProps`] rules.
pub type StreamName = StreamNameStr<NameProps>;
133
/// A stream-name prefix validated with the [`PrefixProps`] rules.
pub type StreamNamePrefix = StreamNameStr<PrefixProps>;
135
136impl Default for StreamNamePrefix {
137 fn default() -> Self {
138 StreamNameStr(CompactString::default(), PhantomData)
139 }
140}
141
142impl From<StreamName> for StreamNamePrefix {
143 fn from(value: StreamName) -> Self {
144 Self(value.0, PhantomData)
145 }
146}
147
/// A "start after" stream-name marker validated with the [`StartAfterProps`] rules.
pub type StreamNameStartAfter = StreamNameStr<StartAfterProps>;
149
150impl Default for StreamNameStartAfter {
151 fn default() -> Self {
152 StreamNameStr(CompactString::default(), PhantomData)
153 }
154}
155
156impl From<StreamName> for StreamNameStartAfter {
157 fn from(value: StreamName) -> Self {
158 Self(value.0, PhantomData)
159 }
160}
161
/// Descriptive information about a stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// When the stream was created.
    pub created_at: OffsetDateTime,
    /// When the stream was deleted, if set.
    // NOTE(review): presumably `None` while the stream has not been deleted — confirm.
    pub deleted_at: Option<OffsetDateTime>,
}
168
/// A record to be appended, wrapping [`AppendRecordParts`].
///
/// The inner field is private; the `TryFrom<AppendRecordParts>` impl in this file rejects
/// parts whose metered size exceeds `caps::RECORD_BATCH_MAX.bytes`.
#[derive(Debug, Clone)]
pub struct AppendRecord(AppendRecordParts);
171
172impl Deref for AppendRecord {
173 type Target = AppendRecordParts;
174
175 fn deref(&self) -> &Self::Target {
176 let Self(parts) = self;
177 parts
178 }
179}
180
181impl MeteredSize for AppendRecord {
182 fn metered_size(&self) -> usize {
183 self.0.record.metered_size()
184 }
185}
186
/// The publicly constructible components of an [`AppendRecord`].
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    /// Optional timestamp supplied with the record.
    // NOTE(review): behaviour when this is `None` is decided elsewhere — confirm.
    pub timestamp: Option<Timestamp>,
    /// The record wrapped in [`Metered`].
    pub record: Metered<Record>,
}
192
193impl MeteredSize for AppendRecordParts {
194 fn metered_size(&self) -> usize {
195 self.record.metered_size()
196 }
197}
198
199impl From<AppendRecord> for AppendRecordParts {
200 fn from(AppendRecord(parts): AppendRecord) -> Self {
201 parts
202 }
203}
204
205impl TryFrom<AppendRecordParts> for AppendRecord {
206 type Error = &'static str;
207
208 fn try_from(parts: AppendRecordParts) -> Result<Self, Self::Error> {
209 if parts.metered_size() > caps::RECORD_BATCH_MAX.bytes {
210 Err("record must have metered size less than 1 MiB")
211 } else {
212 Ok(Self(parts))
213 }
214 }
215}
216
/// A batch of [`AppendRecord`]s held in a [`Metered`] vector.
///
/// The inner field is private; the `TryFrom` impls in this file reject batches that are
/// empty or that exceed `caps::RECORD_BATCH_MAX` in record count or metered size.
#[derive(Clone)]
pub struct AppendRecordBatch(Metered<Vec<AppendRecord>>);
219
220impl std::fmt::Debug for AppendRecordBatch {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 f.debug_struct("AppendRecordBatch")
223 .field("num_records", &self.0.len())
224 .field("metered_size", &self.0.metered_size())
225 .finish()
226 }
227}
228
229impl MeteredSize for AppendRecordBatch {
230 fn metered_size(&self) -> usize {
231 self.0.metered_size()
232 }
233}
234
235impl std::ops::Deref for AppendRecordBatch {
236 type Target = [AppendRecord];
237
238 fn deref(&self) -> &Self::Target {
239 &self.0
240 }
241}
242
243impl TryFrom<Metered<Vec<AppendRecord>>> for AppendRecordBatch {
244 type Error = &'static str;
245
246 fn try_from(records: Metered<Vec<AppendRecord>>) -> Result<Self, Self::Error> {
247 if records.is_empty() {
248 return Err("record batch must not be empty");
249 }
250
251 if records.len() > caps::RECORD_BATCH_MAX.count {
252 return Err("record batch must not exceed 1000 records");
253 }
254
255 if records.metered_size() > caps::RECORD_BATCH_MAX.bytes {
256 return Err("record batch must not exceed a metered size of 1 MiB");
257 }
258
259 Ok(Self(records))
260 }
261}
262
263impl TryFrom<Vec<AppendRecord>> for AppendRecordBatch {
264 type Error = &'static str;
265
266 fn try_from(records: Vec<AppendRecord>) -> Result<Self, Self::Error> {
267 let records = Metered::from(records);
268 Self::try_from(records)
269 }
270}
271
272impl IntoIterator for AppendRecordBatch {
273 type Item = AppendRecord;
274 type IntoIter = std::vec::IntoIter<Self::Item>;
275
276 fn into_iter(self) -> Self::IntoIter {
277 self.0.into_iter()
278 }
279}
280
/// Input for an append: a record batch plus optional conditions.
#[derive(Debug, Clone)]
pub struct AppendInput {
    /// The validated batch of records to append.
    pub records: AppendRecordBatch,
    /// Optional sequence number to match.
    // NOTE(review): presumably the append is rejected on mismatch — confirm against the
    // append implementation.
    pub match_seq_num: Option<SeqNum>,
    /// Optional fencing token.
    // NOTE(review): enforcement semantics are not visible in this file — confirm.
    pub fencing_token: Option<FencingToken>,
}
287
/// Acknowledgement of an append, expressed as three stream positions.
// NOTE(review): the exact meaning of each position (e.g. whether `end` is exclusive) is
// not visible in this file — confirm before relying on it.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Start position associated with the appended records.
    pub start: StreamPosition,
    /// End position associated with the appended records.
    pub end: StreamPosition,
    /// Tail position of the stream.
    pub tail: StreamPosition,
}
294
/// A position in a stream, given either as a sequence number or as a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadPosition {
    /// Position identified by sequence number.
    SeqNum(SeqNum),
    /// Position identified by timestamp.
    Timestamp(Timestamp),
}
300
/// Where a read should start from.
///
/// The `Default` impl in this file yields `SeqNum(0)`.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from the given sequence number.
    SeqNum(SeqNum),
    /// Start from the given timestamp.
    Timestamp(Timestamp),
    /// Start relative to the tail.
    // NOTE(review): presumably a number of records back from the tail — confirm.
    TailOffset(u64),
}
307
308impl Default for ReadFrom {
309 fn default() -> Self {
310 Self::SeqNum(0)
311 }
312}
313
/// Describes where a read begins.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadStart {
    /// The requested starting point.
    pub from: ReadFrom,
    /// Clamp flag; defaults to `false` through the derived `Default`.
    // NOTE(review): presumably clamps an out-of-range start position instead of failing —
    // confirm against the read implementation.
    pub clamp: bool,
}
319
/// Describes when a read stops.
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadEnd {
    /// Limit on how much to read.
    pub limit: ReadLimit,
    /// Bound at which the read should stop.
    pub until: ReadUntil,
    /// Optional wait duration; `may_follow` treats a value greater than zero as
    /// permitting the read to follow.
    pub wait: Option<Duration>,
}
326
327impl ReadEnd {
328 pub fn may_follow(&self) -> bool {
329 (self.limit.is_unbounded() && self.until.is_unbounded())
330 || self.wait.is_some_and(|d| d > Duration::ZERO)
331 }
332}
333
/// A batch of records returned by a read.
#[derive(Default, Clone)]
pub struct ReadBatch {
    /// The sequenced records in this batch, held in a [`Metered`] vector.
    pub records: Metered<Vec<SequencedRecord>>,
    /// Tail position of the stream, when provided.
    // NOTE(review): the conditions under which this is `None` are not visible here — confirm.
    pub tail: Option<StreamPosition>,
}
339
340impl std::fmt::Debug for ReadBatch {
341 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342 f.debug_struct("ReadBatch")
343 .field("num_records", &self.records.len())
344 .field("metered_size", &self.records.metered_size())
345 .field("tail", &self.tail)
346 .finish()
347 }
348}
349
/// An item emitted by a read session.
#[derive(Debug, Clone)]
pub enum ReadSessionOutput {
    /// A heartbeat carrying a stream position.
    // NOTE(review): presumably the current tail, sent when no records are available —
    // confirm.
    Heartbeat(StreamPosition),
    /// A batch of records.
    Batch(ReadBatch),
}
355
/// A stream listing request: [`ListItemsRequest`] specialised with a stream-name prefix
/// and a start-after marker.
pub type ListStreamsRequest = ListItemsRequest<StreamNamePrefix, StreamNameStartAfter>;