use std::any::{self, Any, TypeId};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::fmt;
use std::marker::PhantomData;
#[cfg(feature = "server")]
use std::pin::Pin;
#[cfg(feature = "server")]
use std::task::{Context, Poll};
#[cfg(feature = "server")]
use ciborium;
#[cfg(feature = "server")]
use futures_core::Stream;
use serde::Serialize;
use crate::path::Path;
use crate::ports::EventSinkReader;
use super::EndpointError;
#[cfg(feature = "server")]
type SerializationError = ciborium::ser::Error<std::io::Error>;
#[derive(Default)]
pub(crate) struct EventSinkRegistry(HashMap<Path, Option<Box<dyn EventSinkReaderEntryAny>>>);
impl EventSinkRegistry {
pub(crate) fn add<S, T>(&mut self, sink: S, path: Path) -> Result<(), (Path, S)>
where
S: EventSinkReader<T> + Send + Sync + 'static,
T: Serialize + 'static,
{
match self.0.entry(path) {
Entry::Vacant(s) => {
s.insert(Some(Box::new(EventSinkReaderEntry {
sink,
_phantom: PhantomData,
})));
Ok(())
}
Entry::Occupied(e) => Err((e.key().clone(), sink)),
}
}
pub(crate) fn take<T>(
&mut self,
path: Path,
) -> Result<Box<dyn EventSinkReader<T>>, EndpointError>
where
T: Clone + Send + 'static,
{
match self.0.entry(path) {
Entry::Occupied(entry) => {
if let Some(inner) = entry.get() {
if inner.event_type_id() == TypeId::of::<T>() {
let sink = entry
.remove_entry()
.1
.unwrap()
.into_event_sink_reader()
.downcast::<Box<dyn EventSinkReader<T>>>()
.unwrap();
return Ok(*sink);
}
return Err(EndpointError::InvalidEventSinkType {
path: entry.key().clone(),
found_event_type: any::type_name::<T>(),
expected_event_type: inner.event_type_name(),
});
}
Err(EndpointError::EventSinkNotFound {
path: entry.key().clone(),
})
}
Entry::Vacant(entry) => Err(EndpointError::EventSinkNotFound {
path: entry.key().clone(),
}),
}
}
#[cfg(feature = "server")]
pub(crate) fn has_sink(&mut self, path: &Path) -> bool {
self.0.contains_key(path)
}
#[cfg(feature = "server")]
pub(crate) fn get_entry_mut(
&mut self,
path: &Path,
) -> Result<&mut Box<dyn EventSinkReaderEntryAny>, EndpointError> {
self.0
.get_mut(path)
.and_then(|s| s.as_mut())
.ok_or_else(|| EndpointError::EventSinkNotFound { path: path.clone() })
}
#[cfg(feature = "server")]
pub(crate) fn rent_entry(
&mut self,
path: &Path,
) -> Result<Box<dyn EventSinkReaderEntryAny>, EndpointError> {
self.0
.get_mut(path)
.and_then(|s| s.take())
.ok_or_else(|| EndpointError::EventSinkNotFound { path: path.clone() })
}
#[cfg(feature = "server")]
pub(crate) fn replace_entry(
&mut self,
path: &Path,
entry: Box<dyn EventSinkReaderEntryAny>,
) -> Result<(), EndpointError> {
self.0
.get_mut(path)
.map(|s| {
*s = Some(entry);
})
.ok_or_else(|| EndpointError::EventSinkNotFound { path: path.clone() })
}
}
impl fmt::Debug for EventSinkRegistry {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "EventSinkRegistry ({} sinks)", self.0.len())
}
}
#[cfg(feature = "server")]
pub(crate) trait EventSinkReaderEntryAny:
Stream<Item = Result<Vec<u8>, SerializationError>> + Send + Unpin + 'static
{
fn enable(&mut self);
fn disable(&mut self);
fn try_read(&mut self) -> Option<Result<Vec<u8>, SerializationError>>;
fn event_type_id(&self) -> TypeId;
fn event_type_name(&self) -> &'static str;
fn into_event_sink_reader(self: Box<Self>) -> Box<dyn Any>;
}
#[cfg(not(feature = "server"))]
pub(crate) trait EventSinkReaderEntryAny {
fn event_type_id(&self) -> TypeId;
fn event_type_name(&self) -> &'static str;
fn into_event_sink_reader(self: Box<Self>) -> Box<dyn Any>;
}
struct EventSinkReaderEntry<S, T>
where
S: EventSinkReader<T> + Send + Sync + 'static,
T: 'static,
{
sink: S,
_phantom: PhantomData<fn(T)>,
}
impl<S, T> EventSinkReaderEntryAny for EventSinkReaderEntry<S, T>
where
S: EventSinkReader<T> + Send + Sync + 'static,
T: Serialize + 'static,
{
#[cfg(feature = "server")]
fn enable(&mut self) {
self.sink.enable();
}
#[cfg(feature = "server")]
fn disable(&mut self) {
self.sink.disable();
}
#[cfg(feature = "server")]
fn try_read(&mut self) -> Option<Result<Vec<u8>, SerializationError>> {
self.sink.try_read().map(|event| {
let mut buffer = Vec::new();
ciborium::into_writer(&event, &mut buffer).map(|_| buffer)
})
}
fn event_type_id(&self) -> TypeId {
TypeId::of::<T>()
}
fn event_type_name(&self) -> &'static str {
any::type_name::<T>()
}
fn into_event_sink_reader(self: Box<Self>) -> Box<dyn Any> {
let event_sink_reader: Box<dyn EventSinkReader<T>> = Box::new(self.sink);
Box::new(event_sink_reader)
}
}
#[cfg(feature = "server")]
impl<S, T> Stream for EventSinkReaderEntry<S, T>
where
S: EventSinkReader<T> + Send + Sync + 'static,
T: Serialize + 'static,
{
type Item = Result<Vec<u8>, SerializationError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let sink = &mut self.get_mut().sink;
Pin::new(sink).poll_next(cx).map(|e| {
e.map(|event| {
let mut buffer = Vec::new();
ciborium::into_writer(&event, &mut buffer).map(|_| buffer)
})
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}