use std::sync::Arc;
use sim_kernel::{Cx, Event, Expr, Ref, Result, Symbol};
use sim_lib_stream_core::{StreamDiagnostic, StreamItem, StreamMetadata, StreamValue};
pub trait StreamNode: Send + Sync {
fn metadata(&self) -> &StreamMetadata;
fn next_packet(&self) -> Result<Option<StreamItem>>;
fn is_done(&self) -> Result<bool>;
}
#[derive(Clone)]
pub struct Stream {
inner: Arc<dyn StreamNode>,
}
impl Stream {
pub fn new(inner: impl StreamNode + 'static) -> Self {
Self {
inner: Arc::new(inner),
}
}
pub fn from_value(value: Arc<StreamValue>) -> Self {
Self::new(ValueStream { value })
}
pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
Self::from_value(Arc::new(StreamValue::pull(metadata, items)))
}
pub fn metadata(&self) -> &StreamMetadata {
self.inner.metadata()
}
pub fn next_packet(&self) -> Result<Option<StreamItem>> {
self.inner.next_packet()
}
pub fn is_done(&self) -> Result<bool> {
self.inner.is_done()
}
pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
let mut out = Vec::new();
for _ in 0..limit {
let Some(item) = self.next_packet()? else {
break;
};
out.push(item);
}
Ok(out)
}
pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
let mut seq = start_seq;
let mut out = Vec::new();
while let Some(item) = self.next_packet()? {
out.push(item.chunk_event(cx, run.clone(), seq)?);
seq = seq.saturating_add(1);
}
if self.is_done()? {
out.push(Event::done(run, seq)?);
}
Ok(out)
}
pub fn map_data_expr<F>(self, f: F) -> Self
where
F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
{
crate::ops::map_data_expr(self, f)
}
pub fn filter_data_kind(self, kind: Symbol) -> Self {
crate::ops::filter_data_kind(self, kind)
}
pub fn filter_data_shape<F>(self, matches: F) -> Self
where
F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
{
crate::ops::filter_data_shape(self, matches)
}
pub fn window_by_count(self, count: usize) -> Self {
crate::ops::window_by_count(self, count)
}
pub fn tap_diagnostics<F>(self, f: F) -> Self
where
F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
{
crate::ops::tap_diagnostics(self, f)
}
}
struct ValueStream {
value: Arc<StreamValue>,
}
impl StreamNode for ValueStream {
fn metadata(&self) -> &StreamMetadata {
self.value.metadata()
}
fn next_packet(&self) -> Result<Option<StreamItem>> {
self.value.next_packet()
}
fn is_done(&self) -> Result<bool> {
self.value.is_done()
}
}