zus_rpc_server/
mobile_server.rs

1//! Mobile RPC Server
2//!
3//! Server for mobile protocol (16-byte header) with:
4//! - Automatic heartbeat handling
5//! - 256-byte compression threshold
6//! - Same Service trait as standard server
7
8use std::{collections::HashMap, sync::Arc};
9
10use tokio::{
11  net::{TcpListener, TcpStream},
12  sync::Semaphore,
13};
14use tokio_util::codec::Framed;
15use tracing::{error, info};
16
17use zus_common::{MobileCodec, Result};
18use zus_discovery::ZusZooClient;
19
20use crate::{
21  mobile_handler::MobileRpcHandler,
22  service::{Service, ServiceRegistry, ServiceStatistics},
23};
24
25/// Mobile RPC Server
26///
27/// Similar to ZusServerManager but uses the mobile 16-byte protocol.
28/// Automatically handles heartbeat packets.
29///
30/// # Example
31/// ```ignore
32/// use zus_rpc_server::{MobileRpcServer, Service};
33///
34/// struct MyService;
35/// impl Service for MyService {
36///     fn service_name(&self) -> &str { "MyService" }
37///     async fn do_work(&self, method: &str, params: Bytes, ctx: RequestContext) -> Result<Bytes> {
38///         Ok(params)
39///     }
40/// }
41///
42/// let mut server = MobileRpcServer::new("0.0.0.0".to_string(), 9528);
43/// server.register_service(Arc::new(MyService));
44/// server.start().await?;
45/// ```
46pub struct MobileRpcServer {
47  address: String,
48  port: u16,
49  service_registry: Arc<ServiceRegistry>,
50  max_connections: usize,
51  zoo_client: Option<Arc<ZusZooClient>>,
52}
53
54impl MobileRpcServer {
55  pub fn new(address: String, port: u16) -> Self {
56    Self {
57      address,
58      port,
59      service_registry: Arc::new(ServiceRegistry::new()),
60      max_connections: 1000,
61      zoo_client: None,
62    }
63  }
64
65  pub fn with_max_connections(mut self, max: usize) -> Self {
66    self.max_connections = max;
67    self
68  }
69
70  pub fn register_service(&mut self, service: Arc<dyn Service>) {
71    Arc::get_mut(&mut self.service_registry).unwrap().register(service);
72  }
73
74  pub fn with_zoo_client(mut self, zoo_client: Arc<ZusZooClient>) -> Self {
75    self.zoo_client = Some(zoo_client);
76    self
77  }
78
79  /// Get statistics for a specific service
80  pub fn get_statistics(&self, service_name: &str) -> Option<ServiceStatistics> {
81    self.service_registry.get_statistics(service_name)
82  }
83
84  /// Get statistics for all registered services
85  pub fn get_all_statistics(&self) -> HashMap<String, ServiceStatistics> {
86    self.service_registry.get_all_statistics()
87  }
88
89  /// Reset statistics for a specific service
90  pub fn reset_statistics(&self, service_name: &str) {
91    self.service_registry.reset_statistics(service_name);
92  }
93
94  /// Reset statistics for all services
95  pub fn reset_all_statistics(&self) {
96    self.service_registry.reset_all_statistics();
97  }
98
99  /// Start the mobile server
100  pub async fn start(self) -> Result<()> {
101    let addr = format!("{}:{}", self.address, self.port);
102    let listener = TcpListener::bind(&addr).await?;
103    info!(
104      "Mobile RPC Server v{} listening on {} (16-byte protocol)",
105      env!("CARGO_PKG_VERSION"),
106      addr
107    );
108
109    // Register with ZooServer if configured
110    if let Some(ref zoo_client) = self.zoo_client {
111      let service_path = format!("/zus/services/mobile/{addr}");
112      zoo_client
113        .create_path(
114          service_path.clone(),
115          bytes::Bytes::from(addr.clone()),
116          zus_proto::constants::PATH_FLAG_EPHEMERAL,
117        )
118        .await?;
119      info!("Registered mobile server with ZooServer at {}", service_path);
120    }
121
122    let semaphore = Arc::new(Semaphore::new(self.max_connections));
123    let service_registry = self.service_registry.clone();
124
125    loop {
126      let permit = semaphore.clone().acquire_owned().await.unwrap();
127      let (stream, peer_addr) = listener.accept().await?;
128
129      info!("New mobile connection from {}", peer_addr);
130
131      let service_registry = service_registry.clone();
132      tokio::spawn(async move {
133        if let Err(e) = handle_mobile_connection(stream, service_registry).await {
134          error!("Mobile connection error: {:?}", e);
135        }
136        drop(permit);
137      });
138    }
139  }
140}
141
142async fn handle_mobile_connection(stream: TcpStream, service_registry: Arc<ServiceRegistry>) -> Result<()> {
143  let framed = Framed::new(stream, MobileCodec::new());
144  let handler = MobileRpcHandler::new(service_registry);
145  handler.handle(framed).await
146}