use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use futures_channel::mpsc;
use futures_core::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
use serde::Serialize;
use crate::model::Message;
use crate::path::Path;
use crate::simulation::{DuplicateEventSinkError, SimInit};
use super::{EventSinkReader, EventSinkWriter, SinkState};
/// Creates a connected writer/reader pair backed by an unbounded channel.
///
/// Both halves share a single atomic "enabled" flag, which is initially set
/// if and only if `state` equals [`SinkState::Enabled`].
///
/// Returns the pair as `(writer, reader)`.
pub fn event_queue<T: Send>(state: SinkState) -> (EventQueueWriter<T>, EventQueueReader<T>) {
    // The flag is shared so that toggling it on one half is seen by the other.
    let enabled_flag = Arc::new(AtomicBool::new(state == SinkState::Enabled));
    let (tx, rx) = mpsc::unbounded();

    let writer = EventQueueWriter {
        is_enabled: Arc::clone(&enabled_flag),
        sender: tx,
    };
    let reader = EventQueueReader {
        is_enabled: enabled_flag,
        receiver: rx,
    };

    (writer, reader)
}
/// Creates an event queue and registers its reader half with `sim_init`.
///
/// A fresh queue is built with [`event_queue`] using the given initial
/// `state`; the reader is then handed to `SimInit::bind_event_sink` together
/// with `path`.
///
/// # Errors
///
/// Propagates the [`DuplicateEventSinkError`] returned by
/// `SimInit::bind_event_sink` (presumably raised when `path` is already
/// taken; confirm against `SimInit`).
pub fn event_queue_endpoint<T>(
    sim_init: &mut SimInit,
    state: SinkState,
    path: impl Into<Path>,
) -> Result<EventQueueWriter<T>, DuplicateEventSinkError>
where
    T: Message + Serialize + Send + 'static,
{
    let (writer, reader) = event_queue(state);
    sim_init.bind_event_sink(reader, path)?;

    Ok(writer)
}
/// Creates an event queue and registers its reader half with `sim_init`
/// through `SimInit::bind_event_sink_raw`.
///
/// Unlike [`event_queue_endpoint`], the event type is only required to be
/// `Serialize + Send + 'static`; no `Message` bound is imposed.
///
/// # Errors
///
/// Propagates the [`DuplicateEventSinkError`] returned by
/// `SimInit::bind_event_sink_raw` (presumably raised when `path` is already
/// taken; confirm against `SimInit`).
pub fn event_queue_endpoint_raw<T>(
    sim_init: &mut SimInit,
    state: SinkState,
    path: impl Into<Path>,
) -> Result<EventQueueWriter<T>, DuplicateEventSinkError>
where
    T: Serialize + Send + 'static,
{
    let (writer, reader) = event_queue(state);
    sim_init.bind_event_sink_raw(reader, path)?;

    Ok(writer)
}
/// Receiving half of an event queue.
///
/// Wraps the receiver of an unbounded channel together with an atomic
/// "enabled" flag held behind an `Arc`.
#[pin_project]
pub struct EventQueueReader<T>
where
    T: Send,
{
    /// Atomic "enabled" flag for this queue.
    is_enabled: Arc<AtomicBool>,
    /// Channel endpoint from which queued events are pulled.
    #[pin]
    receiver: mpsc::UnboundedReceiver<T>,
}
impl<T: Send> Stream for EventQueueReader<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.receiver.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.receiver.size_hint()
}
}
impl<T: Send + 'static> EventSinkReader<T> for EventQueueReader<T> {
    /// Sets the shared "enabled" flag.
    fn enable(&mut self) {
        self.is_enabled.store(true, Ordering::Relaxed);
    }

    /// Clears the shared "enabled" flag.
    fn disable(&mut self) {
        self.is_enabled.store(false, Ordering::Relaxed);
    }

    /// Returns the next queued event without blocking.
    ///
    /// Yields `None` both when the receiver reports an error and when it
    /// reports that no further item will be produced.
    fn try_read(&mut self) -> Option<T> {
        match self.receiver.try_next() {
            Ok(Some(event)) => Some(event),
            Ok(None) | Err(_) => None,
        }
    }

    /// Blocks the calling thread until the receiver yields its next item.
    ///
    /// Returns `None` once the underlying stream has terminated.
    fn read(&mut self) -> Option<T> {
        let next_event = self.receiver.next();
        pollster::block_on(next_event)
    }
}
impl<T: Send> fmt::Debug for EventQueueReader<T> {
    /// Formats the reader showing only the current value of the enabled flag;
    /// the channel receiver is omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let enabled = self.is_enabled.load(Ordering::Relaxed);

        f.debug_struct("EventQueueReader")
            .field("is_enabled", &enabled)
            .finish_non_exhaustive()
    }
}
/// Sending half of an event queue.
///
/// Wraps the sender of an unbounded channel together with an atomic
/// "enabled" flag held behind an `Arc`.
pub struct EventQueueWriter<T>
where
    T: Send,
{
    /// Atomic "enabled" flag for this queue.
    is_enabled: Arc<AtomicBool>,
    /// Channel endpoint into which events are pushed.
    sender: mpsc::UnboundedSender<T>,
}
impl<T: Send + 'static> EventSinkWriter<T> for EventQueueWriter<T> {
    /// Pushes `event` onto the queue if the enabled flag is currently set.
    ///
    /// When the flag is cleared the event is dropped. A failed send is
    /// deliberately ignored as well.
    fn write(&self, event: T) {
        if self.is_enabled.load(Ordering::Relaxed) {
            // Best-effort delivery: the send result is intentionally discarded.
            let _ = self.sender.unbounded_send(event);
        }
    }
}
impl<T: Send> Clone for EventQueueWriter<T> {
fn clone(&self) -> Self {
Self {
is_enabled: self.is_enabled.clone(),
sender: self.sender.clone(),
}
}
}
impl<T: Send> fmt::Debug for EventQueueWriter<T> {
    /// Formats the writer showing only the current value of the enabled flag;
    /// the channel sender is omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let enabled = self.is_enabled.load(Ordering::Relaxed);

        f.debug_struct("EventQueueWriter")
            .field("is_enabled", &enabled)
            .finish_non_exhaustive()
    }
}