use std::fmt::{Display, Formatter};
use serde::{Deserialize, Serialize};
use crate::block::NextStrategy;
use crate::operator::{ExchangeData, KeyerFn};
use crate::scheduler::BlockId;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct DataType(String);
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct BlockStructure {
pub operators: Vec<OperatorStructure>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OperatorStructure {
pub title: String,
pub subtitle: String,
pub kind: OperatorKind,
pub receivers: Vec<OperatorReceiver>,
pub connections: Vec<Connection>,
pub out_type: DataType,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OperatorKind {
Operator,
Sink,
Source,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OperatorReceiver {
pub previous_block_id: BlockId,
pub data_type: DataType,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Connection {
pub to_block_id: BlockId,
pub data_type: DataType,
pub strategy: ConnectionStrategy,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ConnectionStrategy {
OnlyOne,
Random,
GroupBy,
All,
}
impl DataType {
pub fn of<T: ?Sized>() -> Self {
let type_name = std::any::type_name::<T>();
Self(DataType::clean_str(type_name))
}
fn clean_str(s: &str) -> String {
let mut result = "".to_string();
let mut current_ident = "".to_string();
for c in s.chars() {
if c.is_alphanumeric() {
current_ident.push(c);
} else if c == ':' {
current_ident = "".to_string();
} else {
if !current_ident.is_empty() {
result += ¤t_ident;
current_ident = "".to_string();
}
result.push(c);
}
}
if !current_ident.is_empty() {
result += ¤t_ident;
}
result
}
}
impl Display for DataType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl BlockStructure {
pub fn add_operator(mut self, operator: OperatorStructure) -> Self {
self.operators.push(operator);
self
}
}
impl OperatorStructure {
pub fn new<Out: ?Sized, S: Into<String>>(title: S) -> Self {
Self {
title: title.into(),
subtitle: "".into(),
kind: OperatorKind::Operator,
receivers: Default::default(),
connections: Default::default(),
out_type: DataType::of::<Out>(),
}
}
}
impl OperatorReceiver {
pub fn new<T: ?Sized>(previous_block_id: BlockId) -> Self {
Self {
previous_block_id,
data_type: DataType::of::<T>(),
}
}
}
impl Connection {
pub(crate) fn new<T: ExchangeData, IndexFn>(
to_block_id: BlockId,
strategy: &NextStrategy<T, IndexFn>,
) -> Self
where
IndexFn: KeyerFn<u64, T>,
{
Self {
to_block_id,
data_type: DataType::of::<T>(),
strategy: strategy.into(),
}
}
}
impl<Out: ExchangeData, IndexFn> From<&NextStrategy<Out, IndexFn>> for ConnectionStrategy
where
IndexFn: KeyerFn<u64, Out>,
{
fn from(strategy: &NextStrategy<Out, IndexFn>) -> Self {
match strategy {
NextStrategy::OnlyOne => ConnectionStrategy::OnlyOne,
NextStrategy::Random => ConnectionStrategy::Random,
NextStrategy::GroupBy(_, _) => ConnectionStrategy::GroupBy,
NextStrategy::All => ConnectionStrategy::All,
}
}
}
#[cfg(test)]
mod tests {
use crate::block::DataType;
#[test]
fn test_data_type_clean() {
let dataset = [
("aaa", "aaa"),
("aaa::bbb::ccc", "ccc"),
("(aaa, bbb::ccc::ddd)", "(aaa, ddd)"),
("aaa<bbb>", "aaa<bbb>"),
("aaa::bbb::ccc<ddd::eee>", "ccc<eee>"),
("aaa::bbb<ccc::ddd<eee>>", "bbb<ddd<eee>>"),
];
for (input, expected) in &dataset {
assert_eq!(&DataType::clean_str(input), expected);
}
}
}