use crate::connector::hook::IntoHook;
use crate::connector::{EventTxType, Stream, StreamDescriptor, StreamSpawner};
use crate::io::base::{BaseTx, TxPairExt};
use crate::utils::*;
use serde::Deserialize;
use std::fmt::{Debug, Display};
use std::sync::Arc;
/// Common contract implemented by connector types.
///
/// An implementor is constructed from its `MainConfig` through `init`, exposes
/// read-only accessors for its name, configuration, cancel token and optional
/// core statistics, and can spawn streams whenever it also implements
/// `StreamSpawner` for the requested type parameters.
pub trait BaseConnector: Sized + Send + 'static {
    /// Configuration consumed by `init`.
    ///
    /// Must be cloneable, sendable, `'static`, and deserializable for any
    /// input lifetime.
    type MainConfig: Clone + Send + for<'a> Deserialize<'a> + 'static;

    /// Builds a connector from `config`.
    ///
    /// * `cancel_token` - token handed to the connector at construction time.
    /// * `reserved_core_ids` - optional list of core ids.
    ///   NOTE(review): presumably cores set aside for this connector; the exact
    ///   meaning is defined by implementors - confirm.
    ///
    /// # Errors
    /// Returns whatever error the implementor reports when construction fails.
    fn init(
        config: Self::MainConfig,
        cancel_token: CancelToken,
        reserved_core_ids: Option<Vec<usize>>,
    ) -> anyhow::Result<Self>;

    /// Name of this connector, usable both as a string slice and for display.
    fn name(&self) -> impl AsRef<str> + Display;

    /// Borrows the configuration associated with this connector.
    fn config(&self) -> &Self::MainConfig;

    /// Borrows the cancel token associated with this connector.
    fn cancel_token(&self) -> &CancelToken;

    /// Shared core statistics, when the connector has any.
    fn cores_stats(&self) -> Option<Arc<CoreStats>>;

    /// Spawns a stream described by `desc`.
    ///
    /// This is a thin default wrapper around `StreamSpawner::spawn`: besides
    /// forwarding `desc`, `event_tx_type` and `hook` untouched, it supplies a
    /// child of this connector's cancel token and the value returned by
    /// `cores_stats()`.
    ///
    /// # Errors
    /// Propagates any error returned by `StreamSpawner::spawn`.
    fn spawn_stream<D, E, R, S, H, T>(
        &mut self,
        desc: D,
        event_tx_type: EventTxType<E>,
        hook: H,
    ) -> anyhow::Result<Stream<Self::ActionTx, Option<E::RxHalf>, S>>
    where
        Self: StreamSpawner<D, E, R, S, T>,
        D: StreamDescriptor<T>,
        S: StateMarker,
        E: BaseTx + TxPairExt,
        H: IntoHook<Self::RawEvent, E, R, S, D, Self::HookResult, T>,
        R: Reducer,
        T: Debug + Clone + Send + 'static,
    {
        // Derive the per-stream inputs up front so the delegation below reads
        // as a plain argument list.
        let child_token = self.cancel_token().new_child();
        let stats = self.cores_stats();

        <Self as StreamSpawner<D, E, R, S, T>>::spawn(
            self,
            desc,
            event_tx_type,
            hook,
            child_token,
            stats,
        )
    }
}
/// Newtype wrapper around a single `BaseConnector` implementor.
///
/// The wrapped connector is stored in the public tuple field `0`.
pub struct AnyConnector<T: BaseConnector>(pub T);
/// Shared dereference to the wrapped connector, so a `&AnyConnector<T>` can be
/// used where a `&T` is expected.
impl<T: BaseConnector> std::ops::Deref for AnyConnector<T> {
    type Target = T;

    /// Returns a shared reference to the wrapped connector.
    fn deref(&self) -> &T {
        let Self(connector) = self;
        connector
    }
}
/// Mutable dereference to the wrapped connector, so a `&mut AnyConnector<T>`
/// can be used where a `&mut T` is expected.
impl<T: BaseConnector> std::ops::DerefMut for AnyConnector<T> {
    /// Returns an exclusive reference to the wrapped connector.
    fn deref_mut(&mut self) -> &mut T {
        let Self(connector) = self;
        connector
    }
}
impl<T> AnyConnector<T>
where
T: BaseConnector,
{
pub fn new(inner: T) -> Self {
Self(inner)
}
pub fn init(
config: T::MainConfig,
cancel_token: CancelToken,
reserved_core_ids: Option<Vec<usize>>,
) -> anyhow::Result<Self> {
T::init(config, cancel_token, reserved_core_ids).map(Self)
}
pub fn into_inner(self) -> T {
self.0
}
pub fn inner(&self) -> &T {
&self.0
}
pub fn inner_mut(&mut self) -> &mut T {
&mut self.0
}
pub fn name(&self) -> impl AsRef<str> + Display {
self.0.name()
}
pub fn config(&self) -> &T::MainConfig {
self.0.config()
}
}