mod event_source;
mod queue;
use std::{sync::Arc, time::Duration};
use sim_citizen_derive::non_citizen;
use sim_kernel::{
CORE_SEQUENCE_CLASS_ID, ClassRef, Cx, Error, Event, Object, ObjectCompat, Ref, Result,
Sequence, SequenceItem, Symbol, Tick, Value, stream_surface::stream_packet_event,
validate_ticks,
};
use crate::{StreamMetadata, StreamPacket, publish_metadata_claims};
pub use event_source::StreamEventSource;
use queue::{PullSpine, PushSpine};
pub use queue::{PushResult, StreamStats};
/// One element of a stream: a packet together with the ticks attached to it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamItem {
// The packet carried by this item.
packet: StreamPacket,
// Ticks attached to the packet; empty when the item is built with `StreamItem::new`.
ticks: Vec<Tick>,
}
impl StreamItem {
    /// Wraps `packet` in an item that carries no ticks.
    pub fn new(packet: StreamPacket) -> Self {
        let ticks = Vec::new();
        Self { packet, ticks }
    }

    /// Wraps `packet` together with `ticks`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `validate_ticks` reports for `ticks`.
    pub fn with_ticks(packet: StreamPacket, ticks: Vec<Tick>) -> Result<Self> {
        let item = Self { packet, ticks };
        // The candidate is only handed back once its ticks pass validation.
        validate_ticks(&item.ticks)?;
        Ok(item)
    }

    /// Borrows the wrapped packet.
    pub fn packet(&self) -> &StreamPacket {
        let Self { packet, .. } = self;
        packet
    }

    /// Borrows the ticks attached to the packet.
    pub fn ticks(&self) -> &[Tick] {
        self.ticks.as_slice()
    }

    /// Converts the packet to its expression form and turns it into a `Value`
    /// through the context's factory.
    pub fn packet_value(&self, cx: &mut Cx) -> Result<Value> {
        let expr = self.packet.to_expr();
        cx.factory().expr(expr)
    }

    /// Builds a `SequenceItem` from the packet value and a copy of the ticks.
    pub fn sequence_item(&self, cx: &mut Cx) -> Result<SequenceItem> {
        let value = self.packet_value(cx)?;
        let ticks = self.ticks.clone();
        SequenceItem::with_ticks(value, ticks)
    }

    /// Interns the packet in `cx` and builds the stream packet event for
    /// `run` at sequence number `seq`, carrying a copy of the ticks.
    pub fn chunk_event(&self, cx: &mut Cx, run: Ref, seq: u64) -> Result<Event> {
        let payload = self.packet.intern_ref(cx)?;
        let ticks = self.ticks.clone();
        stream_packet_event(run, seq, ticks, payload)
    }
}
/// A stream handle: stream metadata paired with the spine that stores and
/// delivers its items (either a pull spine or a push spine).
#[non_citizen(
reason = "live stream spine; reconstruct stream/Packet and stream/Metadata descriptors then realize separately",
kind = "handle",
descriptor = "stream/Packet"
)]
pub struct StreamValue {
// Descriptive metadata; also supplies the buffer setting used by push streams.
metadata: StreamMetadata,
// Backing item store; the variant decides whether pushing is allowed.
spine: StreamSpine,
}
/// The two backing stores a `StreamValue` can use.
enum StreamSpine {
// Spine created from a list of items supplied up front (see `StreamValue::pull`).
Pull(PullSpine),
// Spine that accepts items through `StreamValue::push_packet`.
Push(PushSpine),
}
impl StreamValue {
pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
Self {
metadata,
spine: StreamSpine::Pull(PullSpine::new(items)),
}
}
pub fn push(metadata: StreamMetadata) -> Self {
Self {
spine: StreamSpine::Push(PushSpine::new(metadata.buffer().clone())),
metadata,
}
}
pub fn metadata(&self) -> &StreamMetadata {
&self.metadata
}
pub fn publish_claims(&self, cx: &mut Cx, subject: Ref) -> Result<()> {
publish_metadata_claims(cx, subject, &self.metadata)
}
pub fn push_packet(&self, item: StreamItem) -> Result<PushResult> {
match &self.spine {
StreamSpine::Pull(_) => Err(Error::Eval(
"cannot push packets into a pull stream".to_owned(),
)),
StreamSpine::Push(spine) => spine.push(item),
}
}
pub fn close_push(&self) -> Result<()> {
match &self.spine {
StreamSpine::Pull(spine) => spine.close(),
StreamSpine::Push(spine) => spine.close(),
}
}
pub fn next_packet(&self) -> Result<Option<StreamItem>> {
match &self.spine {
StreamSpine::Pull(spine) => spine.next(),
StreamSpine::Push(spine) => spine.next(),
}
}
pub fn next_packet_timeout(&self, timeout: Duration) -> Result<Option<StreamItem>> {
match &self.spine {
StreamSpine::Pull(spine) => spine.next(),
StreamSpine::Push(spine) => spine.next_timeout(timeout),
}
}
pub fn peek_packet(&self) -> Result<Option<StreamItem>> {
match &self.spine {
StreamSpine::Pull(spine) => spine.peek(),
StreamSpine::Push(spine) => spine.peek(),
}
}
pub fn is_done(&self) -> Result<bool> {
match &self.spine {
StreamSpine::Pull(spine) => spine.is_done(),
StreamSpine::Push(spine) => spine.is_done(),
}
}
pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
let mut out = Vec::new();
for _ in 0..limit {
let Some(item) = self.next_packet()? else {
break;
};
out.push(item);
}
Ok(out)
}
pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
let mut seq = start_seq;
let mut out = Vec::new();
while let Some(item) = self.next_packet()? {
out.push(item.chunk_event(cx, run.clone(), seq)?);
seq = seq.saturating_add(1);
}
if self.is_done()? {
out.push(Event::done(run, seq)?);
}
Ok(out)
}
pub fn cancel(&self) -> Result<()> {
match &self.spine {
StreamSpine::Pull(spine) => spine.cancel(),
StreamSpine::Push(spine) => spine.cancel(),
}
}
pub fn stats(&self) -> Result<StreamStats> {
match &self.spine {
StreamSpine::Pull(spine) => spine.stats(),
StreamSpine::Push(spine) => spine.stats(),
}
}
pub fn queue_depth(&self) -> Result<usize> {
match &self.spine {
StreamSpine::Pull(spine) => spine.depth(),
StreamSpine::Push(spine) => spine.depth(),
}
}
pub fn event_source(self: &Arc<Self>, run: Ref, start_seq: u64) -> Arc<StreamEventSource> {
Arc::new(StreamEventSource::new(Arc::clone(self), run, start_seq))
}
}
impl Object for StreamValue {
    /// Renders the stream as `#<stream ID>`, where `ID` is the metadata id.
    fn display(&self, _cx: &mut Cx) -> Result<String> {
        let id = self.metadata.id();
        let rendered = format!("#<stream {}>", id);
        Ok(rendered)
    }

    /// Exposes the value as `Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self as &dyn std::any::Any
    }
}
impl ObjectCompat for StreamValue {
    /// Requests a class stub from the factory for `CORE_SEQUENCE_CLASS_ID`,
    /// passing the qualified symbol built from `"stream"` and `"Stream"`.
    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
        let class_name = Symbol::qualified("stream", "Stream");
        cx.factory().class_stub(CORE_SEQUENCE_CLASS_ID, class_name)
    }

    /// A stream always exposes its `Sequence` implementation.
    fn as_sequence(&self) -> Option<&dyn Sequence> {
        let sequence: &dyn Sequence = self;
        Some(sequence)
    }
}
impl Sequence for StreamValue {
fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
self.next_packet()?
.map(|item| item.sequence_item(cx))
.transpose()
}
fn close(&self, _cx: &mut Cx) -> Result<()> {
self.cancel()
}
fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
self.peek_packet()?
.map(|item| item.sequence_item(cx))
.transpose()
}
fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
self.is_done()
}
}
pub fn stream_next_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
stream.next_packet()
}
pub fn stream_peek_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
stream.peek_packet()
}
pub fn stream_done_q(stream: &StreamValue) -> Result<bool> {
stream.is_done()
}
pub fn stream_take(stream: &StreamValue, limit: usize) -> Result<Vec<StreamItem>> {
stream.take_packets(limit)
}
pub fn stream_run_bang(
stream: &StreamValue,
cx: &mut Cx,
run: Ref,
start_seq: u64,
) -> Result<Vec<Event>> {
stream.run_events(cx, run, start_seq)
}
pub fn stream_cancel_bang(stream: &StreamValue) -> Result<()> {
stream.cancel()
}
pub fn stream_stats(stream: &StreamValue) -> Result<StreamStats> {
stream.stats()
}
pub fn stream_metadata(stream: &StreamValue) -> &StreamMetadata {
stream.metadata()
}
/// Builds the qualified symbol made of `"stream"` and `"next!"`.
pub fn stream_next_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "next!";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"peek!"`.
pub fn stream_peek_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "peek!";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"done?"`.
pub fn stream_done_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "done?";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"take"`.
pub fn stream_take_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "take";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"run!"`.
pub fn stream_run_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "run!";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"cancel!"`.
pub fn stream_cancel_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "cancel!";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"stats"`.
pub fn stream_stats_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "stats";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Builds the qualified symbol made of `"stream"` and `"metadata"`.
pub fn stream_metadata_symbol() -> Symbol {
    const NAMESPACE: &str = "stream";
    const NAME: &str = "metadata";
    Symbol::qualified(NAMESPACE, NAME)
}