zus_rpc_server/
mobile_server.rs1use std::{collections::HashMap, sync::Arc};
9
10use tokio::{
11 net::{TcpListener, TcpStream},
12 sync::Semaphore,
13};
14use tokio_util::codec::Framed;
15use tracing::{error, info};
16
17use zus_common::{MobileCodec, Result};
18use zus_discovery::ZusZooClient;
19
20use crate::{
21 mobile_handler::MobileRpcHandler,
22 service::{Service, ServiceRegistry, ServiceStatistics},
23};
24
25pub struct MobileRpcServer {
47 address: String,
48 port: u16,
49 service_registry: Arc<ServiceRegistry>,
50 max_connections: usize,
51 zoo_client: Option<Arc<ZusZooClient>>,
52}
53
54impl MobileRpcServer {
55 pub fn new(address: String, port: u16) -> Self {
56 Self {
57 address,
58 port,
59 service_registry: Arc::new(ServiceRegistry::new()),
60 max_connections: 1000,
61 zoo_client: None,
62 }
63 }
64
65 pub fn with_max_connections(mut self, max: usize) -> Self {
66 self.max_connections = max;
67 self
68 }
69
70 pub fn register_service(&mut self, service: Arc<dyn Service>) {
71 Arc::get_mut(&mut self.service_registry).unwrap().register(service);
72 }
73
74 pub fn with_zoo_client(mut self, zoo_client: Arc<ZusZooClient>) -> Self {
75 self.zoo_client = Some(zoo_client);
76 self
77 }
78
79 pub fn get_statistics(&self, service_name: &str) -> Option<ServiceStatistics> {
81 self.service_registry.get_statistics(service_name)
82 }
83
84 pub fn get_all_statistics(&self) -> HashMap<String, ServiceStatistics> {
86 self.service_registry.get_all_statistics()
87 }
88
89 pub fn reset_statistics(&self, service_name: &str) {
91 self.service_registry.reset_statistics(service_name);
92 }
93
94 pub fn reset_all_statistics(&self) {
96 self.service_registry.reset_all_statistics();
97 }
98
99 pub async fn start(self) -> Result<()> {
101 let addr = format!("{}:{}", self.address, self.port);
102 let listener = TcpListener::bind(&addr).await?;
103 info!(
104 "Mobile RPC Server v{} listening on {} (16-byte protocol)",
105 env!("CARGO_PKG_VERSION"),
106 addr
107 );
108
109 if let Some(ref zoo_client) = self.zoo_client {
111 let service_path = format!("/zus/services/mobile/{addr}");
112 zoo_client
113 .create_path(
114 service_path.clone(),
115 bytes::Bytes::from(addr.clone()),
116 zus_proto::constants::PATH_FLAG_EPHEMERAL,
117 )
118 .await?;
119 info!("Registered mobile server with ZooServer at {}", service_path);
120 }
121
122 let semaphore = Arc::new(Semaphore::new(self.max_connections));
123 let service_registry = self.service_registry.clone();
124
125 loop {
126 let permit = semaphore.clone().acquire_owned().await.unwrap();
127 let (stream, peer_addr) = listener.accept().await?;
128
129 info!("New mobile connection from {}", peer_addr);
130
131 let service_registry = service_registry.clone();
132 tokio::spawn(async move {
133 if let Err(e) = handle_mobile_connection(stream, service_registry).await {
134 error!("Mobile connection error: {:?}", e);
135 }
136 drop(permit);
137 });
138 }
139 }
140}
141
142async fn handle_mobile_connection(stream: TcpStream, service_registry: Arc<ServiceRegistry>) -> Result<()> {
143 let framed = Framed::new(stream, MobileCodec::new());
144 let handler = MobileRpcHandler::new(service_registry);
145 handler.handle(framed).await
146}