sea_streamer_types/
stream.rs1use std::{fmt::Display, str::FromStr, sync::Arc};
2pub use time::OffsetDateTime as Timestamp;
3
4use crate::StreamKeyErr;
5
6pub const MAX_STREAM_KEY_LEN: usize = 249;
8
9pub const SEA_STREAMER_INTERNAL: &str = "SEA_STREAMER_INTERNAL";
11
12pub const TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] =
14 time::macros::format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]");
15
16#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
17pub struct StreamKey {
19 name: Arc<str>,
20}
21
22#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
23pub struct ShardId {
25 id: u64,
26}
27
28#[cfg(not(feature = "wide-seq-no"))]
30pub type SeqNo = u64;
31#[cfg(feature = "wide-seq-no")]
32pub type SeqNo = u128;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum SeqPos {
37 Beginning,
38 End,
39 At(SeqNo),
40}
41
42impl StreamKey {
43 pub fn new<S: AsRef<str>>(key: S) -> Result<Self, StreamKeyErr> {
44 let key = key.as_ref();
45 if is_valid_stream_key(key) {
46 Ok(Self {
47 name: Arc::from(key),
48 })
49 } else {
50 Err(StreamKeyErr::InvalidStreamKey)
51 }
52 }
53
54 pub fn name(&self) -> &str {
55 &self.name
56 }
57}
58
59impl ShardId {
60 pub const fn new(id: u64) -> Self {
61 Self { id }
62 }
63
64 pub fn id(&self) -> u64 {
65 self.id
66 }
67}
68
69impl Display for StreamKey {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{}", self.name)
72 }
73}
74
75impl Display for ShardId {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(f, "{self:?}")
78 }
79}
80
81impl FromStr for StreamKey {
82 type Err = StreamKeyErr;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 StreamKey::new(s)
86 }
87}
88
89pub fn is_valid_stream_key(s: &str) -> bool {
90 s.len() <= MAX_STREAM_KEY_LEN && s.chars().all(is_valid_stream_key_char)
91}
92
93pub fn is_valid_stream_key_char(c: char) -> bool {
95 c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')
97}
98
99#[cfg(feature = "serde")]
100mod impl_serde {
101 use super::StreamKey;
102
103 impl<'de> serde::Deserialize<'de> for StreamKey {
104 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
105 where
106 D: serde::Deserializer<'de>,
107 {
108 let s = <&str>::deserialize(deserializer)?;
109 s.parse().map_err(serde::de::Error::custom)
110 }
111 }
112
113 impl serde::Serialize for StreamKey {
114 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
115 where
116 S: serde::Serializer,
117 {
118 serializer.serialize_str(self.name())
119 }
120 }
121}