// zus_rpc_server/manager.rs
use {
2 std::sync::Arc,
3 tokio::{
4 net::{TcpListener, TcpStream},
5 sync::Semaphore,
6 },
7 tokio_util::codec::Framed,
8 tracing::{error, info},
9};
10
11use {
12 zus_common::{Result, RpcCodec},
13 zus_discovery::ZusZooClient,
14};
15
16use {
17 crate::{
18 handler::RpcHandler,
19 service::{Service, ServiceRegistry, ServiceStatistics},
20 },
21 std::collections::HashMap,
22};
23
/// Configuration and shared state for a ZUS RPC server.
///
/// Build one with [`ZusServerManager::new`], adjust it with the `with_*`
/// methods and `register_service`, then consume it with `start` to run the
/// accept loop.
pub struct ZusServerManager {
    /// Host part of the bind address; joined with `port` as `"{address}:{port}"`.
    address: String,
    /// TCP port the listener binds to.
    port: u16,
    /// Registry of services; a clone of this `Arc` is handed to every connection task.
    service_registry: Arc<ServiceRegistry>,
    /// Number of semaphore permits, i.e. the cap on concurrently handled connections.
    max_connections: usize,
    /// Optional ZooServer client; when set, `start` registers the listen
    /// address under `/zus/services/rust/`.
    zoo_client: Option<Arc<ZusZooClient>>,
}
32
33impl ZusServerManager {
34 pub fn new(address: String, port: u16) -> Self {
35 Self {
36 address,
37 port,
38 service_registry: Arc::new(ServiceRegistry::new()),
39 max_connections: 1000,
40 zoo_client: None,
41 }
42 }
43
44 pub fn with_max_connections(mut self, max: usize) -> Self {
45 self.max_connections = max;
46 self
47 }
48
49 pub fn register_service(&mut self, service: Arc<dyn Service>) {
50 Arc::get_mut(&mut self.service_registry).unwrap().register(service);
51 }
52
53 pub fn with_zoo_client(mut self, zoo_client: Arc<ZusZooClient>) -> Self {
54 self.zoo_client = Some(zoo_client);
55 self
56 }
57
58 pub fn get_statistics(&self, service_name: &str) -> Option<ServiceStatistics> {
60 self.service_registry.get_statistics(service_name)
61 }
62
63 pub fn get_all_statistics(&self) -> HashMap<String, ServiceStatistics> {
65 self.service_registry.get_all_statistics()
66 }
67
68 pub fn reset_statistics(&self, service_name: &str) {
70 self.service_registry.reset_statistics(service_name);
71 }
72
73 pub fn reset_all_statistics(&self) {
75 self.service_registry.reset_all_statistics();
76 }
77
78 pub async fn start(self) -> Result<()> {
80 let addr = format!("{}:{}", self.address, self.port);
81 let listener = TcpListener::bind(&addr).await?;
82 info!("ZUS RPC Server v{} listening on {}", env!("CARGO_PKG_VERSION"), addr);
83
84 if let Some(ref zoo_client) = self.zoo_client {
86 let service_path = format!("/zus/services/rust/{addr}");
87 zoo_client
88 .create_path(
89 service_path.clone(),
90 bytes::Bytes::from(addr.clone()),
91 zus_proto::constants::PATH_FLAG_EPHEMERAL,
92 )
93 .await?;
94 info!("Registered with ZooServer at {}", service_path);
95 }
96
97 let semaphore = Arc::new(Semaphore::new(self.max_connections));
98 let service_registry = self.service_registry.clone();
99
100 loop {
101 let permit = semaphore.clone().acquire_owned().await.unwrap();
102 let (stream, peer_addr) = listener.accept().await?;
103
104 info!("New connection from {}", peer_addr);
105
106 let service_registry = service_registry.clone();
107 tokio::spawn(async move {
108 if let Err(e) = handle_connection(stream, service_registry).await {
109 error!("Connection error: {:?}", e);
110 }
111 drop(permit);
112 });
113 }
114 }
115}
116
117async fn handle_connection(stream: TcpStream, service_registry: Arc<ServiceRegistry>) -> Result<()> {
118 let framed = Framed::new(stream, RpcCodec::new());
119 let handler = RpcHandler::new(service_registry);
120 handler.handle(framed).await
121}