zus_rpc_server/
manager.rs

1use {
2  std::sync::Arc,
3  tokio::{
4    net::{TcpListener, TcpStream},
5    sync::Semaphore,
6  },
7  tokio_util::codec::Framed,
8  tracing::{error, info},
9};
10
11use {
12  zus_common::{Result, RpcCodec},
13  zus_discovery::ZusZooClient,
14};
15
16use {
17  crate::{
18    handler::RpcHandler,
19    service::{Service, ServiceRegistry, ServiceStatistics},
20  },
21  std::collections::HashMap,
22};
23
24/// ZUS Server Manager (replacing Java's ZusServerManager)
25pub struct ZusServerManager {
26  address: String,
27  port: u16,
28  service_registry: Arc<ServiceRegistry>,
29  max_connections: usize,
30  zoo_client: Option<Arc<ZusZooClient>>,
31}
32
33impl ZusServerManager {
34  pub fn new(address: String, port: u16) -> Self {
35    Self {
36      address,
37      port,
38      service_registry: Arc::new(ServiceRegistry::new()),
39      max_connections: 1000,
40      zoo_client: None,
41    }
42  }
43
44  pub fn with_max_connections(mut self, max: usize) -> Self {
45    self.max_connections = max;
46    self
47  }
48
49  pub fn register_service(&mut self, service: Arc<dyn Service>) {
50    Arc::get_mut(&mut self.service_registry).unwrap().register(service);
51  }
52
53  pub fn with_zoo_client(mut self, zoo_client: Arc<ZusZooClient>) -> Self {
54    self.zoo_client = Some(zoo_client);
55    self
56  }
57
58  /// Get statistics for a specific service
59  pub fn get_statistics(&self, service_name: &str) -> Option<ServiceStatistics> {
60    self.service_registry.get_statistics(service_name)
61  }
62
63  /// Get statistics for all registered services
64  pub fn get_all_statistics(&self) -> HashMap<String, ServiceStatistics> {
65    self.service_registry.get_all_statistics()
66  }
67
68  /// Reset statistics for a specific service
69  pub fn reset_statistics(&self, service_name: &str) {
70    self.service_registry.reset_statistics(service_name);
71  }
72
73  /// Reset statistics for all services
74  pub fn reset_all_statistics(&self) {
75    self.service_registry.reset_all_statistics();
76  }
77
78  /// Start the server
79  pub async fn start(self) -> Result<()> {
80    let addr = format!("{}:{}", self.address, self.port);
81    let listener = TcpListener::bind(&addr).await?;
82    info!("ZUS RPC Server v{} listening on {}", env!("CARGO_PKG_VERSION"), addr);
83
84    // Register with ZooServer if configured
85    if let Some(ref zoo_client) = self.zoo_client {
86      let service_path = format!("/zus/services/rust/{addr}");
87      zoo_client
88        .create_path(
89          service_path.clone(),
90          bytes::Bytes::from(addr.clone()),
91          zus_proto::constants::PATH_FLAG_EPHEMERAL,
92        )
93        .await?;
94      info!("Registered with ZooServer at {}", service_path);
95    }
96
97    let semaphore = Arc::new(Semaphore::new(self.max_connections));
98    let service_registry = self.service_registry.clone();
99
100    loop {
101      let permit = semaphore.clone().acquire_owned().await.unwrap();
102      let (stream, peer_addr) = listener.accept().await?;
103
104      info!("New connection from {}", peer_addr);
105
106      let service_registry = service_registry.clone();
107      tokio::spawn(async move {
108        if let Err(e) = handle_connection(stream, service_registry).await {
109          error!("Connection error: {:?}", e);
110        }
111        drop(permit);
112      });
113    }
114  }
115}
116
117async fn handle_connection(stream: TcpStream, service_registry: Arc<ServiceRegistry>) -> Result<()> {
118  let framed = Framed::new(stream, RpcCodec::new());
119  let handler = RpcHandler::new(service_registry);
120  handler.handle(framed).await
121}