use std::sync::Arc;
use std::time::Duration;
use super::registry_service::{
RegistryGroupSummary, RegistryRequest, RegistryResponse, RegistryRpcError, REGISTRY_SERVICE,
};
use crate::adapter::net::mesh_rpc::{typed_call, RpcError, TypedCallError};
use crate::adapter::net::MeshNode;
/// Deadline that [`RegistryClient::new`] installs when the caller does not
/// pick one: three seconds.
pub const DEFAULT_REGISTRY_DEADLINE: Duration = Duration::from_secs(3);
/// Errors returned by [`RegistryClient`] calls.
#[derive(Debug, thiserror::Error)]
pub enum RegistryClientError {
/// Transport-level failure, converted from an `RpcError` or from
/// `TypedCallError::Transport`.
#[error("transport: {0}")]
Transport(RpcError),
/// Codec failure converted from `TypedCallError::Codec`; the client also
/// uses this variant when the server answers with a response variant the
/// requested operation does not expect.
#[error("codec: {0}")]
Codec(String),
/// The server answered with `RegistryResponse::Error`; the payload is
/// carried over unchanged and rendered with its `Debug` form.
#[error("server: {0:?}")]
Server(RegistryRpcError),
}
impl From<RpcError> for RegistryClientError {
fn from(e: RpcError) -> Self {
Self::Transport(e)
}
}
impl From<TypedCallError> for RegistryClientError {
fn from(e: TypedCallError) -> Self {
match e {
TypedCallError::Transport(t) => Self::Transport(t),
TypedCallError::Codec(c) => Self::Codec(c),
}
}
}
/// Client that issues registry requests to a target node through a
/// [`MeshNode`].
///
/// Cloning copies the `Arc` handle to the mesh node and the deadline value.
#[derive(Clone)]
pub struct RegistryClient {
/// Mesh node handed to `typed_call` for every request.
mesh: Arc<MeshNode>,
/// Deadline value handed to `typed_call` for every request.
deadline: Duration,
}
impl RegistryClient {
    /// Creates a client that issues its calls through `mesh`, with
    /// [`DEFAULT_REGISTRY_DEADLINE`] as the per-call deadline.
    #[must_use]
    pub fn new(mesh: Arc<MeshNode>) -> Self {
        Self {
            mesh,
            deadline: DEFAULT_REGISTRY_DEADLINE,
        }
    }

    /// Builder-style setter: consumes the client and returns it with
    /// `deadline` as the per-call deadline.
    ///
    /// The returned value must be used; discarding it drops the client.
    #[must_use]
    pub fn with_deadline(mut self, deadline: Duration) -> Self {
        self.deadline = deadline;
        self
    }

    /// In-place counterpart of [`Self::with_deadline`]: overwrites the
    /// per-call deadline of an existing client.
    pub fn set_deadline_mut(&mut self, deadline: Duration) {
        self.deadline = deadline;
    }

    /// Sends `RegistryRequest::List` to `target_node_id` on the default
    /// [`REGISTRY_SERVICE`] and returns the groups from the response.
    ///
    /// # Errors
    ///
    /// Same as [`Self::list_with_service`].
    pub async fn list(
        &self,
        target_node_id: u64,
    ) -> Result<Vec<RegistryGroupSummary>, RegistryClientError> {
        self.list_with_service(target_node_id, REGISTRY_SERVICE)
            .await
    }

    /// Sends `RegistryRequest::List` to `target_node_id` on the named
    /// `service` and returns the payload of `RegistryResponse::Groups`.
    ///
    /// # Errors
    ///
    /// * [`RegistryClientError::Transport`] / [`RegistryClientError::Codec`]
    ///   when the underlying `typed_call` fails.
    /// * [`RegistryClientError::Server`] when the response is
    ///   `RegistryResponse::Error`.
    /// * [`RegistryClientError::Codec`] when the response is any other
    ///   variant.
    pub async fn list_with_service(
        &self,
        target_node_id: u64,
        service: &str,
    ) -> Result<Vec<RegistryGroupSummary>, RegistryClientError> {
        match self
            .send(target_node_id, service, RegistryRequest::List)
            .await?
        {
            RegistryResponse::Groups(groups) => Ok(groups),
            RegistryResponse::Error(e) => Err(RegistryClientError::Server(e)),
            other => Err(Self::unexpected_response("List", &other)),
        }
    }

    /// Sends `RegistryRequest::Spawn` to `target_node_id` on the default
    /// [`REGISTRY_SERVICE`] and returns the summary from the response.
    ///
    /// # Errors
    ///
    /// Same as [`Self::spawn_with_service`].
    pub async fn spawn(
        &self,
        target_node_id: u64,
        template_name: impl Into<String>,
        group_name: impl Into<String>,
        replica_count: u8,
    ) -> Result<RegistryGroupSummary, RegistryClientError> {
        self.spawn_with_service(
            target_node_id,
            REGISTRY_SERVICE,
            template_name,
            group_name,
            replica_count,
        )
        .await
    }

    /// Sends `RegistryRequest::Spawn { template_name, group_name,
    /// replica_count }` to `target_node_id` on the named `service` and
    /// returns the payload of `RegistryResponse::Spawned`.
    ///
    /// # Errors
    ///
    /// * [`RegistryClientError::Transport`] / [`RegistryClientError::Codec`]
    ///   when the underlying `typed_call` fails.
    /// * [`RegistryClientError::Server`] when the response is
    ///   `RegistryResponse::Error`.
    /// * [`RegistryClientError::Codec`] when the response is any other
    ///   variant.
    pub async fn spawn_with_service(
        &self,
        target_node_id: u64,
        service: &str,
        template_name: impl Into<String>,
        group_name: impl Into<String>,
        replica_count: u8,
    ) -> Result<RegistryGroupSummary, RegistryClientError> {
        let request = RegistryRequest::Spawn {
            template_name: template_name.into(),
            group_name: group_name.into(),
            replica_count,
        };
        match self.send(target_node_id, service, request).await? {
            RegistryResponse::Spawned(summary) => Ok(summary),
            RegistryResponse::Error(e) => Err(RegistryClientError::Server(e)),
            other => Err(Self::unexpected_response("Spawn", &other)),
        }
    }

    /// Sends `RegistryRequest::Scale` to `target_node_id` on the default
    /// [`REGISTRY_SERVICE`] and returns the summary from the response.
    ///
    /// # Errors
    ///
    /// Same as [`Self::scale_with_service`].
    pub async fn scale(
        &self,
        target_node_id: u64,
        group_name: impl Into<String>,
        template_name: impl Into<String>,
        target_replica_count: u8,
    ) -> Result<RegistryGroupSummary, RegistryClientError> {
        self.scale_with_service(
            target_node_id,
            REGISTRY_SERVICE,
            group_name,
            template_name,
            target_replica_count,
        )
        .await
    }

    /// Sends `RegistryRequest::Scale { group_name, template_name,
    /// target_replica_count }` to `target_node_id` on the named `service`
    /// and returns the payload of `RegistryResponse::Scaled`.
    ///
    /// # Errors
    ///
    /// * [`RegistryClientError::Transport`] / [`RegistryClientError::Codec`]
    ///   when the underlying `typed_call` fails.
    /// * [`RegistryClientError::Server`] when the response is
    ///   `RegistryResponse::Error`.
    /// * [`RegistryClientError::Codec`] when the response is any other
    ///   variant.
    pub async fn scale_with_service(
        &self,
        target_node_id: u64,
        service: &str,
        group_name: impl Into<String>,
        template_name: impl Into<String>,
        target_replica_count: u8,
    ) -> Result<RegistryGroupSummary, RegistryClientError> {
        let request = RegistryRequest::Scale {
            group_name: group_name.into(),
            template_name: template_name.into(),
            target_replica_count,
        };
        match self.send(target_node_id, service, request).await? {
            RegistryResponse::Scaled(summary) => Ok(summary),
            RegistryResponse::Error(e) => Err(RegistryClientError::Server(e)),
            other => Err(Self::unexpected_response("Scale", &other)),
        }
    }

    /// Sends `RegistryRequest::Unregister` to `target_node_id` on the
    /// default [`REGISTRY_SERVICE`] and returns the `existed` flag from the
    /// response.
    ///
    /// # Errors
    ///
    /// Same as [`Self::unregister_with_service`].
    pub async fn unregister(
        &self,
        target_node_id: u64,
        group_name: impl Into<String>,
    ) -> Result<bool, RegistryClientError> {
        self.unregister_with_service(target_node_id, REGISTRY_SERVICE, group_name)
            .await
    }

    /// Sends `RegistryRequest::Unregister { group_name }` to
    /// `target_node_id` on the named `service` and returns the `existed`
    /// field of `RegistryResponse::Unregistered`.
    ///
    /// # Errors
    ///
    /// * [`RegistryClientError::Transport`] / [`RegistryClientError::Codec`]
    ///   when the underlying `typed_call` fails.
    /// * [`RegistryClientError::Server`] when the response is
    ///   `RegistryResponse::Error`.
    /// * [`RegistryClientError::Codec`] when the response is any other
    ///   variant.
    pub async fn unregister_with_service(
        &self,
        target_node_id: u64,
        service: &str,
        group_name: impl Into<String>,
    ) -> Result<bool, RegistryClientError> {
        let request = RegistryRequest::Unregister {
            group_name: group_name.into(),
        };
        match self.send(target_node_id, service, request).await? {
            RegistryResponse::Unregistered { existed } => Ok(existed),
            RegistryResponse::Error(e) => Err(RegistryClientError::Server(e)),
            other => Err(Self::unexpected_response("Unregister", &other)),
        }
    }

    /// Builds the error reported when the server answers `operation` with a
    /// response variant that operation does not expect.
    ///
    /// Shared by every request method so the message wording stays uniform.
    fn unexpected_response(operation: &str, response: &RegistryResponse) -> RegistryClientError {
        RegistryClientError::Codec(format!(
            "unexpected response for {operation}: {response:?}"
        ))
    }

    /// Issues one `typed_call` carrying `request` to `service` on
    /// `target_node_id`, passing the configured deadline, and converts a
    /// failure into [`RegistryClientError`] via its `From` impls.
    async fn send(
        &self,
        target_node_id: u64,
        service: &str,
        request: RegistryRequest,
    ) -> Result<RegistryResponse, RegistryClientError> {
        typed_call::<RegistryRequest, RegistryResponse>(
            &self.mesh,
            target_node_id,
            service,
            &request,
            self.deadline,
        )
        .await
        .map_err(RegistryClientError::from)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::identity::EntityKeypair;
    use crate::adapter::net::MeshNodeConfig;
    use std::net::SocketAddr;

    /// Builds a throwaway mesh node bound to an OS-assigned loopback port,
    /// with a freshly generated keypair.
    async fn build_mesh() -> Arc<MeshNode> {
        let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
        let config = MeshNodeConfig::new(bind_addr, [0x17u8; 32]);
        let node = MeshNode::new(EntityKeypair::generate(), config)
            .await
            .expect("MeshNode::new");
        Arc::new(node)
    }

    /// A freshly constructed client uses the module-wide default deadline.
    #[tokio::test]
    async fn new_carries_default_deadline() {
        let client = RegistryClient::new(build_mesh().await);
        assert_eq!(client.deadline, DEFAULT_REGISTRY_DEADLINE);
    }

    /// `with_deadline` replaces the default with the supplied value.
    #[tokio::test]
    async fn with_deadline_overrides_default() {
        let custom = Duration::from_secs(7);
        let client = RegistryClient::new(build_mesh().await).with_deadline(custom);
        assert_eq!(client.deadline, custom);
    }
}