sea_streamer_types/
stream.rs

1use std::{fmt::Display, str::FromStr, sync::Arc};
2pub use time::OffsetDateTime as Timestamp;
3
4use crate::StreamKeyErr;
5
6/// Maximum string length of a stream key.
7pub const MAX_STREAM_KEY_LEN: usize = 249;
8
9/// Reserved by SeaStreamer. Avoid using this as StreamKey.
10pub const SEA_STREAMER_INTERNAL: &str = "SEA_STREAMER_INTERNAL";
11
12/// Canonical display format for Timestamp.
13pub const TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] =
14    time::macros::format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]");
15
16#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
17/// Identifies a stream. Aka. topic.
18pub struct StreamKey {
19    name: Arc<str>,
20}
21
22#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
23/// Identifies a shard. Aka. partition.
24pub struct ShardId {
25    id: u64,
26}
27
28/// The tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message. Aka. offset.
29#[cfg(not(feature = "wide-seq-no"))]
30pub type SeqNo = u64;
31#[cfg(feature = "wide-seq-no")]
32pub type SeqNo = u128;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35/// Identifies a position in a stream.
36pub enum SeqPos {
37    Beginning,
38    End,
39    At(SeqNo),
40}
41
42impl StreamKey {
43    pub fn new<S: AsRef<str>>(key: S) -> Result<Self, StreamKeyErr> {
44        let key = key.as_ref();
45        if is_valid_stream_key(key) {
46            Ok(Self {
47                name: Arc::from(key),
48            })
49        } else {
50            Err(StreamKeyErr::InvalidStreamKey)
51        }
52    }
53
54    pub fn name(&self) -> &str {
55        &self.name
56    }
57}
58
59impl ShardId {
60    pub const fn new(id: u64) -> Self {
61        Self { id }
62    }
63
64    pub fn id(&self) -> u64 {
65        self.id
66    }
67}
68
69impl Display for StreamKey {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{}", self.name)
72    }
73}
74
75impl Display for ShardId {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(f, "{self:?}")
78    }
79}
80
81impl FromStr for StreamKey {
82    type Err = StreamKeyErr;
83
84    fn from_str(s: &str) -> Result<Self, Self::Err> {
85        StreamKey::new(s)
86    }
87}
88
89pub fn is_valid_stream_key(s: &str) -> bool {
90    s.len() <= MAX_STREAM_KEY_LEN && s.chars().all(is_valid_stream_key_char)
91}
92
93/// Returns true if this character can be used in a stream key.
94pub fn is_valid_stream_key_char(c: char) -> bool {
95    // https://stackoverflow.com/questions/37062904/what-are-apache-kafka-topic-name-limitations
96    c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')
97}
98
99#[cfg(feature = "serde")]
100mod impl_serde {
101    use super::StreamKey;
102
103    impl<'de> serde::Deserialize<'de> for StreamKey {
104        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
105        where
106            D: serde::Deserializer<'de>,
107        {
108            let s = <&str>::deserialize(deserializer)?;
109            s.parse().map_err(serde::de::Error::custom)
110        }
111    }
112
113    impl serde::Serialize for StreamKey {
114        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
115        where
116            S: serde::Serializer,
117        {
118            serializer.serialize_str(self.name())
119        }
120    }
121}