use std::{
convert::TryFrom,
fmt::{self, Debug, Display, Formatter},
str::FromStr,
};
use anyhow::anyhow;
use derive_more::Display;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use crate::{
scalar::nonempty_string,
scalars::{NodeId, StreamId},
tags::TagSet,
};
#[derive(Debug, Display, PartialEq)]
pub enum ParseError {
#[display(fmt = "SourceId was longer than maximum")]
SourceIdTooLong,
#[display(fmt = "Empty string is not permissible for SourceId")]
EmptySourceId,
#[display(fmt = "Empty string is not permissible for Semantics")]
EmptySemantics,
#[display(fmt = "Empty string is not permissible for FishName")]
EmptyFishName,
}
impl std::error::Error for ParseError {}
#[macro_export]
macro_rules! semantics {
($lit:tt) => {{
#[allow(dead_code)]
type X = $crate::assert_len!($lit, 1..);
use ::std::convert::TryFrom;
$crate::legacy::Semantics::try_from($lit).unwrap()
}};
}
#[macro_export]
macro_rules! fish_name {
($lit:tt) => {{
#[allow(dead_code)]
type X = $crate::assert_len!($lit, 1..);
use ::std::convert::TryFrom;
$crate::legacy::FishName::try_from($lit).unwrap()
}};
}
#[macro_export]
macro_rules! source_id {
($lit:tt) => {{
#[allow(dead_code)]
type X = $crate::assert_len!($lit, 1..=15);
use ::std::convert::TryFrom;
$crate::legacy::SourceId::try_from($lit).unwrap()
}};
}
pub(crate) const MAX_SOURCEID_LENGTH: usize = 15;
mk_scalar!(
struct Semantics, ParseError, crate::legacy::validate_semantics, "crate::scalar::nonempty_string"
);
pub fn validate_semantics(s: &str) -> Result<(), ParseError> {
if s.is_empty() {
return Err(ParseError::EmptySemantics);
}
Ok(())
}
impl Semantics {
pub fn unknown() -> Self {
semantics!("_t_")
}
}
mk_scalar!(
struct FishName, ParseError, crate::legacy::validate_fishname, "crate::scalar::nonempty_string"
);
pub fn validate_fishname(s: &str) -> Result<(), ParseError> {
if s.is_empty() {
return Err(ParseError::EmptySemantics);
}
Ok(())
}
impl FishName {
pub fn unknown() -> Self {
fish_name!("_t_")
}
}
impl TryFrom<&TagSet> for Semantics {
type Error = anyhow::Error;
fn try_from(value: &TagSet) -> Result<Self, Self::Error> {
let sem = value
.iter()
.filter(|t| t.to_string().starts_with("semantics:"))
.filter_map(|t| {
let t = t.to_string();
let pos = t.find(':')?;
Semantics::try_from(&t[pos + 1..]).ok()
})
.collect::<Vec<_>>();
if sem.len() == 1 {
sem.into_iter().next().ok_or_else(|| anyhow!("cannot happen"))
} else {
Err(anyhow!("no unique semantics tag found"))
}
}
}
impl TryFrom<&TagSet> for FishName {
type Error = anyhow::Error;
fn try_from(value: &TagSet) -> Result<Self, Self::Error> {
let names = value
.iter()
.filter(|t| t.to_string().starts_with("fish_name:"))
.filter_map(|t| {
let t = t.to_string();
let pos = t.find(':')?;
FishName::try_from(&t[pos + 1..]).ok()
})
.collect::<Vec<_>>();
if names.len() == 1 {
names.into_iter().next().ok_or_else(|| anyhow!("cannot happen"))
} else {
Err(anyhow!("no unique fish_name tag found"))
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
#[serde(rename_all = "camelCase")]
pub struct StreamInfo {
pub semantics: Semantics,
pub name: FishName,
pub source: SourceId,
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
pub struct SourceId(pub(crate) [u8; MAX_SOURCEID_LENGTH + 1]);
impl SourceId {
pub fn new(s: String) -> Result<Self, ParseError> {
Self::from_str(s.as_ref())
}
pub fn as_str(&self) -> &str {
let length = self.0[MAX_SOURCEID_LENGTH] as usize;
std::str::from_utf8(&self.0[0..length]).expect("content must be valid utf8 string")
}
pub fn is_wildcard(&self) -> bool {
self.is_empty()
}
pub fn is_empty(&self) -> bool {
self.0[MAX_SOURCEID_LENGTH] as usize == 0
}
}
impl Debug for SourceId {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(f, "SourceId({})", self.as_str())
}
}
impl TryFrom<&str> for SourceId {
type Error = ParseError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
Self::from_str(value)
}
}
impl FromStr for SourceId {
type Err = ParseError;
fn from_str(text: &str) -> Result<SourceId, ParseError> {
let bytes = text.as_bytes();
if bytes.len() > MAX_SOURCEID_LENGTH {
return Result::Err(ParseError::SourceIdTooLong);
}
if bytes.is_empty() {
return Result::Err(ParseError::EmptySourceId);
}
let mut buf = [0; MAX_SOURCEID_LENGTH + 1];
buf[MAX_SOURCEID_LENGTH] = bytes.len() as u8;
buf[..bytes.len()].clone_from_slice(bytes);
Result::Ok(SourceId(buf))
}
}
impl From<SourceId> for StreamId {
fn from(src: SourceId) -> Self {
Self::from(&src)
}
}
impl From<&SourceId> for StreamId {
fn from(src: &SourceId) -> Self {
let mut bytes = [0u8; 32];
bytes[0..=MAX_SOURCEID_LENGTH].copy_from_slice(&src.0[..]);
StreamId {
node_id: NodeId(bytes),
stream_nr: 0.into(),
}
}
}
impl Display for SourceId {
fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
write!(f, "{}", self.as_str())
}
}
impl<'de> Deserialize<'de> for SourceId {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<SourceId, D::Error> {
nonempty_string(deserializer).and_then(|arc| SourceId::try_from(&*arc).map_err(D::Error::custom))
}
}
impl Serialize for SourceId {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(self.as_str())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deserialize() {
assert_eq!(
serde_json::from_str::<Semantics>(r#""abc""#).unwrap(),
semantics!("abc")
);
let res = serde_json::from_str::<Semantics>("\"\"").unwrap_err();
assert_eq!(res.to_string(), "expected non-empty string");
}
#[test]
fn deserialize_owned() {
assert_eq!(
serde_json::from_reader::<_, Semantics>(br#""abc""#.as_ref()).unwrap(),
semantics!("abc")
);
let res = serde_json::from_reader::<_, Semantics>(b"\"\"".as_ref()).unwrap_err();
assert_eq!(res.to_string(), "expected non-empty string");
assert_eq!(
serde_json::from_reader::<_, SourceId>(br#""abc""#.as_ref()).unwrap(),
source_id!("abc")
);
let res = serde_json::from_reader::<_, SourceId>(b"\"\"".as_ref()).unwrap_err();
assert_eq!(res.to_string(), "expected non-empty string");
}
#[test]
fn reject_empty_source_id() {
SourceId::from_str("").unwrap_err();
}
}