use crate::signal::{DerivedBitVecSignal, SignalMap, States};
use crate::vcd::{VcdBitVecChange, VcdOrError, decode_vcd_bit_vec_change};
use crate::viewers::HeaderResult;
use crate::wavemem::write_n_state_from_ascii;
use crate::{
Hierarchy, LoadOptions, Real, Result, SignalEncoding, SignalRef, SignalValue, SignalValueRef,
Time, WellenError, viewers,
};
use fst_reader::FstSignalValue;
use rustc_hash::FxHashSet;
use smallvec::{SmallVec, smallvec};
use std::fmt::{Debug, Formatter};
use std::io::{BufRead, Seek};
use std::sync::Arc;
#[derive(Debug, thiserror::Error)]
pub enum StreamError<E> {
Wellen(#[from] WellenError),
Callback(E),
}
pub type StreamResult<E> = std::result::Result<(), StreamError<E>>;
pub fn read_from_file<P: AsRef<std::path::Path>>(
filename: P,
options: &LoadOptions,
) -> Result<StreamingWaveform<std::io::BufReader<std::fs::File>>> {
viewers::read_header_from_file(filename, options).map(|r| r.into())
}
pub fn read<R: BufRead + Seek + Send + Sync + 'static>(
input: R,
options: &LoadOptions,
) -> Result<StreamingWaveform<R>> {
viewers::read_header(input, options).map(|r| r.into())
}
pub struct StreamingWaveform<R: BufRead + Seek> {
hierarchy: Hierarchy,
body: viewers::ReadBodyData<R>,
}
impl<R: BufRead + Seek> From<HeaderResult<R>> for StreamingWaveform<R> {
fn from(value: HeaderResult<R>) -> Self {
StreamingWaveform {
hierarchy: value.hierarchy,
body: value.body.0,
}
}
}
impl<R: BufRead + Seek> From<(Hierarchy, viewers::ReadBodyContinuation<R>)>
for StreamingWaveform<R>
{
fn from(value: (Hierarchy, viewers::ReadBodyContinuation<R>)) -> Self {
StreamingWaveform {
hierarchy: value.0,
body: value.1.0,
}
}
}
impl<R: BufRead + Seek> Debug for StreamingWaveform<R> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "stream::Waveform(...)")
}
}
#[derive(Debug, Copy, Clone)]
pub struct Filter<'a> {
pub start: Time,
pub end: Option<Time>,
pub signals: Option<&'a [SignalRef]>,
}
impl<'a> Filter<'a> {
pub fn all() -> Self {
Filter {
start: 0,
end: None,
signals: None,
}
}
pub fn new(start: u64, end: u64, signals: &'a [SignalRef]) -> Self {
Filter {
start,
end: Some(end),
signals: Some(signals),
}
}
pub fn include_signals(signals: &'a [SignalRef]) -> Self {
Filter {
start: 0,
end: None,
signals: Some(signals),
}
}
pub fn includes_signal(&self, signal: SignalRef) -> bool {
self.signals.map(|s| s.contains(&signal)).unwrap_or(true)
}
}
pub struct SignalValues {
values: std::sync::Arc<SignalMap<SignalValue>>,
}
impl SignalValues {
pub fn get(&self, signal: &SignalRef) -> Option<SignalValueRef<'_>> {
self.values.get(signal).map(|v| v.into())
}
}
impl<R: BufRead + Seek> StreamingWaveform<R> {
pub fn hierarchy(&self) -> &Hierarchy {
&self.hierarchy
}
pub fn stream_changes<E>(
&mut self,
filter: Filter,
callback: impl FnMut(Time, SignalRef, SignalValueRef<'_>) -> std::result::Result<(), E>,
) -> StreamResult<E> {
let (mut dispatcher, maybe_augmented_filter) =
StreamDispatcherOnChange::new(self.hierarchy(), &filter, callback);
let sub_filter = maybe_augmented_filter
.as_ref()
.map(|refs| Filter::include_signals(refs))
.unwrap_or(filter);
self.do_stream(
&sub_filter,
StreamEncoder::new(self.hierarchy(), &sub_filter, |time, signal, value| {
dispatcher.on_change(time, signal, value)
}),
)?;
dispatcher.finish().map_err(StreamError::Callback)?;
Ok(())
}
pub fn stream_time_steps<E>(
&mut self,
filter: Filter,
callback: impl FnMut(Time, SignalValues, &[SignalRef]) -> std::result::Result<(), E>,
) -> StreamResult<E> {
let (mut dispatcher, maybe_augmented_filter) =
StreamDispatcherOnTimeStep::new(self.hierarchy(), &filter, callback);
let sub_filter = maybe_augmented_filter
.as_ref()
.map(|refs| Filter::include_signals(refs))
.unwrap_or(filter);
self.do_stream(
&sub_filter,
StreamEncoder::new(self.hierarchy(), &sub_filter, |time, signal, value| {
dispatcher.on_change(time, signal, value)
}),
)?;
dispatcher.finish().map_err(StreamError::Callback)?;
Ok(())
}
fn do_stream<C: FnMut(Time, SignalRef, SignalValueRef<'_>) -> std::result::Result<(), E>, E>(
&mut self,
filter: &Filter,
enc: StreamEncoder<C, E>,
) -> StreamResult<E> {
match &mut self.body {
viewers::ReadBodyData::Vcd(data) => {
crate::vcd::stream_body(data, enc).map_err(|e| match e {
VcdOrError::Vcd(e) => WellenError::from(e).into(),
VcdOrError::Custom(e) => StreamError::Callback(e),
})?
}
viewers::ReadBodyData::Fst(data) => crate::fst::stream_body(data, enc, filter)?,
viewers::ReadBodyData::Ghw(_) => panic!("streaming GHW files is not supported"),
}
Ok(())
}
}
pub(crate) struct StreamEncoder<C, E>
where
C: FnMut(Time, SignalRef, SignalValueRef<'_>) -> std::result::Result<(), E>,
{
callback: C,
time: Option<Time>,
skipping_time_step: bool,
encoding: SignalMap<SignalEncoding>,
buf: SignalValueBuffer,
}
#[derive(Debug, Default)]
struct SignalValueBuffer {
data: Vec<u8>,
string: String,
kind: SignalKind,
}
#[derive(Debug, Default)]
enum SignalKind {
#[default]
Event,
String,
BitVec(States, u32),
Real(Real),
}
impl SignalValueBuffer {
fn update_fst(&mut self, encoding: SignalEncoding, value: &FstSignalValue) {
self.data.clear();
self.string.clear();
self.kind = match value {
FstSignalValue::String(value) => match encoding {
SignalEncoding::String => {
debug_assert!(self.string.is_empty());
self.string
.push_str(String::from_utf8_lossy(value).as_ref());
SignalKind::String
}
SignalEncoding::BitVector(0) => {
debug_assert!(value.is_empty(), "events do not carry data");
SignalKind::Event
}
SignalEncoding::BitVector(width) => {
debug_assert_eq!(
value.len(),
width as usize,
"{}",
String::from_utf8_lossy(value)
);
let states = States::from_ascii(value).unwrap_or_else(|| {
panic!(
"Unexpected signal value: {}",
String::from_utf8_lossy(value)
)
});
debug_assert!(self.data.is_empty());
write_n_state_from_ascii(states, value, &mut self.data, None);
SignalKind::BitVec(states, width)
}
SignalEncoding::Real => panic!(
"Expecting reals, but got: {}",
String::from_utf8_lossy(value)
),
},
FstSignalValue::Real(value) => {
debug_assert_eq!(encoding, SignalEncoding::Real);
SignalKind::Real(*value)
}
};
}
fn update_vcd(&mut self, encoding: SignalEncoding, value: &[u8]) {
self.data.clear();
self.string.clear();
self.kind = match encoding {
SignalEncoding::BitVector(0) => {
debug_assert!(
value.len() <= 1,
"event changes carry no value, or a 1-bit value"
);
SignalKind::Event
}
SignalEncoding::BitVector(width) => {
let (data, states) = decode_vcd_bit_vec_change(width, value);
debug_assert!(self.data.is_empty());
match data {
VcdBitVecChange::SingleBit(bit_value) => {
self.data.push(bit_value.into());
}
VcdBitVecChange::MultiBit(data_to_write) => {
write_n_state_from_ascii(states, &data_to_write, &mut self.data, None);
}
}
SignalKind::BitVec(states, width)
}
SignalEncoding::String => {
assert!(
matches!(value[0], b's' | b'S'),
"expected a string, not {}",
String::from_utf8_lossy(value)
);
let characters = &value[1..];
debug_assert!(self.string.is_empty());
self.string
.push_str(std::str::from_utf8(characters).unwrap());
SignalKind::String
}
SignalEncoding::Real => {
assert!(
matches!(value[0], b'r' | b'R'),
"expected a real, not {}",
String::from_utf8_lossy(value)
);
let float_value: Real = std::str::from_utf8(&value[1..])
.unwrap()
.parse::<Real>()
.unwrap();
SignalKind::Real(float_value)
}
};
}
}
impl<'a> From<&'a SignalValueBuffer> for SignalValueRef<'a> {
fn from(value: &'a SignalValueBuffer) -> Self {
match value.kind {
SignalKind::Event => SignalValueRef::Event,
SignalKind::String => SignalValueRef::String(&value.string),
SignalKind::BitVec(states, width) => {
SignalValueRef::bit_vec(states, width, &value.data)
}
SignalKind::Real(value) => SignalValueRef::Real(value),
}
}
}
impl<C, E> StreamEncoder<C, E>
where
C: FnMut(Time, SignalRef, SignalValueRef<'_>) -> std::result::Result<(), E>,
{
pub(crate) fn new(hierarchy: &Hierarchy, filter: &Filter, callback: C) -> Self {
let encoding = match filter.signals {
None => hierarchy
.ground_signal_encodings()
.collect::<Vec<_>>()
.into(),
Some([]) => SignalMap::sparse(),
Some(signals) => {
SignalMap::from_iter(
signals
.iter()
.filter(|&&s| !hierarchy.is_derived_signal(s))
.map(|&s| (s, hierarchy.get_signal_tpe(s).unwrap())),
)
}
};
Self {
callback,
time: Default::default(),
skipping_time_step: false,
encoding,
buf: SignalValueBuffer::default(),
}
}
fn get_encoding(&self, id: u64) -> Option<SignalEncoding> {
self.encoding.get_index(id).cloned()
}
pub(crate) fn fst_value_change(
&mut self,
time: u64,
id: u64,
value: &FstSignalValue,
) -> std::result::Result<(), E> {
debug_assert!(
!self.skipping_time_step,
"fst reader should filter out time steps"
);
if self.time.is_none_or(|t| time > t) {
self.time_change(time);
}
if let Some(encoding) = self.get_encoding(id) {
let signal_ref = SignalRef::from_index(id as usize).unwrap();
self.buf.update_fst(encoding, value);
(self.callback)(time, signal_ref, (&self.buf).into())?;
}
Ok(())
}
pub(crate) fn vcd_value_change(&mut self, id: u64, value: &[u8]) -> std::result::Result<(), E> {
if self.skipping_time_step {
return Ok(());
}
if let Some(encoding) = self.get_encoding(id) {
let signal_ref = SignalRef::from_index(id as usize).unwrap();
let time = self.time.unwrap();
self.buf.update_vcd(encoding, value);
(self.callback)(time, signal_ref, (&self.buf).into())?;
}
Ok(())
}
pub(crate) fn time_change(&mut self, time: u64) {
if let Some(prev_time) = self.time {
match prev_time.cmp(&time) {
std::cmp::Ordering::Equal => {
return; }
std::cmp::Ordering::Greater => {
println!("WARN: time decreased from {prev_time} to {time}. Skipping!");
self.skipping_time_step = true;
return;
}
std::cmp::Ordering::Less => {
}
}
}
self.time = Some(time);
self.skipping_time_step = false;
}
pub(crate) fn time_is_none(&self) -> bool {
self.time.is_none()
}
pub fn finish(&mut self) {
}
}
struct StreamDerivedSignalInfo {
requested: Option<FxHashSet<SignalRef>>,
to_derived: SignalMap<SmallVec<[SignalRef; 4]>>,
transforms: SignalMap<DerivedBitVecSignal>,
new_filter: Option<Vec<SignalRef>>,
}
impl StreamDerivedSignalInfo {
fn compute(hierarchy: &Hierarchy, filter: &Filter) -> Self {
let mut transforms = SignalMap::sparse();
let mut extras = vec![];
let requested = match filter.signals {
None => {
transforms = SignalMap::from_iter(
hierarchy.all_derived_signals().map(|(s, t)| (s, t.clone())),
);
None
}
Some([]) => Some(FxHashSet::default()),
Some(signals) => {
let requested = FxHashSet::from_iter(signals.iter().cloned());
for &signal in signals {
if let Some(transform) = hierarchy.get_derived_signal(signal) {
debug_assert!(hierarchy.is_derived_signal(signal));
for &input in transform.inputs() {
if !requested.contains(&input) {
extras.push(input);
}
}
transforms.insert(signal, transform.clone());
} else {
debug_assert!(!hierarchy.is_derived_signal(signal));
}
}
Some(requested)
}
};
let mut to_derived = SignalMap::sparse();
for (&signal_ref, transform) in transforms.iter() {
for &input in transform.inputs() {
to_derived
.entry(input)
.or_insert_with(|| smallvec![])
.push(signal_ref);
}
}
let new_filter = if !extras.is_empty()
&& let Some(orig) = filter.signals
{
extras.sort();
extras.dedup();
let mut new_signals: Vec<_> = orig
.iter()
.filter(|&&s| !hierarchy.is_derived_signal(s))
.cloned()
.collect();
new_signals.append(&mut extras);
Some(new_signals)
} else {
None
};
Self {
requested,
to_derived,
transforms,
new_filter,
}
}
}
struct StreamDispatcherOnChange<C, E>
where
C: FnMut(Time, SignalRef, SignalValueRef<'_>) -> std::result::Result<(), E>,
{
callback: C,
time: Option<Time>,
requested: Option<FxHashSet<SignalRef>>,
values: SignalMap<SignalValue>,
to_derived: SignalMap<SmallVec<[SignalRef; 4]>>,
has_changed: FxHashSet<SignalRef>,
transforms: SignalMap<DerivedBitVecSignal>,
}
impl<C, E> StreamDispatcherOnChange<C, E>
where
C: FnMut(Time, SignalRef, SignalValueRef<'_>) -> std::result::Result<(), E>,
{
fn new(hierarchy: &Hierarchy, filter: &Filter, callback: C) -> (Self, Option<Vec<SignalRef>>) {
let info = StreamDerivedSignalInfo::compute(hierarchy, filter);
let new_filter = info.new_filter;
let out = Self {
callback,
time: None,
requested: info.requested,
values: SignalMap::sparse(),
to_derived: info.to_derived,
has_changed: Default::default(),
transforms: info.transforms,
};
(out, new_filter)
}
fn on_change(
&mut self,
time: Time,
signal: SignalRef,
value: SignalValueRef,
) -> std::result::Result<(), E> {
if let Some(prev) = self.time
&& time > prev
{
self.emit_derived_signal_changes()?;
}
self.time = Some(time);
self.update_derived(signal, value);
self.dispatch_change(time, signal, value)
}
#[inline]
fn update_derived(&mut self, signal_ref: SignalRef, value: SignalValueRef) {
if let Some(derived) = self.to_derived.get(&signal_ref) {
self.values.insert(signal_ref, value.into());
for &signal in derived.iter() {
self.has_changed.insert(signal);
}
}
}
#[inline]
fn dispatch_change(
&mut self,
time: Time,
signal_ref: SignalRef,
value: SignalValueRef,
) -> std::result::Result<(), E> {
if self
.requested
.as_ref()
.map(|r| r.contains(&signal_ref))
.unwrap_or(true)
{
(self.callback)(time, signal_ref, value)
} else {
Ok(())
}
}
fn emit_derived_signal_changes(&mut self) -> std::result::Result<(), E> {
if !self.has_changed.is_empty() {
let time = self
.time
.expect("time cannot be None when there are changes");
for signal in self.has_changed.drain() {
let t = &self.transforms.get(&signal).unwrap();
let inputs: Vec<_> = t
.inputs()
.iter()
.map(|i| {
self.values
.get(i)
.map(|v| SignalValueRef::from(v).as_bit_vec().unwrap())
})
.collect();
let value = t.on_change(&inputs);
(self.callback)(time, signal, (&value).into())?;
}
}
Ok(())
}
pub(crate) fn finish(&mut self) -> std::result::Result<(), E> {
self.emit_derived_signal_changes()
}
}
struct StreamDispatcherOnTimeStep<C, E>
where
C: FnMut(Time, SignalValues, &[SignalRef]) -> std::result::Result<(), E>,
{
callback: C,
time: Option<Time>,
values: Arc<SignalMap<SignalValue>>,
to_derived: SignalMap<SmallVec<[SignalRef; 4]>>,
transforms: SignalMap<DerivedBitVecSignal>,
derived_input_has_changed: FxHashSet<SignalRef>,
changes: Vec<SignalRef>,
}
impl<C, E> StreamDispatcherOnTimeStep<C, E>
where
C: FnMut(Time, SignalValues, &[SignalRef]) -> std::result::Result<(), E>,
{
fn new(hierarchy: &Hierarchy, filter: &Filter, callback: C) -> (Self, Option<Vec<SignalRef>>) {
let info = StreamDerivedSignalInfo::compute(hierarchy, filter);
let new_filter = info.new_filter;
let values = std::sync::Arc::new(if filter.signals.is_none() {
SignalMap::dense()
} else {
SignalMap::sparse()
});
let out = Self {
callback,
time: None,
values,
to_derived: info.to_derived,
transforms: info.transforms,
derived_input_has_changed: FxHashSet::default(),
changes: vec![],
};
(out, new_filter)
}
fn on_change(
&mut self,
time: Time,
signal: SignalRef,
value: SignalValueRef,
) -> std::result::Result<(), E> {
if let Some(prev) = self.time
&& time > prev
{
self.dispatch()?;
}
self.time = Some(time);
let changed = self
.values
.get(&signal)
.map(|old| SignalValueRef::from(old) != value)
.unwrap_or(true);
if changed {
Arc::make_mut(&mut self.values).insert(signal, value.into());
if let Some(derived) = self.to_derived.get(&signal) {
for &signal in derived.iter() {
self.derived_input_has_changed.insert(signal);
}
}
self.changes.push(signal);
} else if value.is_event() {
self.changes.push(signal);
}
Ok(())
}
fn dispatch(&mut self) -> std::result::Result<(), E> {
if !self.changes.is_empty() {
self.update_derived_signal_changes();
let time = self
.time
.expect("dispatch should only be called when we know the time");
(self.callback)(
time,
SignalValues {
values: self.values.clone(),
},
&self.changes,
)?;
self.changes.clear();
}
Ok(())
}
fn update_derived_signal_changes(&mut self) {
if !self.derived_input_has_changed.is_empty() {
for signal in self.derived_input_has_changed.drain() {
let t = &self.transforms.get(&signal).unwrap();
let inputs: Vec<_> = t
.inputs()
.iter()
.map(|i| {
self.values
.get(i)
.map(|v| SignalValueRef::from(v).as_bit_vec().unwrap())
})
.collect();
let value: SignalValue = t.on_change(&inputs).into();
let changed = self
.values
.get(&signal)
.map(|old| SignalValueRef::from(old) != SignalValueRef::from(&value))
.unwrap_or(true);
if changed {
Arc::make_mut(&mut self.values).insert(signal, value);
self.changes.push(signal);
}
}
}
}
pub(crate) fn finish(&mut self) -> std::result::Result<(), E> {
if self.time.is_some() {
self.dispatch()?;
}
Ok(())
}
}