pub use net::adapter::net::behavior::aggregator::{
FoldQueryClient, FoldQueryClientError, FoldQueryError, FoldQueryOp, FoldQueryRequest,
FoldQueryResponse, RegistryClient, RegistryClientError, RegistryGroupSummary,
RegistryReplicaSummary, RegistryRequest, RegistryResponse, RegistryRpcError, ScaleFn,
ScaleRequest, SpawnFn, SpawnRequest, DEFAULT_QUERY_CACHE_TTL, DEFAULT_QUERY_DEADLINE,
DEFAULT_REGISTRY_DEADLINE, FOLD_QUERY_SERVICE, REGISTRY_SERVICE,
};
pub use net::adapter::net::behavior::aggregator::{
snapshot_group, AggregatorConfig, AggregatorDaemon, AggregatorError, AggregatorGroupEntry,
AggregatorPublishError, AggregatorRegistry, AggregatorRegistryError, CapabilityFoldSummarizer,
EntrySnapshot, RegistryHandler, RegistryReadHandler, ReservationFoldSummarizer, Summarizer,
SummaryAnnouncement,
};
pub use net::adapter::net::behavior::lifecycle::{
HealthMonitor, HealthMonitorStats, LifecycleDaemon, LifecycleError, LifecycleGroup,
LifecycleGroupError, LifecycleHandle, ReplicaContext, ReplicaHealth,
};
use std::sync::Arc;
use std::time::Duration;
use ::net::adapter::net::channel::{ChannelId, ChannelName};
use ::net::adapter::net::mesh_rpc::{ServeError, ServeHandle};
use ::net::adapter::net::ChannelConfig;
use ::net::adapter::net::MeshNode;
use crate::mesh::Mesh;
/// Installs the aggregator registry RPC service on `mesh`.
///
/// The channels for [`REGISTRY_SERVICE`] are registered on the mesh first
/// (best effort, see `auto_register_rpc_channels`), then the call is handed
/// to [`AggregatorRegistry::install_registry_service`] with the mesh's node.
///
/// # Errors
///
/// Returns the [`ServeError`] produced by the registry's install call.
pub fn install_aggregator_registry_service(
    mesh: &Mesh,
    registry: &Arc<AggregatorRegistry>,
) -> Result<ServeHandle, ServeError> {
    auto_register_rpc_channels(mesh, REGISTRY_SERVICE);

    let node = mesh.node_arc();
    registry.install_registry_service(&node)
}
/// Installs the aggregator registry RPC service on `mesh`, supplying a
/// [`SpawnFn`] to the registry.
///
/// The channels for [`REGISTRY_SERVICE`] are registered on the mesh first
/// (best effort, see `auto_register_rpc_channels`), then the call is handed
/// to [`AggregatorRegistry::install_registry_service_with_spawner`] together
/// with `spawner`.
///
/// # Errors
///
/// Returns the [`ServeError`] produced by the registry's install call.
pub fn install_aggregator_registry_service_with_spawner(
    mesh: &Mesh,
    registry: &Arc<AggregatorRegistry>,
    spawner: SpawnFn,
) -> Result<ServeHandle, ServeError> {
    auto_register_rpc_channels(mesh, REGISTRY_SERVICE);

    let node = mesh.node_arc();
    registry.install_registry_service_with_spawner(&node, spawner)
}
/// Installs the fold-query RPC service of `aggregator` on `mesh`.
///
/// Note the argument order: the aggregator comes first here, unlike the
/// registry install helpers which take the mesh first.
///
/// The channels for [`FOLD_QUERY_SERVICE`] are registered on the mesh first
/// (best effort, see `auto_register_rpc_channels`), then the call is handed
/// to [`AggregatorDaemon::install_query_service`] with the mesh's node.
///
/// # Errors
///
/// Returns the [`ServeError`] produced by the aggregator's install call.
pub fn install_fold_query_service(
    aggregator: &Arc<AggregatorDaemon>,
    mesh: &Mesh,
) -> Result<ServeHandle, ServeError> {
    auto_register_rpc_channels(mesh, FOLD_QUERY_SERVICE);

    let node = mesh.node_arc();
    aggregator.install_query_service(&node)
}
/// Best-effort registration of the channels an RPC `service` uses on `mesh`.
///
/// Two independent steps are performed:
///
/// 1. `"{service}.requests"` is registered on the mesh as a channel.
/// 2. When the mesh's inner node exposes channel configs, a config is
///    inserted under the prefix `"{service}.replies."`. Its channel id is
///    built from the sentinel name `"{service}.replies.prefix"`.
///
/// A name rejected by `ChannelName::new` is skipped silently, and a failure
/// in the first step does not prevent the second from running.
fn auto_register_rpc_channels(mesh: &Mesh, service: &str) {
    // Step 1: the request channel.
    let request_name = format!("{service}.requests");
    let request_config = ChannelName::new(&request_name)
        .ok()
        .map(|name| ChannelConfig::new(ChannelId::new(name)));
    if let Some(config) = request_config {
        mesh.register_channel(config);
    }

    // Step 2: the reply prefix, only when channel configs are available.
    if let Some(configs) = mesh.inner().channel_configs() {
        let sentinel_name = format!("{service}.replies.prefix");
        let reply_config = ChannelName::new(&sentinel_name)
            .ok()
            .map(|name| ChannelConfig::new(ChannelId::new(name)));
        if let Some(config) = reply_config {
            configs.insert_prefix(format!("{service}.replies."), config);
        }
    }
}
/// A [`RegistryClient`] paired with a fixed target node id.
///
/// Every request method on this type forwards to the wrapped client and
/// passes `target_node_id` as the destination, so callers do not repeat it.
#[derive(Clone)]
pub struct BoundRegistryClient {
/// The wrapped client that performs the actual calls.
inner: RegistryClient,
/// Node id passed to every forwarded call.
target_node_id: u64,
}
impl BoundRegistryClient {
    /// Creates a client over `mesh` whose calls are all addressed to
    /// `target_node_id`.
    #[must_use]
    pub fn new(mesh: Arc<MeshNode>, target_node_id: u64) -> Self {
        Self {
            inner: RegistryClient::new(mesh),
            target_node_id,
        }
    }

    /// Applies `deadline` to the wrapped [`RegistryClient`] and returns the
    /// updated client.
    ///
    /// This consumes `self`, so the returned value must be kept; dropping it
    /// discards the client entirely.
    #[must_use = "this consumes the client and returns the reconfigured one"]
    pub fn with_deadline(mut self, deadline: Duration) -> Self {
        self.inner = self.inner.with_deadline(deadline);
        self
    }

    /// Returns the node id every call on this client is addressed to.
    pub fn target_node_id(&self) -> u64 {
        self.target_node_id
    }

    /// Borrows the wrapped [`RegistryClient`], which takes the target node id
    /// per call instead of using the bound one.
    pub fn unbound(&self) -> &RegistryClient {
        &self.inner
    }

    /// Forwards to [`RegistryClient::list`] for the bound node.
    ///
    /// # Errors
    ///
    /// Returns the [`RegistryClientError`] from the wrapped call.
    pub async fn list(&self) -> Result<Vec<RegistryGroupSummary>, RegistryClientError> {
        self.inner.list(self.target_node_id).await
    }

    /// Forwards to [`RegistryClient::spawn`] for the bound node.
    ///
    /// `template_name`, `group_name` and `replica_count` are passed through
    /// unchanged, in that order.
    ///
    /// # Errors
    ///
    /// Returns the [`RegistryClientError`] from the wrapped call.
    pub async fn spawn(
        &self,
        template_name: impl Into<String>,
        group_name: impl Into<String>,
        replica_count: u8,
    ) -> Result<RegistryGroupSummary, RegistryClientError> {
        self.inner
            .spawn(
                self.target_node_id,
                template_name,
                group_name,
                replica_count,
            )
            .await
    }

    /// Forwards to [`RegistryClient::unregister`] for the bound node.
    ///
    /// # Errors
    ///
    /// Returns the [`RegistryClientError`] from the wrapped call.
    pub async fn unregister(
        &self,
        group_name: impl Into<String>,
    ) -> Result<bool, RegistryClientError> {
        self.inner.unregister(self.target_node_id, group_name).await
    }

    /// Forwards to [`RegistryClient::scale`] for the bound node.
    ///
    /// Note the argument order: `group_name` comes before `template_name`
    /// here, the reverse of [`BoundRegistryClient::spawn`].
    ///
    /// # Errors
    ///
    /// Returns the [`RegistryClientError`] from the wrapped call.
    pub async fn scale(
        &self,
        group_name: impl Into<String>,
        template_name: impl Into<String>,
        target_replica_count: u8,
    ) -> Result<RegistryGroupSummary, RegistryClientError> {
        self.inner
            .scale(
                self.target_node_id,
                group_name,
                template_name,
                target_replica_count,
            )
            .await
    }
}
/// A [`FoldQueryClient`] paired with a fixed target node id.
///
/// Every query method on this type forwards to the wrapped client and
/// passes `target_node_id` as the destination, so callers do not repeat it.
#[derive(Clone)]
pub struct BoundFoldQueryClient {
/// The wrapped client that performs the actual queries.
inner: FoldQueryClient,
/// Node id passed to every forwarded query.
target_node_id: u64,
}
impl BoundFoldQueryClient {
    /// Creates a client over `mesh` whose queries are all addressed to
    /// `target_node_id`.
    #[must_use]
    pub fn new(mesh: Arc<MeshNode>, target_node_id: u64) -> Self {
        Self {
            inner: FoldQueryClient::new(mesh),
            target_node_id,
        }
    }

    /// Applies `ttl` to the wrapped [`FoldQueryClient`] and returns the
    /// updated client.
    ///
    /// This consumes `self`, so the returned value must be kept; dropping it
    /// discards the client entirely.
    #[must_use = "this consumes the client and returns the reconfigured one"]
    pub fn with_ttl(mut self, ttl: Duration) -> Self {
        self.inner = self.inner.with_ttl(ttl);
        self
    }

    /// Applies `deadline` to the wrapped [`FoldQueryClient`] and returns the
    /// updated client.
    ///
    /// This consumes `self`, so the returned value must be kept; dropping it
    /// discards the client entirely.
    #[must_use = "this consumes the client and returns the reconfigured one"]
    pub fn with_deadline(mut self, deadline: Duration) -> Self {
        self.inner = self.inner.with_deadline(deadline);
        self
    }

    /// Returns the node id every query on this client is addressed to.
    pub fn target_node_id(&self) -> u64 {
        self.target_node_id
    }

    /// Forwards to [`FoldQueryClient::query_latest`] for the bound node,
    /// passing `kind` through unchanged.
    ///
    /// # Errors
    ///
    /// Returns the [`FoldQueryClientError`] from the wrapped call.
    pub async fn query_latest(
        &self,
        kind: u16,
    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
        self.inner.query_latest(self.target_node_id, kind).await
    }

    /// Forwards to [`FoldQueryClient::query_summarize_now`] for the bound
    /// node, passing `kind` through unchanged.
    ///
    /// # Errors
    ///
    /// Returns the [`FoldQueryClientError`] from the wrapped call.
    pub async fn query_summarize_now(
        &self,
        kind: u16,
    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
        self.inner
            .query_summarize_now(self.target_node_id, kind)
            .await
    }

    /// Forwards to [`FoldQueryClient::invalidate_cache`] on the wrapped
    /// client. The bound node id is not passed along.
    pub fn invalidate_cache(&self) {
        self.inner.invalidate_cache();
    }
}