zus_rpc_server/
handler.rs

1use {
2  futures::{SinkExt, StreamExt},
3  std::{sync::Arc, time::Instant},
4  tokio::net::TcpStream,
5  tokio_util::codec::Framed,
6  tracing::{debug, error, instrument},
7};
8
9use {
10  zus_common::{Result, RpcCodec, RpcMessage},
11  zus_proto::constants::*,
12};
13
14use crate::service::{RequestContext, ServiceRegistry};
15
16/// RPC Handler (replacing Java's ZusMinaServerHandler)
17pub struct RpcHandler {
18  service_registry: Arc<ServiceRegistry>,
19}
20
21impl RpcHandler {
22  pub fn new(service_registry: Arc<ServiceRegistry>) -> Self {
23    Self { service_registry }
24  }
25
26  pub async fn handle(&self, mut framed: Framed<TcpStream, RpcCodec>) -> Result<()> {
27    while let Some(result) = framed.next().await {
28      match result {
29        | Ok(msg) => {
30          if msg.header.msg_type == MSG_TYPE_REQ {
31            let response = self.process_request(msg).await;
32            framed.send(response).await?;
33          } else if msg.header.msg_type == MSG_TYPE_NOTIFY {
34            // Fire and forget, no response
35            let _ = self.process_request(msg).await;
36          }
37        }
38        | Err(e) => {
39          error!("Decode error: {:?}", e);
40          return Err(e);
41        }
42      }
43    }
44
45    Ok(())
46  }
47
48  #[instrument(name = "rpc_process_request", skip(self, msg), fields(sequence = msg.header.sequence))]
49  async fn process_request(&self, msg: RpcMessage) -> RpcMessage {
50    let sequence = msg.header.sequence;
51    let method_name = String::from_utf8_lossy(&msg.method);
52
53    debug!("Processing request: {}", method_name);
54
55    // Start timing
56    let start = Instant::now();
57
58    // Extract request context (simplified)
59    let context = RequestContext::default();
60
61    // Handle special methods
62    if method_name == ".CheckServerStatus" {
63      return RpcMessage::new_response(sequence, bytes::Bytes::from("OK"));
64    }
65
66    // Route to service (using first service for now - can be improved with service name in protocol)
67    let (response_body, service_name) =
68      if let Some((service_name, service)) = self.service_registry.services().iter().next() {
69        // Automatically record request
70        self.service_registry.record_request(service_name);
71
72        let result = service.do_work(&method_name, msg.body, context).await;
73
74        // Calculate latency
75        let latency_ms = start.elapsed().as_millis() as u64;
76
77        match result {
78          | Ok(response) => {
79            // Automatically record successful response
80            self.service_registry.record_response(service_name, latency_ms, true);
81            (response, service_name.clone())
82          }
83          | Err(e) => {
84            error!("Service error: {:?}", e);
85            // Automatically record failed response
86            self.service_registry.record_response(service_name, latency_ms, false);
87            (bytes::Bytes::from(format!("Error: {e}")), service_name.clone())
88          }
89        }
90      } else {
91        (bytes::Bytes::from("No service registered"), String::new())
92      };
93
94    debug!("Request processed for service: {}", service_name);
95
96    RpcMessage::new_response(sequence, response_body)
97  }
98}