use std::sync::{Arc, Mutex};
use sim_kernel::{Cx, Error, Event, EventSource, Ref, Result};
use super::StreamValue;
/// Event source that turns packets pulled from a shared `StreamValue` into
/// sequenced `Event`s for a single run.
pub struct StreamEventSource {
// Stream that packets are pulled from, and that `close` cancels.
stream: Arc<StreamValue>,
// Run reference cloned into every event this source produces.
run: Ref,
// Sequencing state; behind a mutex because `next` only receives `&self`.
state: Mutex<EventSourceState>,
}
/// Mutable bookkeeping for `StreamEventSource`, guarded by its mutex.
struct EventSourceState {
// Sequence number handed to the next event; bumped (saturating) after each
// event that is successfully created.
next_seq: u64,
// Set once the done event has been produced so it is emitted at most once.
done_sent: bool,
}
impl StreamEventSource {
    /// Builds an event source over `stream` whose events are tagged with `run`.
    ///
    /// The first event produced carries sequence number `start_seq`; the
    /// "done already sent" marker starts out cleared.
    pub fn new(stream: Arc<StreamValue>, run: Ref, start_seq: u64) -> Self {
        let initial = EventSourceState {
            next_seq: start_seq,
            done_sent: false,
        };

        Self {
            state: Mutex::new(initial),
            run,
            stream,
        }
    }
}
impl EventSource for StreamEventSource {
    /// Produces the next event for this run, if one is available.
    ///
    /// Returns, in order of preference:
    /// * `Ok(Some(chunk))` when the stream yields a packet, converted via
    ///   `chunk_event` with the current sequence number;
    /// * `Ok(Some(done))` exactly once, when the stream yields no packet and
    ///   reports itself done;
    /// * `Ok(None)` otherwise.
    ///
    /// The sequence number advances (saturating) only after an event has been
    /// created successfully.
    ///
    /// # Errors
    ///
    /// Returns `Error::PoisonedLock` if the internal state mutex is poisoned,
    /// and propagates any error from the stream (`next_packet`, `is_done`) or
    /// from event construction (`chunk_event`, `Event::done`).
    fn next(&self, cx: &mut Cx) -> Result<Option<Event>> {
        // Take the state lock *before* pulling from the stream. This way a
        // poisoned lock is reported without first consuming (and dropping) a
        // packet, and the sequence number is assigned under the same lock that
        // covers the dequeue, so concurrent callers cannot number packets in a
        // different order than they were taken.
        let mut state = self
            .state
            .lock()
            .map_err(|_| Error::PoisonedLock("stream event source"))?;

        if let Some(item) = self.stream.next_packet()? {
            let event = item.chunk_event(cx, self.run.clone(), state.next_seq)?;
            state.next_seq = state.next_seq.saturating_add(1);
            return Ok(Some(event));
        }

        if !self.stream.is_done()? || state.done_sent {
            return Ok(None);
        }

        // Build the done event first and only then record it as sent, so a
        // failure here does not permanently suppress the done event.
        let event = Event::done(self.run.clone(), state.next_seq)?;
        state.done_sent = true;
        state.next_seq = state.next_seq.saturating_add(1);
        Ok(Some(event))
    }

    /// Closes the source by cancelling the underlying stream.
    ///
    /// The context argument is unused; the result of the stream's `cancel`
    /// call is returned as-is.
    fn close(&self, _cx: &mut Cx) -> Result<()> {
        self.stream.cancel()
    }
}