use crate::connector::StreamDescriptor;
use crate::connector::errors::StreamResult;
use crate::connector::hook::IntoHook;
use crate::io::base::{BaseTx, TxPairExt};
use crate::utils::*;
use std::sync::Arc;
pub struct RuntimeCtx<C, D, A, E, R, S, T>
where
C: Send + 'static,
D: StreamDescriptor<T>,
A: BaseTx + TxPairExt,
S: StateMarker,
E: BaseTx,
R: Reducer,
{
pub desc: D,
pub cfg: C,
pub reducer: R,
pub state: Arc<StateCell<S>>,
pub action_rx: <A as TxPairExt>::RxHalf,
pub event_tx: E,
pub cancel: CancelToken,
pub health: HealthFlag,
_marker: std::marker::PhantomData<T>,
}
impl<C, D, A, E, R, S, T> RuntimeCtx<C, D, A, E, R, S, T>
where
C: Send + 'static,
D: StreamDescriptor<T>,
A: BaseTx + TxPairExt,
S: StateMarker,
E: BaseTx,
R: Reducer,
{
#[inline]
pub fn new(
cfg: C,
desc: D,
action_rx: <A as TxPairExt>::RxHalf,
event_tx: E,
reducer: R,
state: Arc<StateCell<S>>,
cancel: CancelToken,
health: HealthFlag,
) -> Self {
Self {
desc,
cfg,
reducer,
action_rx,
event_tx,
state,
cancel,
health,
_marker: std::marker::PhantomData,
}
}
}
pub trait StreamRunner<D, E, R, S, T>: Sized + Send + 'static
where
D: StreamDescriptor<T>,
S: StateMarker,
E: BaseTx,
R: Reducer,
{
type Config: Send + 'static;
type ActionTx: BaseTx + TxPairExt;
type RawEvent: Send + 'static;
type HookResult;
fn build_config(&mut self, desc: &D) -> anyhow::Result<Self::Config>;
fn run<H>(
ctx: RuntimeCtx<Self::Config, D, Self::ActionTx, E, R, S, T>,
hook: H,
) -> StreamResult<()>
where
H: IntoHook<Self::RawEvent, E, R, S, D, Self::HookResult, T>;
}