use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use aion_store::EventStore;
use crate::publish::PublishingEventStore;
use crate::signal::SignalResumeHandoff;
use crate::{EngineError, RuntimeHandle};
use super::delegated::{DelegatedSeams, EventPublisher, SignalRouter};
/// Shared, thread-safe factory closure that produces a [`SignalRouter`] from
/// a runtime handle and a signal-resume handoff.
///
/// `assemble_delegated_seams` invokes it (when one is supplied) to obtain the
/// signal router installed into the resulting [`DelegatedSeams`].
pub(super) type SignalRouterFactory = Arc<
dyn Fn(Arc<RuntimeHandle>, Arc<SignalResumeHandoff>) -> Arc<dyn SignalRouter> + Send + Sync,
>;
/// Success value of `wrap_event_streaming`: the event store to use, paired
/// with an optional event publisher.
///
/// The publisher is `None` when no streaming capacity was requested, in which
/// case the store is the caller's original store.
pub(super) type EventStreamingParts = (Arc<dyn EventStore>, Option<Arc<dyn EventPublisher>>);
/// Optionally wraps `store` in a [`PublishingEventStore`] and exposes its
/// publisher.
///
/// * `store` - the event store to pass through or wrap.
/// * `capacity` - streaming capacity; `None` means streaming is not requested.
/// * `event_publisher_overridden` - whether an event publisher was already
///   supplied by other configuration.
///
/// Returns the store to use together with the streaming publisher, if any.
///
/// # Errors
///
/// * [`EngineError::ConflictingEventPublisher`] when a capacity is given while
///   `event_publisher_overridden` is `true`.
/// * Whatever error `PublishingEventStore::new` reports, propagated via `?`.
pub(super) fn wrap_event_streaming(
    store: Arc<dyn EventStore>,
    capacity: Option<NonZeroUsize>,
    event_publisher_overridden: bool,
) -> Result<EventStreamingParts, EngineError> {
    match (capacity, event_publisher_overridden) {
        // No capacity requested: hand the store back untouched; the override
        // flag is not consulted in this case.
        (None, _) => Ok((store, None)),
        // A capacity was requested but a publisher override is also present.
        (Some(_), true) => Err(EngineError::ConflictingEventPublisher),
        (Some(slots), false) => {
            let publishing_store = PublishingEventStore::new(store, slots)?;
            // Take the publisher before the wrapper is moved into its `Arc`.
            let stream_publisher: Arc<dyn EventPublisher> =
                Arc::new(publishing_store.publisher());
            let wrapped_store: Arc<dyn EventStore> = Arc::new(publishing_store);
            Ok((wrapped_store, Some(stream_publisher)))
        }
    }
}
/// Inputs consumed by `assemble_delegated_seams` to produce the final
/// [`DelegatedSeams`].
pub(super) struct SeamAssembly {
/// Seams as configured so far; used as-is when no router factory is given,
/// otherwise the source of the query service and event publisher.
pub(super) configured: DelegatedSeams,
/// Optional factory that builds the signal router from `runtime` and
/// `signal_handoff`.
pub(super) signal_router_factory: Option<SignalRouterFactory>,
/// Runtime handle passed to the signal router factory.
pub(super) runtime: Arc<RuntimeHandle>,
/// Signal-resume handoff passed to the signal router factory.
pub(super) signal_handoff: Arc<SignalResumeHandoff>,
/// When present, replaces the event publisher in the assembled seams.
pub(super) streaming_publisher: Option<Arc<dyn EventPublisher>>,
/// Engine handle given to the concrete query service when one is built.
pub(super) query_mailbox_engine: Arc<dyn crate::engine_seam::EngineHandle>,
/// Timeout for the concrete query service; `None` leaves the existing
/// query service in place.
pub(super) query_timeout: Option<Duration>,
/// When `true`, the existing query service is kept even if
/// `query_timeout` is set.
pub(super) query_service_overridden: bool,
}
/// Builds the final [`DelegatedSeams`] from the pieces in `assembly`.
///
/// The steps run in this order:
/// 1. If a signal router factory is present, call it with the runtime and
///    signal handoff and combine its router with the configured query service
///    and event publisher; otherwise start from the configured seams.
/// 2. Apply `install_streaming_publisher` with the optional streaming
///    publisher.
/// 3. Apply `install_concrete_query_service` with the mailbox engine, timeout
///    and override flag, and return its result.
pub(super) fn assemble_delegated_seams(assembly: SeamAssembly) -> DelegatedSeams {
    let base = match assembly.signal_router_factory {
        None => assembly.configured,
        Some(build_router) => {
            // The router is built first, then the remaining seams are read
            // from the configured set.
            let router = build_router(assembly.runtime, assembly.signal_handoff);
            let query_service = assembly.configured.query_service_arc();
            let event_publisher = assembly.configured.event_publisher_arc();
            DelegatedSeams::new(router, query_service, event_publisher)
        }
    };

    let with_streaming = install_streaming_publisher(base, assembly.streaming_publisher);

    install_concrete_query_service(
        with_streaming,
        assembly.query_mailbox_engine,
        assembly.query_timeout,
        assembly.query_service_overridden,
    )
}
/// Replaces the query service in `delegated` with a
/// `crate::query::ConcreteQueryService` when a timeout is configured and the
/// query service has not been overridden.
///
/// * `delegated` - seams to update or return unchanged.
/// * `query_mailbox_engine` - engine handle handed to the new query service.
/// * `query_timeout` - timeout for the new service; `None` leaves `delegated`
///   untouched.
/// * `query_service_overridden` - when `true`, `delegated` is returned as-is.
///
/// The signal router and event publisher are carried over from `delegated`.
fn install_concrete_query_service(
    delegated: DelegatedSeams,
    query_mailbox_engine: Arc<dyn crate::engine_seam::EngineHandle>,
    query_timeout: Option<Duration>,
    query_service_overridden: bool,
) -> DelegatedSeams {
    let Some(timeout) = query_timeout else {
        return delegated;
    };
    if query_service_overridden {
        return delegated;
    }

    // Bound in the same order the seams are handed to `DelegatedSeams::new`.
    let router = delegated.signal_router_arc();
    let query_service = Arc::new(crate::query::ConcreteQueryService::new(
        query_mailbox_engine,
        timeout,
    ));
    let event_publisher = delegated.event_publisher_arc();
    DelegatedSeams::new(router, query_service, event_publisher)
}
/// Swaps the event publisher in `delegated` for `streaming_publisher` when one
/// is supplied.
///
/// With `None`, `delegated` is returned unchanged. Otherwise a new
/// [`DelegatedSeams`] is built that keeps the signal router and query service
/// from `delegated` and uses the given publisher.
fn install_streaming_publisher(
    delegated: DelegatedSeams,
    streaming_publisher: Option<Arc<dyn EventPublisher>>,
) -> DelegatedSeams {
    let Some(stream_publisher) = streaming_publisher else {
        return delegated;
    };

    let router = delegated.signal_router_arc();
    let query_service = delegated.query_service_arc();
    DelegatedSeams::new(router, query_service, stream_publisher)
}