use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::ids::{AureliaError, ErrorId, TabernaId};
use crate::peering::address::DomusAddr;
use crate::peering::auth::DomusAuthConfig;
use crate::peering::codec::MessageCodec;
use crate::peering::config::{DomusConfig, DomusConfigAccess, DomusConfigStore};
use crate::peering::observability::{
new_observability, DomusReporting, DomusReportingFeeds, ObservabilityHandle,
};
use crate::peering::peering::{RouteLocalRemote, RouteLocalRemoteBuilder};
use crate::peering::routing::RouteResolver;
use crate::peering::send::{SendOptions, SendOutcome};
use crate::peering::taberna::{
Taberna, TabernaInboxHandle, TabernaRegistry, TabernaShutdownReport,
};
use crate::peering::transport::Transport;
use caducus::MpscBuilder;
type DomusPeering<RR> = RouteLocalRemote<RR>;
type DomusTransport = Arc<Transport>;
pub struct DomusBuilder<RR>
where
RR: RouteResolver,
{
config: DomusConfig,
local_addr: DomusAddr,
auth: DomusAuthConfig,
resolver: Arc<RR>,
runtime_handle: tokio::runtime::Handle,
}
impl<RR> DomusBuilder<RR>
where
RR: RouteResolver,
{
pub fn new(
config: DomusConfig,
local_addr: DomusAddr,
auth: DomusAuthConfig,
resolver: Arc<RR>,
runtime_handle: tokio::runtime::Handle,
) -> Self {
Self {
config,
local_addr,
auth,
resolver,
runtime_handle,
}
}
pub async fn build(self) -> Result<Domus<RR>, AureliaError> {
let (domus, _) = self.build_internal(false).await?;
Ok(domus)
}
pub async fn build_with_reporting(
self,
) -> Result<(Domus<RR>, DomusReportingFeeds), AureliaError> {
let (domus, feeds) = self.build_internal(true).await?;
let feeds = feeds.ok_or_else(|| {
AureliaError::with_message(ErrorId::ProtocolViolation, "reporting feeds missing")
})?;
Ok((domus, feeds))
}
async fn build_internal(
self,
with_feeds: bool,
) -> Result<(Domus<RR>, Option<DomusReportingFeeds>), AureliaError> {
let registry = Arc::new(TabernaRegistry::new());
let runtime_handle = self.runtime_handle.clone();
let (reporting, observability) = new_observability(runtime_handle.clone());
let feeds = if with_feeds {
Some(reporting.feeds())
} else {
None
};
let mut config = self.config;
config.apply_transport_defaults(self.local_addr.kind());
let config = Arc::new(DomusConfigStore::new(config));
let config_access =
DomusConfigAccess::new(Arc::clone(&config), Some(observability.clone()));
let transport = Arc::new(
Transport::bind(
self.local_addr,
Arc::clone(®istry),
config_access.clone(),
observability.clone(),
runtime_handle.clone(),
self.auth,
)
.await?,
);
let peering = RouteLocalRemoteBuilder::new(
config_access.clone(),
Arc::clone(®istry),
Arc::clone(&self.resolver),
Arc::clone(&transport),
)
.build();
let transport_handle = transport.start().await?;
let domus = Domus {
config: config_access,
registry,
peering,
transport,
transport_handle: Mutex::new(Some(transport_handle)),
reporting,
observability,
runtime_handle,
};
Ok((domus, feeds))
}
}
pub struct Domus<RR>
where
RR: RouteResolver,
{
config: DomusConfigAccess,
registry: Arc<TabernaRegistry>,
peering: DomusPeering<RR>,
transport: DomusTransport,
transport_handle: Mutex<Option<JoinHandle<()>>>,
reporting: DomusReporting,
observability: ObservabilityHandle,
runtime_handle: tokio::runtime::Handle,
}
impl<RR> Domus<RR>
where
RR: RouteResolver,
{
pub fn config(&self) -> DomusConfigAccess {
self.config.clone()
}
pub fn local_addr(&self) -> DomusAddr {
self.transport.local_addr()
}
pub fn reporting(&self) -> DomusReporting {
self.reporting.clone()
}
pub async fn taberna<Codec: MessageCodec>(
&self,
id: TabernaId,
codec: Codec,
) -> Result<Taberna<Codec>, AureliaError>
where
Codec::AppMessage: Send + Sync + 'static,
{
let config = self.config.snapshot().await;
let (sender, receiver) =
MpscBuilder::new(config.taberna_accept_queue_size, config.accept_timeout)
.shutdown_channel(Arc::new(TabernaShutdownReport::<Codec>::new()))
.runtime(self.runtime_handle.clone())
.build()
.map_err(|err| {
AureliaError::with_message(ErrorId::ProtocolViolation, err.to_string())
})?;
let inbox = Arc::new(TabernaInboxHandle::new(
codec,
sender,
self.config.clone(),
config.taberna_accept_queue_size,
config.accept_timeout,
));
self.registry.register(id, inbox).await?;
Ok(Taberna::new(
id,
receiver,
Arc::clone(&self.registry),
self.runtime_handle.clone(),
))
}
pub async fn send<Codec: MessageCodec>(
&self,
codec: &Codec,
taberna_id: TabernaId,
message: &Codec::AppMessage,
options: SendOptions,
) -> Result<SendOutcome, AureliaError> {
self.peering.send(codec, taberna_id, message, options).await
}
pub async fn reload_auth(&self, auth: DomusAuthConfig) -> Result<(), AureliaError> {
self.transport.reload_auth(auth).await?;
self.observability.auth_reloaded().await;
Ok(())
}
pub async fn shutdown(&self) {
self.observability.shutdown_started().await;
self.registry.shutdown().await;
self.transport.shutdown().await;
let handle = self.transport_handle.lock().await.take();
if let Some(handle) = handle {
handle.abort();
}
self.observability.shutdown_complete().await;
}
}