use std::{fmt::Display, str::FromStr, sync::Arc};
pub use time::OffsetDateTime as Timestamp;
use crate::StreamKeyErr;
/// Maximum length of a stream key, in bytes.
///
/// [`is_valid_stream_key`] compares this against `str::len`, so the limit counts
/// bytes; keys of exactly this length are still accepted.
pub const MAX_STREAM_KEY_LEN: usize = 249;
/// String constant whose value is identical to its own identifier.
///
/// The value consists only of characters accepted by [`is_valid_stream_key_char`]
/// and is shorter than [`MAX_STREAM_KEY_LEN`], so it is itself a valid stream key.
// NOTE(review): presumably a reserved name for the library's internal use — confirm against call sites.
pub const SEA_STREAMER_INTERNAL: &str = "SEA_STREAMER_INTERNAL";
/// Timestamp format description:
/// `[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]`.
///
/// The description contains date, time and sub-second components separated by a
/// literal `T`; it has no UTC-offset component.
pub const TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] =
time::macros::format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]");
/// Key identifying a stream, stored as shared text.
///
/// The text lives in an `Arc<str>`, so cloning a key bumps a reference count
/// instead of copying the string. Equality, ordering and hashing are derived
/// from the text.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamKey {
    /// The key text. [`StreamKey::new`] only stores text that passes
    /// [`is_valid_stream_key`].
    name: Arc<str>,
}
/// Numeric shard identifier wrapping a `u64`.
///
/// `Default` yields id `0`; comparison, ordering and hashing follow the wrapped
/// integer. The type is `Copy`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ShardId {
    /// Raw identifier value.
    id: u64,
}
/// Sequence number type: 64 bits wide unless the `wide-seq-no` feature is enabled.
#[cfg(not(feature = "wide-seq-no"))]
pub type SeqNo = u64;
/// Sequence number type: 128 bits wide because the `wide-seq-no` feature is enabled.
#[cfg(feature = "wide-seq-no")]
pub type SeqNo = u128;
/// A position within a sequence: either end, or an explicit sequence number.
// NOTE(review): whether `At` is inclusive or exclusive is decided by the code
// that consumes this value — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeqPos {
    /// The start of the sequence.
    Beginning,
    /// The end of the sequence.
    End,
    /// The position given by a specific [`SeqNo`].
    At(SeqNo),
}
impl StreamKey {
pub fn new<S: AsRef<str>>(key: S) -> Result<Self, StreamKeyErr> {
let key = key.as_ref();
if is_valid_stream_key(key) {
Ok(Self {
name: Arc::from(key),
})
} else {
Err(StreamKeyErr::InvalidStreamKey)
}
}
pub fn name(&self) -> &str {
&self.name
}
}
impl ShardId {
    /// Creates a shard identifier from its raw numeric value.
    pub const fn new(id: u64) -> Self {
        Self { id }
    }

    /// Returns the raw numeric value.
    ///
    /// This is a `const fn`, matching [`ShardId::new`], so an identifier built
    /// in a constant context can also be read back in one.
    pub const fn id(&self) -> u64 {
        self.id
    }
}
impl Display for StreamKey {
    /// Writes the key text, honouring the caller's width, fill, alignment and
    /// precision flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` applies the formatter's options (e.g. `{:>10}`); going through
        // `write!(f, "{}", ..)` would start a fresh format spec and drop them.
        // Output for a plain `{}` is unchanged.
        f.pad(&self.name)
    }
}
impl Display for ShardId {
    /// Writes the shard using its `Debug` rendering (for example `ShardId { id: 3 }`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A fresh `{:?}` spec is used on purpose: flags given by the caller are
        // not forwarded, so the output is always the compact Debug form.
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl FromStr for StreamKey {
type Err = StreamKeyErr;
fn from_str(s: &str) -> Result<Self, Self::Err> {
StreamKey::new(s)
}
}
/// Reports whether `s` may be used as a stream key.
///
/// A key is accepted when it is at most [`MAX_STREAM_KEY_LEN`] bytes long and
/// every character passes [`is_valid_stream_key_char`]. The empty string meets
/// both conditions and is therefore accepted.
pub fn is_valid_stream_key(s: &str) -> bool {
    if s.len() > MAX_STREAM_KEY_LEN {
        return false;
    }
    s.chars().all(is_valid_stream_key_char)
}
/// Reports whether `c` may appear in a stream key.
///
/// Accepted characters are the ASCII letters, the ASCII digits, and `.`, `_`, `-`.
pub fn is_valid_stream_key_char(c: char) -> bool {
    match c {
        'a'..='z' | 'A'..='Z' | '0'..='9' => true,
        '.' | '_' | '-' => true,
        _ => false,
    }
}
#[cfg(feature = "serde")]
mod impl_serde {
    use super::StreamKey;

    /// Visitor that turns any string handed over by a deserializer into a
    /// [`StreamKey`], applying the same validation as `str::parse`.
    struct StreamKeyVisitor;

    impl<'de> serde::de::Visitor<'de> for StreamKeyVisitor {
        type Value = StreamKey;

        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            formatter.write_str("a string containing a valid stream key")
        }

        // Only `visit_str` is implemented: the trait's default
        // `visit_borrowed_str` and `visit_string` forward here, so borrowed,
        // transient and owned strings are all accepted without an extra copy.
        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            value.parse::<StreamKey>().map_err(E::custom)
        }
    }

    impl<'de> serde::Deserialize<'de> for StreamKey {
        /// Deserializes a key from a string and validates it.
        ///
        /// A visitor is used instead of `<&str>::deserialize`, because the
        /// latter only succeeds when the deserializer can lend a slice borrowed
        /// from the input and fails for owned or transient strings (for example
        /// reader-based input or strings that need unescaping).
        ///
        /// # Errors
        ///
        /// Fails when the input is not a string, or when the string is not a
        /// valid stream key.
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            deserializer.deserialize_str(StreamKeyVisitor)
        }
    }

    impl serde::Serialize for StreamKey {
        /// Serializes the key as a plain string holding its name.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            serializer.serialize_str(self.name())
        }
    }
}