use std::{
collections::{BTreeMap, HashMap},
error::Error as StdError,
future::Future,
sync::Arc,
time::Duration,
};
use crate::codec::Codec;
use crate::{Broker, Publisher, ServerSpec};
use tokio_util::task::TaskTracker;
use crate::runtime::context::State;
use crate::runtime::dispatch::{Delivery, Publishers};
use crate::runtime::lifecycle::{BoxError, BrokerLifecycle};
use crate::runtime::metadata::HandlerMetadata;
use crate::runtime::middleware::{Identity, Stack};
use crate::runtime::publish::PublishMiddleware;
use crate::runtime::router::RouterSink;
use super::scope::BrokerScope;
use super::{AppInfo, LifecycleHook, LifecyclePhase, Starter, StartupHook};
/// Application builder that accumulates brokers, handlers, publishers,
/// publish middleware, shared state and lifecycle hooks.
///
/// `L` is the type of the global layer stack. It is `Identity` for an app
/// created with [`RustStream::new`] and becomes `Stack<N, L>` after each call
/// to [`RustStream::layer`].
pub struct RustStream<L = Identity> {
/// Application info supplied to [`RustStream::new`]; exposed by [`RustStream::info`].
pub(super) info: AppInfo,
/// Type-erased lifecycle handle for every broker added through
/// `register_broker`, `with_broker` or `with_broker_codec`.
pub(super) brokers: Vec<Arc<dyn BrokerLifecycle>>,
/// One starter closure per handler collected from a broker scope; each one
/// captures its broker and the scope's shared `Delivery`.
pub(super) starters: Vec<Starter>,
/// Metadata for each collected handler, pushed together with its starter.
pub(super) handlers: Vec<HandlerMetadata>,
/// Server specs keyed by the name passed to `server`.
pub(super) servers: BTreeMap<String, ServerSpec>,
/// Publishers keyed by the name passed to `publisher`; cloned into every
/// broker scope and into every `Delivery`.
pub(super) publishers: Publishers,
/// Publish middleware in registration order; seeds the pipeline of each new
/// broker scope.
pub(super) publish_layers: Vec<Arc<dyn PublishMiddleware>>,
/// Shared state populated through `insert_state`.
pub(super) state: State,
/// Hooks registered through `on_startup`, in registration order.
pub(super) on_startup: Vec<StartupHook>,
/// Hooks registered through `after_startup`, in registration order.
pub(super) after_startup: Vec<LifecycleHook>,
/// Hooks registered through `on_shutdown`, in registration order.
pub(super) on_shutdown: Vec<LifecycleHook>,
/// Hooks registered through `after_shutdown`, in registration order.
pub(super) after_shutdown: Vec<LifecycleHook>,
/// `None` until set through `shutdown_timeout`.
// NOTE(review): presumably bounds graceful shutdown; the consumer of this
// value is outside this file — confirm.
pub(super) shutdown_timeout: Option<Duration>,
/// Task tracker cloned into every `Delivery` as its `tasks` field.
pub(super) continuations: TaskTracker,
/// Global layer stack; cloned into every broker scope.
pub(super) global: L,
}
/// Compact `Debug` output: prints the app info plus the *number* of brokers
/// and handlers. Every other field is omitted and the output is marked as
/// non-exhaustive, so no `Debug` bound is needed on `L`.
impl<L> std::fmt::Debug for RustStream<L> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            info,
            brokers,
            handlers,
            ..
        } = self;

        let mut out = f.debug_struct("RustStream");
        out.field("info", info);
        out.field("brokers", &brokers.len());
        out.field("handlers", &handlers.len());
        out.finish_non_exhaustive()
    }
}
impl RustStream<Identity> {
    /// Creates an empty application described by `info`.
    ///
    /// The new app has no brokers, handlers, servers, publishers, publish
    /// middleware or hooks, uses the default `State`, has no shutdown timeout,
    /// and starts with `Identity` as its global layer.
    #[must_use]
    pub fn new(info: AppInfo) -> Self {
        let state = State::default();
        let continuations = TaskTracker::new();

        Self {
            info,
            state,
            continuations,
            global: Identity,
            shutdown_timeout: None,

            // Registries filled in by the builder methods.
            brokers: Vec::new(),
            starters: Vec::new(),
            handlers: Vec::new(),
            servers: BTreeMap::new(),
            publishers: HashMap::new(),
            publish_layers: Vec::new(),

            // Lifecycle hooks, all empty to begin with.
            on_startup: Vec::new(),
            after_startup: Vec::new(),
            on_shutdown: Vec::new(),
            after_shutdown: Vec::new(),
        }
    }
}
impl<L> RustStream<L> {
#[must_use]
pub fn layer<N>(self, layer: N) -> RustStream<Stack<N, L>> {
RustStream {
info: self.info,
brokers: self.brokers,
starters: self.starters,
handlers: self.handlers,
servers: self.servers,
publishers: self.publishers,
publish_layers: self.publish_layers,
state: self.state,
on_startup: self.on_startup,
after_startup: self.after_startup,
on_shutdown: self.on_shutdown,
after_shutdown: self.after_shutdown,
shutdown_timeout: self.shutdown_timeout,
continuations: self.continuations,
global: Stack::new(layer, self.global),
}
}
/// Stores `value` in the app's shared `State` and returns the app for
/// further chaining.
#[must_use]
pub fn insert_state<T>(self, value: T) -> Self
where
    T: std::any::Any + Send + Sync,
{
    let mut app = self;
    app.state.insert(value);
    app
}
#[must_use]
pub fn on_startup<F, Fut, E>(mut self, hook: F) -> Self
where
F: FnOnce(State) -> Fut + Send + 'static,
Fut: Future<Output = Result<State, E>> + Send,
E: StdError + Send + Sync + 'static,
{
self.on_startup.push(Box::new(move |state| {
Box::pin(async move { hook(state).await.map_err(|e| Box::new(e) as BoxError) })
}));
self
}
#[must_use]
pub fn after_startup<F, Fut, E>(self, hook: F) -> Self
where
F: FnOnce(Arc<State>) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
E: StdError + Send + Sync + 'static,
{
self.push_lifecycle_hook(LifecyclePhase::AfterStartup, hook)
}
#[must_use]
pub fn on_shutdown<F, Fut, E>(self, hook: F) -> Self
where
F: FnOnce(Arc<State>) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
E: StdError + Send + Sync + 'static,
{
self.push_lifecycle_hook(LifecyclePhase::OnShutdown, hook)
}
#[must_use]
pub fn after_shutdown<F, Fut, E>(self, hook: F) -> Self
where
F: FnOnce(Arc<State>) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), E>> + Send,
E: StdError + Send + Sync + 'static,
{
self.push_lifecycle_hook(LifecyclePhase::AfterShutdown, hook)
}
/// Shared implementation behind `after_startup`, `on_shutdown` and
/// `after_shutdown`.
///
/// Boxes `hook` into a `LifecycleHook` (converting its error into
/// `BoxError`) and appends it to the list selected by `phase`.
fn push_lifecycle_hook<F, Fut, E>(mut self, phase: LifecyclePhase, hook: F) -> Self
where
    F: FnOnce(Arc<State>) -> Fut + Send + 'static,
    Fut: Future<Output = Result<(), E>> + Send,
    E: StdError + Send + Sync + 'static,
{
    let erased: LifecycleHook = Box::new(move |state| {
        Box::pin(async move {
            match hook(state).await {
                Ok(()) => Ok(()),
                // Erase the caller's concrete error type.
                Err(err) => Err(Box::new(err) as BoxError),
            }
        })
    });

    // Pick the destination list first so there is a single push site.
    let registry = match phase {
        LifecyclePhase::AfterStartup => &mut self.after_startup,
        LifecyclePhase::OnShutdown => &mut self.on_shutdown,
        LifecyclePhase::AfterShutdown => &mut self.after_shutdown,
    };
    registry.push(erased);
    self
}
#[must_use]
pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
self.shutdown_timeout = Some(timeout);
self
}
/// Registers `publisher` under `name`.
///
/// The publisher is wrapped in an `Arc` and inserted into the publisher map;
/// inserting under a name that is already present replaces the earlier entry.
#[must_use]
pub fn publisher<P>(mut self, name: impl Into<String>, publisher: P) -> Self
where
    P: Publisher + 'static,
{
    let key: String = name.into();
    let shared = Arc::new(publisher);
    self.publishers.insert(key, shared);
    self
}
/// Appends `middleware` to the list of publish middleware.
///
/// The list is copied into the pipeline of every broker scope created
/// afterwards.
#[must_use]
pub fn publish_layer<M>(mut self, middleware: M) -> Self
where
    M: PublishMiddleware + 'static,
{
    let shared: Arc<dyn PublishMiddleware> = Arc::new(middleware);
    self.publish_layers.push(shared);
    self
}
#[must_use]
pub fn register_broker<B>(mut self, broker: B) -> Self
where
B: Broker + 'static,
{
self.brokers.push(Arc::new(broker));
self
}
/// Records `spec` under `name` in the server map.
///
/// Inserting under a name that is already present replaces the earlier spec.
#[must_use]
pub fn server(mut self, name: impl Into<String>, spec: ServerSpec) -> Self {
    let key: String = name.into();
    self.servers.insert(key, spec);
    self
}
/// Registers `broker` and the handlers that `build` declares on it.
///
/// A fresh `BrokerScope` is created with `()` as its codec argument and passed
/// to `build`. Afterwards the scope's handlers are collected into the app and
/// the broker is added to the broker list.
#[must_use]
pub fn with_broker<B, F>(mut self, broker: B, build: F) -> Self
where
    B: Broker + 'static,
    L: Clone,
    F: FnOnce(&mut BrokerScope<B, L>),
{
    let shared = Arc::new(broker);
    let scope = {
        let mut scope = self.new_scope(&shared, ());
        build(&mut scope);
        scope
    };
    self.collect_scope(&shared, scope);
    self
}
/// Like `with_broker`, but the scope handed to `build` carries the given
/// `codec`.
///
/// Once `build` returns, the scope's handlers are collected into the app and
/// the broker is added to the broker list.
#[must_use]
pub fn with_broker_codec<B, C, F>(mut self, broker: B, codec: C, build: F) -> Self
where
    B: Broker + 'static,
    C: Codec + Clone + 'static,
    L: Clone,
    F: FnOnce(&mut BrokerScope<B, L, C>),
{
    let shared = Arc::new(broker);
    let scope = {
        let mut scope = self.new_scope(&shared, codec);
        build(&mut scope);
        scope
    };
    self.collect_scope(&shared, scope);
    self
}
/// Builds an empty `BrokerScope` for `broker`.
///
/// The scope starts with a new `RouterSink`, a copy of the app's publishers,
/// a pipeline seeded from the app's publish middleware, no retry publisher,
/// a clone of the global layer, and the supplied `codec`.
fn new_scope<B, C>(&self, broker: &Arc<B>, codec: C) -> BrokerScope<B, L, C>
where
    B: Broker + 'static,
    L: Clone,
{
    let broker = Arc::clone(broker);
    let sink = RouterSink::new();
    let publishers = self.publishers.clone();

    BrokerScope {
        broker,
        sink,
        publishers,
        pipeline: self.publish_layers.iter().map(Arc::clone).collect(),
        retry_publisher: None,
        global: self.global.clone(),
        codec,
    }
}
/// Moves everything a finished `scope` declared into the app.
///
/// One `Delivery` is built per scope and shared by all of its handlers. For
/// each handler, a starter closure capturing the broker and that `Delivery`
/// is pushed together with the handler's metadata. Finally the broker's
/// lifecycle handle is added to the broker list.
// NOTE(review): `Delivery.publishers` is taken from the app, not from
// `scope.publishers`, while `pipeline` and `retry_publisher` come from the
// scope. If a scope can register its own publishers they would not reach the
// `Delivery` — confirm this is intended.
// NOTE(review): `zip` stops at the shorter sequence; this presumably relies on
// `into_parts` returning equally long lists — confirm.
fn collect_scope<B, C>(&mut self, broker: &Arc<B>, scope: BrokerScope<B, L, C>)
where
    B: Broker + 'static,
{
    let shared_delivery = Arc::new(Delivery {
        publishers: self.publishers.clone(),
        pipeline: scope.pipeline.clone(),
        retry_publisher: scope.retry_publisher.clone(),
        tasks: self.continuations.clone(),
    });

    let (bound_starters, metadata) = scope.sink.into_parts();
    for (bind, meta) in bound_starters.into_iter().zip(metadata) {
        // Each starter owns its own handles to the broker and the delivery.
        let handler_broker = Arc::clone(broker);
        let handler_delivery = Arc::clone(&shared_delivery);
        self.starters.push(Box::new(move |state, shutdown, token| {
            bind(handler_broker, state, handler_delivery, shutdown, token)
        }));
        self.handlers.push(meta);
    }

    let lifecycle: Arc<dyn BrokerLifecycle> = broker.clone();
    self.brokers.push(lifecycle);
}
/// Returns the metadata of every handler collected so far, in collection
/// order.
#[must_use]
pub fn handlers(&self) -> &[HandlerMetadata] {
    self.handlers.as_slice()
}
#[must_use]
pub fn info(&self) -> &AppInfo {
&self.info
}
#[must_use]
pub fn servers(&self) -> &BTreeMap<String, ServerSpec> {
&self.servers
}
}