use std::any::type_name;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::future::FutureExt;
use futures::{ready, Sink};
mod error;
mod events;
mod filter;
mod observable;
mod shared;
pub use self::error::{ErrorKind, PharErr};
pub use self::events::Events;
use self::events::Sender;
pub use self::filter::Filter;
pub use self::observable::{Observable, ObserveConfig};
pub use self::shared::SharedPharos;
pub type Observe<'a, Event, Error> =
Pin<Box<dyn Future<Output = Result<Events<Event>, Error>> + 'a + Send>>;
pub struct Pharos<Event>
where
Event: 'static + Clone + Send,
{
observers: Vec<Option<Sender<Event>>>,
free_slots: Vec<usize>,
closed: bool,
}
impl<Event> fmt::Debug for Pharos<Event>
where
Event: 'static + Clone + Send,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Pharos<{}>", type_name::<Event>())
}
}
impl<Event> Pharos<Event>
where
Event: 'static + Clone + Send,
{
pub fn new(capacity: usize) -> Self {
Self {
observers: Vec::with_capacity(capacity),
free_slots: Vec::with_capacity(capacity),
closed: false,
}
}
}
impl<Event> Default for Pharos<Event>
where
Event: 'static + Clone + Send,
{
fn default() -> Self {
Self::new(10)
}
}
impl<Event> Observable<Event> for Pharos<Event>
where
Event: 'static + Clone + Send,
{
type Error = PharErr;
fn observe(&mut self, options: ObserveConfig<Event>) -> Observe<'_, Event, Self::Error> {
async move {
if self.closed {
return Err(ErrorKind::Closed.into());
}
let (events, sender) = Events::new(options);
if let Some(i) = self.free_slots.pop() {
self.observers[i] = Some(sender);
} else {
self.observers.push(Some(sender));
}
Ok(events)
}
.boxed()
}
}
impl<Event> Sink<Event> for Pharos<Event>
where
Event: Clone + 'static + Send,
{
type Error = PharErr;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.closed {
return Err(ErrorKind::Closed.into()).into();
}
#[allow(clippy::manual_flatten)]
for obs in self.get_mut().observers.iter_mut() {
if let Some(o) = obs {
let res = ready!(Pin::new(o).poll_ready(cx));
if res.is_err() {
*obs = None;
}
}
}
Ok(()).into()
}
fn start_send(self: Pin<&mut Self>, evt: Event) -> Result<(), Self::Error> {
if self.closed {
return Err(ErrorKind::Closed.into());
}
let this = self.get_mut();
for (i, opt) in this.observers.iter_mut().enumerate() {
if let Some(obs) = opt {
if obs.is_closed() {
this.free_slots.push(i);
*opt = None;
} else if obs.filter(&evt) {
if Pin::new(obs).start_send(evt.clone()).is_err() {
this.free_slots.push(i);
*opt = None;
}
}
}
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.closed {
return Err(ErrorKind::Closed.into()).into();
}
let mut pending = false;
let this = self.get_mut();
for (i, opt) in this.observers.iter_mut().enumerate() {
if let Some(obs) = opt {
match Pin::new(obs).poll_flush(cx) {
Poll::Pending => pending = true,
Poll::Ready(Ok(_)) => continue,
Poll::Ready(Err(_)) => {
this.free_slots.push(i);
*opt = None;
}
}
}
}
if pending {
Poll::Pending
} else {
Ok(()).into()
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.closed {
return Ok(()).into();
} else {
self.closed = true;
}
let this = self.get_mut();
for (i, opt) in this.observers.iter_mut().enumerate() {
if let Some(obs) = opt {
let res = ready!(Pin::new(obs).poll_close(cx));
if res.is_err() {
this.free_slots.push(i);
*opt = None;
}
}
}
Ok(()).into()
}
}