use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
use crate::data::DomusAddr;
use crate::data::RouteResolver;
use crate::ids::{AureliaError, ErrorId, TabernaId};
use crate::peering::auth::Pkcs8AuthConfig;
use crate::peering::codec::MessageCodec;
use crate::peering::config::{
normalize_domus_config_for_transport, DomusConfig, DomusConfigAccess, DomusConfigStore,
};
use crate::peering::observability::{
new_observability, DomusReporting, DomusReportingFeeds, ObservabilityHandle,
};
use crate::peering::peering::RouteLocalRemote;
use crate::peering::send::{SendOptions, SendOutcome};
use crate::peering::taberna::{
Taberna, TabernaInboxHandle, TabernaRegistry, TabernaShutdownReport,
};
use crate::peering::transport::Transport;
use caducus::MpscBuilder;
#[cfg(feature = "actix")]
use crate::peering::actix_adapter::{ActixTaberna, ActixTabernaDelivery};
/// Collects everything needed to construct a [`Domus`].
///
/// The builder is consumed by `build` / `build_with_reporting`, which run the
/// construction on `runtime_handle`.
pub struct DomusBuilder<RR>
where
RR: RouteResolver + 'static,
{
/// Domus configuration; normalized for the local address kind during the build.
config: DomusConfig,
/// Local address handed to `Transport::bind`.
local_addr: DomusAddr,
/// Authentication configuration handed to `Transport::bind`.
auth: Pkcs8AuthConfig,
/// Shared route resolver passed on to the peering layer.
resolver: Arc<RR>,
/// Tokio runtime handle on which the build work is spawned.
runtime_handle: tokio::runtime::Handle,
}
impl<RR> DomusBuilder<RR>
where
    RR: RouteResolver + 'static,
{
    /// Creates a builder from the domus configuration, the local address,
    /// the auth configuration and the shared route resolver.
    ///
    /// The runtime handle is obtained from `crate::platform::runtime::handle()`.
    pub fn new(
        config: DomusConfig,
        local_addr: DomusAddr,
        auth: Pkcs8AuthConfig,
        resolver: Arc<RR>,
    ) -> Self {
        Self {
            config,
            local_addr,
            auth,
            resolver,
            runtime_handle: crate::platform::runtime::handle(),
        }
    }

    /// Builds the [`Domus`] without reporting feeds.
    ///
    /// The construction is spawned onto the builder's runtime handle and the
    /// result is handed back over a oneshot channel.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the internal build, or
    /// `ErrorId::PeerUnavailable` ("aurelia runtime unavailable") when the
    /// spawned task drops the sender without delivering a result.
    pub async fn build(self) -> Result<Domus<RR>, AureliaError> {
        let (tx, rx) = oneshot::channel();
        let runtime_handle = self.runtime_handle.clone();
        runtime_handle.spawn(async move {
            let result = self.build_internal(false).await.map(|(domus, _)| domus);
            // Sending only fails when the receiver is gone; nothing to do then.
            let _ = tx.send(result);
        });
        rx.await.unwrap_or_else(|_| {
            Err(AureliaError::with_message(
                ErrorId::PeerUnavailable,
                "aurelia runtime unavailable",
            ))
        })
    }

    /// Builds the [`Domus`] together with its reporting feeds.
    ///
    /// The construction is spawned onto the builder's runtime handle and the
    /// result is handed back over a oneshot channel.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the internal build,
    /// `ErrorId::ProtocolViolation` ("reporting feeds missing") when the build
    /// succeeded but yielded no feeds, or `ErrorId::PeerUnavailable`
    /// ("aurelia runtime unavailable") when the spawned task drops the sender
    /// without delivering a result.
    pub async fn build_with_reporting(
        self,
    ) -> Result<(Domus<RR>, DomusReportingFeeds), AureliaError> {
        let (tx, rx) = oneshot::channel();
        let runtime_handle = self.runtime_handle.clone();
        runtime_handle.spawn(async move {
            let result = self.build_internal(true).await.and_then(|(domus, feeds)| {
                feeds.map(|feeds| (domus, feeds)).ok_or_else(|| {
                    AureliaError::with_message(
                        ErrorId::ProtocolViolation,
                        "reporting feeds missing",
                    )
                })
            });
            // Sending only fails when the receiver is gone; nothing to do then.
            let _ = tx.send(result);
        });
        rx.await.unwrap_or_else(|_| {
            Err(AureliaError::with_message(
                ErrorId::PeerUnavailable,
                "aurelia runtime unavailable",
            ))
        })
    }

    /// Performs the actual construction.
    ///
    /// Steps, in order: create the taberna registry and the observability
    /// pair, optionally capture the reporting feeds, normalize the
    /// configuration for the local address kind, bind the transport, create
    /// the peering layer, and start the transport.
    ///
    /// When `with_feeds` is `true` the second tuple element is `Some`,
    /// otherwise it is `None`.
    ///
    /// # Errors
    ///
    /// Propagates errors from configuration normalization,
    /// `Transport::bind` and `Transport::start`.
    async fn build_internal(
        self,
        with_feeds: bool,
    ) -> Result<(Domus<RR>, Option<DomusReportingFeeds>), AureliaError> {
        let registry = Arc::new(TabernaRegistry::new());
        let runtime_handle = self.runtime_handle.clone();
        let (reporting, observability) = new_observability(runtime_handle.clone());
        let feeds = if with_feeds {
            Some(reporting.feeds())
        } else {
            None
        };

        let config = normalize_domus_config_for_transport(self.config, self.local_addr.kind())?;
        let config = Arc::new(DomusConfigStore::new(config));
        let config_access =
            DomusConfigAccess::new(Arc::clone(&config), Some(observability.clone()));

        let transport = Arc::new(
            Transport::bind(
                self.local_addr,
                Arc::clone(&registry),
                config_access.clone(),
                observability.clone(),
                runtime_handle.clone(),
                self.auth,
            )
            .await?,
        );
        let peering = RouteLocalRemote::new(
            config_access.clone(),
            Arc::clone(&registry),
            Arc::clone(&self.resolver),
            Arc::clone(&transport),
        );
        let transport_handle = transport.start().await?;

        let domus = Domus {
            config: config_access,
            registry,
            peering,
            transport,
            // Kept so that `Domus::shutdown` can take and abort the task.
            transport_handle: Mutex::new(Some(transport_handle)),
            reporting,
            observability,
            runtime_handle,
        };
        Ok((domus, feeds))
    }
}
/// A running domus: configuration access, taberna registry, peering layer
/// and transport, plus the reporting/observability handles created with it.
pub struct Domus<RR>
where
RR: RouteResolver,
{
/// Configuration access handle; cloned out by `config()` and snapshotted
/// when a taberna is created.
config: DomusConfigAccess,
/// Registry on which tabernae are registered; shut down in `shutdown()`.
registry: Arc<TabernaRegistry>,
/// Peering layer that `send` delegates to.
peering: RouteLocalRemote<RR>,
/// Bound transport; source of the local address and target of
/// `reload_auth` / `shutdown`.
transport: Arc<Transport>,
/// Join handle returned by starting the transport; taken and aborted
/// during `shutdown()`, leaving `None` behind.
transport_handle: Mutex<Option<JoinHandle<()>>>,
/// Reporting handle returned (as a clone) by `reporting()`.
reporting: DomusReporting,
/// Observability handle notified on auth reload and shutdown.
observability: ObservabilityHandle,
/// Tokio runtime handle passed to taberna channels and tabernae.
runtime_handle: tokio::runtime::Handle,
}
impl<RR> Domus<RR>
where
    RR: RouteResolver,
{
    /// Returns a clone of the configuration access handle.
    pub fn config(&self) -> DomusConfigAccess {
        self.config.clone()
    }

    /// Returns the local address as reported by the transport.
    pub fn local_addr(&self) -> DomusAddr {
        self.transport.local_addr()
    }

    /// Returns a clone of the reporting handle.
    pub fn reporting(&self) -> DomusReporting {
        self.reporting.clone()
    }

    /// Creates a taberna with the given id and codec and registers its inbox.
    ///
    /// The accept queue size and accept timeout are read from a single
    /// configuration snapshot and used for both the channel and the inbox
    /// handle.
    ///
    /// # Errors
    ///
    /// Returns `ErrorId::ProtocolViolation` carrying the builder's error text
    /// when the channel cannot be built, and propagates any error from
    /// registering the inbox.
    pub async fn taberna<Codec: MessageCodec>(
        &self,
        id: TabernaId,
        codec: Codec,
    ) -> Result<Taberna<Codec>, AureliaError>
    where
        Codec::AppMessage: Send + Sync + 'static,
    {
        let snapshot = self.config.snapshot().await;
        let queue_size = snapshot.taberna_accept_queue_size;
        let accept_timeout = snapshot.accept_timeout;

        let channel_builder = MpscBuilder::new(queue_size, accept_timeout)
            .shutdown_channel(Arc::new(TabernaShutdownReport::<Codec>::new()))
            .runtime(self.runtime_handle.clone());
        let (sender, receiver) = channel_builder.build().map_err(|build_err| {
            AureliaError::with_message(ErrorId::ProtocolViolation, build_err.to_string())
        })?;

        let inbox_handle = Arc::new(TabernaInboxHandle::new(
            codec,
            sender,
            self.config.clone(),
            queue_size,
            accept_timeout,
        ));
        self.registry.register(id, inbox_handle).await?;

        let taberna = Taberna::new(
            id,
            receiver,
            Arc::clone(&self.registry),
            self.runtime_handle.clone(),
        );
        Ok(taberna)
    }

    /// Creates a taberna via [`Domus::taberna`] and wraps it, together with
    /// the given recipient, in an `ActixTaberna`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Domus::taberna`].
    #[cfg(feature = "actix")]
    pub async fn actix_taberna<Codec>(
        &self,
        id: TabernaId,
        codec: Codec,
        recipient: actix::prelude::Recipient<ActixTabernaDelivery<Codec::AppMessage>>,
    ) -> Result<ActixTaberna, AureliaError>
    where
        Codec: MessageCodec + 'static,
        Codec::AppMessage: Send + Sync + 'static,
    {
        self.taberna(id, codec)
            .await
            .map(|taberna| ActixTaberna::new(taberna, recipient))
    }

    /// Sends `message` to the taberna identified by `taberna_id`, delegating
    /// to the peering layer with the given codec and options.
    pub async fn send<Codec: MessageCodec>(
        &self,
        codec: &Codec,
        taberna_id: TabernaId,
        message: &Codec::AppMessage,
        options: SendOptions,
    ) -> Result<SendOutcome, AureliaError> {
        let outcome = self.peering.send(codec, taberna_id, message, options).await;
        outcome
    }

    /// Reloads the transport's auth configuration and, on success, notifies
    /// observability that auth was reloaded.
    ///
    /// # Errors
    ///
    /// Propagates the transport's error; observability is not notified then.
    pub async fn reload_auth(&self, auth: Pkcs8AuthConfig) -> Result<(), AureliaError> {
        self.transport.reload_auth(auth).await?;
        self.observability.auth_reloaded();
        Ok(())
    }

    /// Shuts the domus down: registry first, then the transport, then the
    /// stored transport task (if still present) is aborted. Observability is
    /// notified at the start and at the end.
    pub async fn shutdown(&self) {
        self.observability.shutdown_started();

        self.registry.shutdown().await;
        self.transport.shutdown().await;

        // Take the handle in its own statement so the lock guard is released
        // before the task is aborted.
        let pending_task = self.transport_handle.lock().await.take();
        if let Some(task) = pending_task {
            task.abort();
        }

        self.observability.shutdown_complete();
    }
}