use crate::{
physical::{BitCount, Complexity, Fields, PhysicalStream},
util::log2_ceil,
Error, Name, NonNegative, PathName, Positive, PositiveReal, Result, Reverse,
};
use indexmap::IndexMap;
use std::str::FromStr;
use std::{
convert::{TryFrom, TryInto},
error,
};
/// Direction of a [`Stream`].
///
/// `LogicalStreamType::split` flips the direction of streams nested in the
/// data type of a `Reverse` stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
/// The default direction (see the `Default` implementation).
Forward,
/// The opposite of `Forward`.
Reverse,
}
impl Default for Direction {
fn default() -> Self {
Direction::Forward
}
}
impl FromStr for Direction {
type Err = Error;
fn from_str(input: &str) -> Result<Self> {
match input {
"Forward" => Ok(Direction::Forward),
"Reverse" => Ok(Direction::Reverse),
_ => Err(Error::InvalidArgument(format!(
"{} is not a valid Direction",
input
))),
}
}
}
/// Reversing a direction swaps `Forward` and `Reverse` in place.
impl Reverse for Direction {
    fn reverse(&mut self) {
        match *self {
            Direction::Forward => *self = Direction::Reverse,
            Direction::Reverse => *self = Direction::Forward,
        }
    }
}
/// Synchronicity of a [`Stream`].
///
/// `LogicalStreamType::split` consults this value when it propagates the
/// properties of a stream to the streams nested in its data type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Synchronicity {
/// The default synchronicity (see the `Default` implementation).
Sync,
/// A nested stream with this synchronicity does not inherit the
/// dimensionality of its parent in `split`, unless the parent itself is
/// `Flatten` or `FlatDesync`, in which case the child is rewritten to
/// `FlatDesync` first.
Flatten,
Desync,
/// Streams nested in a `Flatten` or `FlatDesync` parent are forced to this
/// value in `split`; children of a `FlatDesync` parent do not inherit its
/// dimensionality.
FlatDesync,
}
impl Default for Synchronicity {
fn default() -> Self {
Synchronicity::Sync
}
}
impl FromStr for Synchronicity {
type Err = Error;
fn from_str(input: &str) -> Result<Self> {
match input {
"Sync" => Ok(Synchronicity::Sync),
"Flatten" => Ok(Synchronicity::Flatten),
"Desync" => Ok(Synchronicity::Desync),
"FlatDesync" => Ok(Synchronicity::FlatDesync),
_ => Err(Error::InvalidArgument(format!(
"{} is not a valid Synchronicity",
input
))),
}
}
}
/// A stream node of a logical stream type: a data type plus the properties
/// that are handed to `PhysicalStream::new` when the type is synthesized.
#[derive(Debug, Clone, PartialEq)]
pub struct Stream {
/// Type carried by this stream; may itself contain nested streams.
data: Box<LogicalStreamType>,
/// Throughput; multiplied with the parent's throughput in `split` and
/// rounded up to an integer in `synthesize`.
throughput: PositiveReal,
/// Dimensionality; nested streams may have the parent's value added in `split`.
dimensionality: NonNegative,
/// Controls how dimensionality is inherited by nested streams in `split`.
synchronicity: Synchronicity,
/// Complexity passed on to the physical stream.
complexity: Complexity,
/// Direction of this stream.
direction: Direction,
/// Optional user type; its fields become the user fields of the physical stream.
user: Option<Box<LogicalStreamType>>,
/// When set, `split` emits this stream even if its data and user types are null.
keep: bool,
}
impl Reverse for Stream {
fn reverse(&mut self) {
self.direction.reverse();
}
}
impl Stream {
    /// Creates a stream from all of its properties.
    ///
    /// The data type and the optional user type are boxed; `complexity`
    /// accepts anything convertible into a `Complexity`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data: LogicalStreamType,
        throughput: PositiveReal,
        dimensionality: NonNegative,
        synchronicity: Synchronicity,
        complexity: impl Into<Complexity>,
        direction: Direction,
        user: Option<LogicalStreamType>,
        keep: bool,
    ) -> Self {
        Stream {
            data: Box::new(data),
            throughput,
            dimensionality,
            synchronicity,
            complexity: complexity.into(),
            direction,
            user: user.map(Box::new),
            keep,
        }
    }

    /// Creates a stream carrying `data` with basic properties: throughput 1,
    /// dimensionality 0, `Sync`, default complexity, `Forward`, no user type
    /// and `keep` disabled.
    pub fn new_basic(data: LogicalStreamType) -> Self {
        Stream {
            data: Box::new(data),
            // 1.0 is a constant; the constructor is expected to accept it.
            throughput: PositiveReal::new(1.).unwrap(),
            dimensionality: 0,
            synchronicity: Synchronicity::Sync,
            complexity: Complexity::default(),
            direction: Direction::Forward,
            user: None,
            keep: false,
        }
    }

    /// Returns the data type carried by this stream.
    pub fn data(&self) -> &LogicalStreamType {
        &self.data
    }

    /// Returns the direction of this stream.
    pub fn direction(&self) -> Direction {
        self.direction
    }

    /// Returns the synchronicity of this stream.
    pub fn synchronicity(&self) -> Synchronicity {
        self.synchronicity
    }

    /// Returns the dimensionality of this stream.
    pub fn dimensionality(&self) -> NonNegative {
        self.dimensionality
    }

    /// Returns the throughput of this stream.
    pub fn throughput(&self) -> PositiveReal {
        self.throughput
    }

    /// Returns true if this stream is null: its data type is null, its user
    /// type is null and `keep` is not set.
    ///
    /// A missing user type (`None`) counts as null, which matches how
    /// `LogicalStreamType::split` decides whether to emit this stream.
    pub fn is_null(&self) -> bool {
        self.data.is_null()
            && self.user.as_ref().map_or(true, |user| user.is_null())
            && !self.keep
    }

    /// Overwrites the throughput (used while splitting nested streams).
    fn set_throughput(&mut self, throughput: PositiveReal) {
        self.throughput = throughput;
    }

    /// Overwrites the synchronicity (used while splitting nested streams).
    fn set_synchronicity(&mut self, synchronicity: Synchronicity) {
        self.synchronicity = synchronicity;
    }

    /// Overwrites the dimensionality (used while splitting nested streams).
    fn set_dimensionality(&mut self, dimensionality: NonNegative) {
        self.dimensionality = dimensionality;
    }
}
impl From<Stream> for LogicalStreamType {
fn from(stream: Stream) -> Self {
LogicalStreamType::Stream(stream)
}
}
/// An insertion-ordered collection of uniquely named logical stream types.
///
/// `LogicalStreamType::fields` gives every field of every member its own
/// entry, prefixed with the member name.
#[derive(Debug, Clone, PartialEq)]
pub struct Group(IndexMap<Name, LogicalStreamType>);
impl Group {
pub fn try_new(
group: impl IntoIterator<
Item = (
impl TryInto<Name, Error = impl Into<Box<dyn error::Error>>>,
impl TryInto<LogicalStreamType, Error = impl Into<Box<dyn error::Error>>>,
),
>,
) -> Result<Self> {
let mut map = IndexMap::new();
for (name, stream) in group
.into_iter()
.map(
|(name, stream)| match (name.try_into(), stream.try_into()) {
(Ok(name), Ok(stream)) => Ok((name, stream)),
(Err(name), _) => Err(Error::from(name.into())),
(_, Err(stream)) => Err(Error::from(stream.into())),
},
)
.collect::<Result<Vec<_>>>()?
{
map.insert(name, stream)
.map(|_| -> Result<()> { Err(Error::UnexpectedDuplicate) })
.transpose()?;
}
Ok(Group(map))
}
pub fn iter(&self) -> impl Iterator<Item = (&Name, &LogicalStreamType)> {
self.0.iter()
}
}
impl From<Group> for LogicalStreamType {
fn from(group: Group) -> Self {
LogicalStreamType::Group(group)
}
}
/// An insertion-ordered collection of uniquely named alternative logical
/// stream types.
///
/// `LogicalStreamType::fields` represents a union as an optional `tag` field
/// (only with more than one variant) and an optional shared `union` field.
#[derive(Debug, Clone, PartialEq)]
pub struct Union(IndexMap<Name, LogicalStreamType>);
impl Union {
pub fn try_new(
union: impl IntoIterator<
Item = (
impl TryInto<Name, Error = impl Into<Box<dyn error::Error>>>,
impl TryInto<LogicalStreamType, Error = impl Into<Box<dyn error::Error>>>,
),
>,
) -> Result<Self> {
let mut map = IndexMap::new();
for (name, stream) in union
.into_iter()
.map(
|(name, stream)| match (name.try_into(), stream.try_into()) {
(Ok(name), Ok(stream)) => Ok((name, stream)),
(Err(name), _) => Err(Error::from(name.into())),
(_, Err(stream)) => Err(Error::from(stream.into())),
},
)
.collect::<Result<Vec<_>>>()?
{
map.insert(name, stream)
.map(|_| -> Result<()> { Err(Error::UnexpectedDuplicate) })
.transpose()?;
}
Ok(Union(map))
}
pub fn tag(&self) -> Option<(String, BitCount)> {
if self.0.len() > 1 {
Some((
"tag".to_string(),
BitCount::new(log2_ceil(
BitCount::new(self.0.len() as NonNegative).unwrap(),
))
.unwrap(),
))
} else {
None
}
}
pub fn iter(&self) -> impl Iterator<Item = (&Name, &LogicalStreamType)> {
self.0.iter()
}
}
impl From<Union> for LogicalStreamType {
fn from(union: Union) -> Self {
LogicalStreamType::Union(union)
}
}
/// A logical stream type: a tree built from the five node kinds below.
#[derive(Debug, Clone, PartialEq)]
pub enum LogicalStreamType {
/// Carries no data; produces no fields.
Null,
/// A single field with the given, non-zero number of bits.
Bits(Positive),
/// A collection of named types (see [`Group`]).
Group(Group),
/// A collection of named alternative types (see [`Union`]).
Union(Union),
/// A stream carrying another logical stream type (see [`Stream`]).
Stream(Stream),
}
/// Converts a bit count into a `Bits` type; fails for a bit count of zero.
impl TryFrom<NonNegative> for LogicalStreamType {
    type Error = Error;

    fn try_from(bit_count: NonNegative) -> Result<Self> {
        Self::try_new_bits(bit_count)
    }
}
impl From<Positive> for LogicalStreamType {
fn from(bit_count: Positive) -> Self {
LogicalStreamType::Bits(bit_count)
}
}
impl LogicalStreamType {
pub fn try_new_bits(bit_count: NonNegative) -> Result<Self> {
Ok(LogicalStreamType::Bits(
Positive::new(bit_count)
.ok_or_else(|| Error::InvalidArgument("bit count cannot be zero".to_string()))?,
))
}
pub fn try_new_group(
group: impl IntoIterator<
Item = (
impl TryInto<Name, Error = impl Into<Box<dyn error::Error>>>,
impl TryInto<LogicalStreamType, Error = impl Into<Box<dyn error::Error>>>,
),
>,
) -> Result<Self> {
Group::try_new(group).map(Into::into)
}
pub fn try_new_union(
union: impl IntoIterator<
Item = (
impl TryInto<Name, Error = impl Into<Box<dyn error::Error>>>,
impl TryInto<LogicalStreamType, Error = impl Into<Box<dyn error::Error>>>,
),
>,
) -> Result<Self> {
Union::try_new(union).map(Into::into)
}
/// Recursively checks this type.
///
/// `Null` and `Bits` yield true. A group or union yields true when all of
/// its members do (vacuously true when empty). A stream yields the result for
/// its data type.
pub fn is_element_only(&self) -> bool {
    let members = match self {
        LogicalStreamType::Null | LogicalStreamType::Bits(_) => return true,
        LogicalStreamType::Stream(stream) => return stream.data.is_element_only(),
        LogicalStreamType::Group(Group(members))
        | LogicalStreamType::Union(Union(members)) => members,
    };
    members.values().all(LogicalStreamType::is_element_only)
}
/// Returns true if this type is null.
///
/// - `Null` is null, `Bits` never is.
/// - A group is null when all of its members are null (so an empty group is
///   null).
/// - A union is null only when it has exactly one variant and that variant is
///   null.
/// - A stream defers to `Stream::is_null`.
pub fn is_null(&self) -> bool {
    match self {
        LogicalStreamType::Null => true,
        LogicalStreamType::Bits(_) => false,
        LogicalStreamType::Stream(stream) => stream.is_null(),
        LogicalStreamType::Group(Group(members)) => {
            members.values().all(LogicalStreamType::is_null)
        }
        LogicalStreamType::Union(Union(variants)) => {
            if variants.len() != 1 {
                false
            } else {
                variants.values().all(LogicalStreamType::is_null)
            }
        }
    }
}
/// Splits this type into its stream-free part (`signals`) and a map of the
/// streams it contains, keyed by path name.
///
/// - `Null` and `Bits` are returned unchanged as `signals`, with no streams.
/// - A group or union keeps its kind; every member is replaced by the
///   `signals` part of its own split, and the streams of all members are
///   collected with the member name added to their path name.
/// - A stream has `Null` signals. Its data type is split; the stream-free
///   part becomes a stream at the empty path name (unless it is dropped, see
///   below), followed by the nested streams with properties inherited from
///   this stream.
pub(crate) fn split(&self) -> SplitStreams {
match self {
LogicalStreamType::Stream(stream_in) => {
let mut streams = IndexMap::new();
let split = stream_in.data.split();
let (element, rest) = (split.signals, split.streams);
// Emit the stream itself only when it carries something: a non-null
// element, a non-null user type, or an explicit `keep`. A missing user
// type is treated as null here.
if !element.is_null()
|| (stream_in.user.is_some() && !stream_in.user.as_ref().unwrap().is_null())
|| stream_in.keep
{
streams.insert(
PathName::new_empty(),
Stream::new(
element,
stream_in.throughput,
stream_in.dimensionality,
stream_in.synchronicity,
stream_in.complexity.clone(),
stream_in.direction,
stream_in.user.clone().map(|stream| *stream),
stream_in.keep,
)
.into(),
);
}
// Propagate the properties of this stream to the nested streams. The
// order of the steps below matters: the synchronicity may be rewritten
// before the dimensionality check reads it.
streams.extend(rest.into_iter().map(|(name, stream)| match stream {
LogicalStreamType::Stream(mut stream) => {
// A reversed parent flips the direction of its children.
if stream_in.direction == Direction::Reverse {
stream.reverse();
}
// Children of a flattened parent become `FlatDesync`.
if stream_in.synchronicity == Synchronicity::Flatten
|| stream_in.synchronicity == Synchronicity::FlatDesync
{
stream.set_synchronicity(Synchronicity::FlatDesync);
}
// The parent's dimensionality is added unless the child is (still)
// `Flatten` or the parent is `FlatDesync`.
if stream.synchronicity != Synchronicity::Flatten
&& stream_in.synchronicity != Synchronicity::FlatDesync
{
stream.set_dimensionality(
stream.dimensionality + stream_in.dimensionality,
);
};
// Throughputs multiply along the nesting.
stream.set_throughput(stream.throughput * stream_in.throughput);
(name, stream.into())
}
// Every value in a split's `streams` map is built as a `Stream`.
_ => unreachable!(),
}));
SplitStreams {
signals: LogicalStreamType::Null,
streams,
}
}
LogicalStreamType::Null | LogicalStreamType::Bits(_) => SplitStreams {
signals: self.clone(),
streams: IndexMap::new(),
},
LogicalStreamType::Group(Group(fields)) | LogicalStreamType::Union(Union(fields)) => {
// Stream-free counterpart of every member, under the same name.
let signals = fields
.into_iter()
.map(|(name, stream)| (name.clone(), stream.split().signals))
.collect();
SplitStreams {
// Rebuild the same kind of collection this was.
signals: match self {
LogicalStreamType::Group(_) => LogicalStreamType::Group(Group(signals)),
LogicalStreamType::Union(_) => LogicalStreamType::Union(Union(signals)),
_ => unreachable!(),
},
// Note: every member is split a second time here to obtain its streams.
// NOTE(review): the member name is added with `PathName::push`, while
// `fields` uses `PathName::with_parent`; confirm both yield the same
// name order for members nested more than one level deep.
streams: fields
.into_iter()
.map(|(name, stream)| {
stream.split().streams.into_iter().map(
move |(mut path_name, stream_)| {
path_name.push(name.clone());
(path_name, stream_)
},
)
})
.flatten()
.collect(),
}
}
}
}
/// Flattens this type into its fields (path name and bit count).
///
/// - `Null` and `Stream` produce no fields.
/// - `Bits` produces one field at the empty path name.
/// - A group produces the fields of every member, each path name prefixed
///   with the member name.
/// - A union produces a `tag` field when it has more than one variant, and a
///   `union` field when any variant has a field.
///
/// # Panics
///
/// Panics when inserting a field fails (the `insert` results are unwrapped).
pub fn fields(&self) -> Fields {
    let mut result = Fields::new_empty();
    match self {
        LogicalStreamType::Null | LogicalStreamType::Stream(_) => {}
        LogicalStreamType::Bits(bit_count) => {
            result.insert(PathName::new_empty(), *bit_count).unwrap();
        }
        LogicalStreamType::Group(Group(members)) => {
            for (name, member) in members.iter() {
                let member_fields = member.fields();
                for (path_name, bit_count) in member_fields.iter() {
                    result
                        .insert(path_name.with_parent(name.clone()), *bit_count)
                        .unwrap();
                }
            }
        }
        LogicalStreamType::Union(Union(variants)) => {
            // The tag selects the active variant; a single variant needs none.
            if variants.len() > 1 {
                let tag_width = log2_ceil(BitCount::new(variants.len() as NonNegative).unwrap());
                result
                    .insert(
                        PathName::try_new(vec!["tag"]).unwrap(),
                        BitCount::new(tag_width).unwrap(),
                    )
                    .unwrap();
            }
            // NOTE(review): each variant contributes the width of its widest
            // single field, not the sum of its fields; confirm this is the
            // intended union width.
            let mut widest = 0;
            for variant in variants.values() {
                let variant_fields = variant.fields();
                for bit_count in variant_fields.values() {
                    widest = widest.max(bit_count.get());
                }
            }
            if widest > 0 {
                result
                    .insert(
                        PathName::try_new(vec!["union"]).unwrap(),
                        BitCount::new(widest).unwrap(),
                    )
                    .unwrap();
            }
        }
    }
    result
}
pub fn synthesize(&self) -> LogicalStream {
let split = self.split();
let (signals, rest) = (split.signals.fields(), split.streams);
LogicalStream {
signals,
streams: rest
.into_iter()
.map(|(path_name, stream)| match stream {
LogicalStreamType::Stream(stream) => (
path_name,
PhysicalStream::new(
stream.data.fields(),
Positive::new(stream.throughput.get().ceil() as NonNegative).unwrap(),
stream.dimensionality,
stream.complexity,
stream
.user
.map(|stream| stream.fields())
.unwrap_or_else(Fields::new_empty),
),
),
_ => unreachable!(),
})
.collect(),
}
}
/// Returns true if a source of this type can be connected to a sink of type
/// `other`.
///
/// This holds when one of the following applies:
/// - both types are equal;
/// - both are streams that differ at most in data type and complexity, the
///   data types are compatible, and the complexity of the source (`self`) is
///   strictly lower than that of the sink (`other`);
/// - both are groups, or both are unions, with the same names in the same
///   order and pairwise compatible members.
///
/// A group is never compatible with a union: `fields` lays them out
/// differently.
pub fn compatible(&self, other: &LogicalStreamType) -> bool {
    if self == other {
        return true;
    }
    match (self, other) {
        (LogicalStreamType::Stream(source), LogicalStreamType::Stream(sink)) => {
            // Every property except data and complexity must match exactly.
            source.throughput == sink.throughput
                && source.dimensionality == sink.dimensionality
                && source.synchronicity == sink.synchronicity
                && source.direction == sink.direction
                && source.user == sink.user
                && source.keep == sink.keep
                && source.data.compatible(&sink.data)
                && source.complexity < sink.complexity
        }
        (LogicalStreamType::Group(Group(source)), LogicalStreamType::Group(Group(sink)))
        | (LogicalStreamType::Union(Union(source)), LogicalStreamType::Union(Union(sink))) => {
            source.len() == sink.len()
                && source
                    .iter()
                    .zip(sink.iter())
                    .all(|((name, stream), (name_, stream_))| {
                        name == name_ && stream.compatible(stream_)
                    })
        }
        _ => false,
    }
}
}
/// Result of `LogicalStreamType::split`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SplitStreams {
/// The stream-free part of the split type.
signals: LogicalStreamType,
/// The streams found in the split type, keyed by path name; every value is
/// a `LogicalStreamType::Stream`.
streams: IndexMap<PathName, LogicalStreamType>,
}
impl SplitStreams {
pub fn streams(&self) -> impl Iterator<Item = (&PathName, &LogicalStreamType)> {
self.streams.iter()
}
pub fn signal(&self) -> &LogicalStreamType {
&self.signals
}
}
/// Result of `LogicalStreamType::synthesize`: signal fields plus the physical
/// streams of a logical stream type.
#[derive(Debug, Clone, PartialEq)]
pub struct LogicalStream {
/// Fields of the stream-free part of the type.
signals: Fields,
/// Physical streams keyed by path name.
streams: IndexMap<PathName, PhysicalStream>,
}
impl LogicalStream {
pub fn signals(&self) -> impl Iterator<Item = (&PathName, &BitCount)> {
self.signals.iter()
}
pub fn streams(&self) -> impl Iterator<Item = (&PathName, &PhysicalStream)> {
self.streams.iter()
}
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    /// Fixtures that contain no streams.
    pub(crate) mod elements {
        use super::*;

        /// A `Bits` type of the given width; panics when `bits` is zero.
        pub(crate) fn prim(bits: u32) -> LogicalStreamType {
            let element = LogicalStreamType::try_new_bits(bits);
            element.unwrap()
        }

        /// A group with fields `a` (42 bits) and `b` (1337 bits).
        pub(crate) fn group() -> LogicalStreamType {
            let members = vec![("a", prim(42)), ("b", prim(1337))];
            LogicalStreamType::try_new_group(members).unwrap()
        }

        /// A group with the single field `a` (42 bits).
        pub(crate) fn group_of_single() -> LogicalStreamType {
            let members = vec![("a", prim(42))];
            LogicalStreamType::try_new_group(members).unwrap()
        }

        /// A group with fields `c` and `d`, both holding `group()`.
        pub(crate) fn group_nested() -> LogicalStreamType {
            let members = vec![("c", group()), ("d", group())];
            LogicalStreamType::try_new_group(members).unwrap()
        }
    }

    /// Fixtures that contain streams.
    pub(crate) mod streams {
        use super::*;

        /// A basic stream of a `Bits` type of the given width.
        pub(crate) fn prim(bits: u32) -> LogicalStreamType {
            let stream = Stream::new_basic(elements::prim(bits));
            LogicalStreamType::from(stream)
        }

        /// A group with stream fields `a` (42 bits) and `b` (1337 bits).
        pub(crate) fn group() -> LogicalStreamType {
            let members = vec![("a", prim(42)), ("b", prim(1337))];
            LogicalStreamType::try_new_group(members).unwrap()
        }

        /// A basic stream carrying a one-dimensional stream of 8 bits.
        pub(crate) fn nested() -> LogicalStreamType {
            let inner = Stream {
                data: Box::new(elements::prim(8)),
                throughput: PositiveReal::new(1.).unwrap(),
                dimensionality: 1,
                synchronicity: Synchronicity::Sync,
                complexity: Complexity::default(),
                direction: Direction::Forward,
                user: None,
                keep: false,
            };
            let outer = Stream::new_basic(LogicalStreamType::from(inner));
            LogicalStreamType::from(outer)
        }
    }

    /// A forward stream with throughput 1, dimensionality 1 and complexity 1,
    /// without user type and with `keep` disabled.
    fn unit_stream(data: LogicalStreamType, synchronicity: Synchronicity) -> Stream {
        Stream::new(
            data,
            PositiveReal::new(1.).unwrap(),
            1,
            synchronicity,
            1,
            Direction::Forward,
            None,
            false,
        )
    }

    /// Synthesizes a `Sync` stream of a union with variants `a` (3 bits),
    /// `b` (group of `x` and `y`, 2 bits each) and `c` (a 4-bit stream with
    /// the given synchronicity).
    fn synthesize_union(child_synchronicity: Synchronicity) -> Result<LogicalStream> {
        let group = LogicalStreamType::try_new_group(vec![("x", 2), ("y", 2)])?;
        let child = unit_stream(
            LogicalStreamType::Bits(Positive::new(4).unwrap()),
            child_synchronicity,
        );
        let union = LogicalStreamType::try_new_union(vec![
            ("a", 3.try_into()?),
            ("b", group),
            ("c", LogicalStreamType::Stream(child)),
        ])?;
        let stream: LogicalStreamType = unit_stream(union, Synchronicity::Sync).into();
        Ok(stream.synthesize())
    }

    #[test]
    fn union() -> Result<()> {
        // A `Sync` child inherits the dimensionality of its parent.
        let logical_stream = synthesize_union(Synchronicity::Sync)?;
        assert_eq!(logical_stream.streams.len(), 2);
        assert_eq!(
            logical_stream.streams.keys().collect::<Vec<_>>(),
            vec![&PathName::new_empty(), &PathName::try_new(vec!["c"])?]
        );
        assert_eq!(
            logical_stream
                .streams
                .values()
                .flat_map(|physical_stream| physical_stream.element_fields().iter())
                .collect::<Vec<_>>(),
            vec![
                (&PathName::try_new(vec!["tag"])?, &Positive::new(2).unwrap()),
                (
                    &PathName::try_new(vec!["union"])?,
                    &Positive::new(3).unwrap()
                ),
                (&PathName::new_empty(), &Positive::new(4).unwrap()),
            ]
        );
        assert_eq!(
            logical_stream
                .streams
                .values()
                .map(|physical_stream| physical_stream.dimensionality())
                .collect::<Vec<_>>(),
            vec![1, 2]
        );

        // A `Flatten` child keeps its own dimensionality.
        let logical_stream = synthesize_union(Synchronicity::Flatten)?;
        assert_eq!(
            logical_stream
                .streams
                .values()
                .map(|physical_stream| physical_stream.dimensionality())
                .collect::<Vec<_>>(),
            vec![1, 1]
        );

        // A `Desync` child inherits the dimensionality of its parent.
        let logical_stream = synthesize_union(Synchronicity::Desync)?;
        assert_eq!(
            logical_stream
                .streams
                .values()
                .map(|physical_stream| physical_stream.dimensionality())
                .collect::<Vec<_>>(),
            vec![1, 2]
        );
        Ok(())
    }
}