use std::ops::RangeBounds;
use std::sync::{Arc, Mutex};
use sim_kernel::{
Cx, Error, Event, EventKind, EventLedger, Ref, Result, Severity, Symbol, Tick, value_from_ref,
};
use sim_lib_stream_core::{
StreamCassette, StreamDiagnostic, StreamItem, StreamMetadata, StreamPacket, StreamStats,
TransportProfile,
};
use crate::stream::{Stream, StreamNode};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamRecording {
metadata: StreamMetadata,
items: Vec<StreamItem>,
}
impl StreamRecording {
pub fn new(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
Self { metadata, items }
}
pub fn metadata(&self) -> &StreamMetadata {
&self.metadata
}
pub fn items(&self) -> &[StreamItem] {
&self.items
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
pub fn replay(&self) -> Stream {
replay(self)
}
pub fn seek(&self, target: SeekTarget) -> Stream {
seek(self.replay(), target)
}
pub fn cassette(&self, profile: TransportProfile) -> Result<StreamCassette> {
StreamCassette::from_items(
self.metadata.clone(),
self.items.clone(),
profile,
StreamStats {
yielded: self.items.len() as u64,
..StreamStats::default()
},
)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SeekTarget {
PacketIndex(usize),
ClockIndex {
clock: Symbol,
index: Ref,
},
}
impl SeekTarget {
pub fn packet_index(index: usize) -> Self {
Self::PacketIndex(index)
}
pub fn clock_index(clock: Symbol, index: Ref) -> Self {
Self::ClockIndex { clock, index }
}
}
pub fn record_bang(source: &Stream) -> Result<StreamRecording> {
let mut items = Vec::new();
while let Some(item) = source.next_packet()? {
items.push(item);
}
if !source.is_done()? {
return Err(Error::Eval(
"cannot record a stream that has not reached done".to_owned(),
));
}
Ok(StreamRecording::new(source.metadata().clone(), items))
}
pub fn replay(recording: &StreamRecording) -> Stream {
Stream::pull(recording.metadata.clone(), recording.items.clone())
}
pub fn record_cassette_bang(source: &Stream, profile: TransportProfile) -> Result<StreamCassette> {
record_bang(source)?.cassette(profile)
}
pub fn replay_cassette(cassette: &StreamCassette) -> Result<Stream> {
Ok(Stream::from_value(Arc::new(
cassette.replay_stream_value()?,
)))
}
pub fn seek(source: Stream, target: SeekTarget) -> Stream {
Stream::new(SeekNode {
source,
target,
state: Mutex::new(SeekState::Pending),
})
}
pub fn record_ledger_run(
cx: &mut Cx,
metadata: StreamMetadata,
ledger: &EventLedger,
run: &Ref,
) -> Result<StreamRecording> {
record_events(cx, metadata, ledger.events_for_run(run))
}
pub fn record_ledger_slice<R>(
cx: &mut Cx,
metadata: StreamMetadata,
ledger: &EventLedger,
run: &Ref,
seq_range: R,
) -> Result<StreamRecording>
where
R: RangeBounds<u64>,
{
record_events(
cx,
metadata,
ledger
.events_for_run(run)
.iter()
.filter(|event| seq_range.contains(&event.seq)),
)
}
pub fn record_events<'a>(
cx: &mut Cx,
metadata: StreamMetadata,
events: impl IntoIterator<Item = &'a Event>,
) -> Result<StreamRecording> {
let mut items = Vec::new();
for event in events {
match &event.kind {
EventKind::Chunk { payload } => {
items.push(item_from_payload(cx, payload, event.ticks.clone())?);
}
EventKind::Diagnostic(diagnostic) => {
items.push(StreamItem::new(StreamPacket::Diagnostic(
diagnostic_packet(diagnostic),
)));
}
EventKind::Done => break,
EventKind::Failed(_) => {
return Err(Error::Eval(
"cannot record a failed stream event slice".to_owned(),
));
}
EventKind::Started { .. }
| EventKind::Claim { .. }
| EventKind::Trace(_)
| EventKind::EffectRequested { .. }
| EventKind::EffectResolved { .. }
| EventKind::Capture { .. }
| EventKind::Card { .. }
| EventKind::Final(_) => {}
}
}
Ok(StreamRecording::new(metadata, items))
}
fn item_from_payload(cx: &mut Cx, payload: &Ref, ticks: Vec<Tick>) -> Result<StreamItem> {
let value = value_from_ref(cx, payload)?;
let packet = StreamPacket::try_from(value.object().as_expr(cx)?)?;
StreamItem::with_ticks(packet, ticks)
}
fn diagnostic_packet(diagnostic: &sim_kernel::Diagnostic) -> StreamDiagnostic {
let kind = diagnostic
.code
.clone()
.unwrap_or_else(|| Symbol::qualified("stream/combinator", "Diagnostic"));
let prefix = match diagnostic.severity {
Severity::Error => "error",
Severity::Warning => "warning",
Severity::Info => "info",
Severity::Note => "note",
};
StreamDiagnostic::new(kind, format!("{prefix}: {}", diagnostic.message))
}
struct SeekNode {
source: Stream,
target: SeekTarget,
state: Mutex<SeekState>,
}
enum SeekState {
Pending,
Ready,
Drained,
}
impl StreamNode for SeekNode {
fn metadata(&self) -> &StreamMetadata {
self.source.metadata()
}
fn next_packet(&self) -> Result<Option<StreamItem>> {
let mut state = self
.state
.lock()
.map_err(|_| Error::PoisonedLock("seek stream"))?;
match *state {
SeekState::Ready => self.source.next_packet(),
SeekState::Drained => Ok(None),
SeekState::Pending => {
let item = seek_first(&self.source, &self.target)?;
*state = if item.is_some() {
SeekState::Ready
} else {
SeekState::Drained
};
Ok(item)
}
}
}
fn is_done(&self) -> Result<bool> {
let state = self
.state
.lock()
.map_err(|_| Error::PoisonedLock("seek stream"))?;
match *state {
SeekState::Drained => Ok(true),
SeekState::Pending | SeekState::Ready => self.source.is_done(),
}
}
}
fn seek_first(source: &Stream, target: &SeekTarget) -> Result<Option<StreamItem>> {
match target {
SeekTarget::PacketIndex(index) => {
for _ in 0..*index {
if source.next_packet()?.is_none() {
return Ok(None);
}
}
source.next_packet()
}
SeekTarget::ClockIndex { clock, index } => {
while let Some(item) = source.next_packet()? {
if item
.ticks()
.iter()
.any(|tick| &tick.clock == clock && &tick.index == index)
{
return Ok(Some(item));
}
}
Ok(None)
}
}
}