zus_rpc_server/
service.rs

1use {
2  async_trait::async_trait,
3  bytes::Bytes,
4  dashmap::DashMap,
5  std::{
6    collections::HashMap,
7    sync::{
8      Arc,
9      atomic::{AtomicU64, Ordering},
10    },
11  },
12};
13
14use zus_common::Result;
15
16/// Request context (replacing Java's RequestContext)
17#[derive(Debug, Clone, Default)]
18pub struct RequestContext {
19  pub user_id: u64,
20  pub device_type: String,
21  pub client_ip: String,
22  pub app_id: String,
23  pub client_version: String,
24  pub session_id: String,
25}
26
27/// Service trait (replacing Java's ServerService interface)
28#[async_trait]
29pub trait Service: Send + Sync {
30  /// Get service name
31  fn service_name(&self) -> &str;
32
33  /// Process RPC request
34  async fn do_work(&self, method: &str, params: Bytes, context: RequestContext) -> Result<Bytes>;
35}
36
37/// Service statistics (snapshot at a point in time)
38#[derive(Debug, Clone, Default)]
39pub struct ServiceStatistics {
40  pub request_count: u64,
41  pub error_count: u64,
42  pub total_latency_ms: u64,
43  pub avg_latency_ms: u64,
44}
45
46/// Thread-safe service statistics tracker
47#[derive(Debug, Default)]
48pub struct ServiceStats {
49  request_count: AtomicU64,
50  error_count: AtomicU64,
51  total_latency_ms: AtomicU64,
52}
53
54impl ServiceStats {
55  pub fn new() -> Self {
56    Self {
57      request_count: AtomicU64::new(0),
58      error_count: AtomicU64::new(0),
59      total_latency_ms: AtomicU64::new(0),
60    }
61  }
62
63  /// Record a request (called automatically by handler)
64  pub fn record_request(&self) {
65    self.request_count.fetch_add(1, Ordering::Relaxed);
66  }
67
68  /// Record a response with latency (called automatically by handler)
69  pub fn record_response(&self, latency_ms: u64, success: bool) {
70    self.total_latency_ms.fetch_add(latency_ms, Ordering::Relaxed);
71    if !success {
72      self.error_count.fetch_add(1, Ordering::Relaxed);
73    }
74  }
75
76  /// Get current statistics snapshot
77  pub fn get_snapshot(&self) -> ServiceStatistics {
78    let request_count = self.request_count.load(Ordering::Relaxed);
79    let error_count = self.error_count.load(Ordering::Relaxed);
80    let total_latency_ms = self.total_latency_ms.load(Ordering::Relaxed);
81
82    let avg_latency_ms = if request_count > 0 {
83      total_latency_ms / request_count
84    } else {
85      0
86    };
87
88    ServiceStatistics {
89      request_count,
90      error_count,
91      total_latency_ms,
92      avg_latency_ms,
93    }
94  }
95
96  /// Reset all statistics
97  pub fn reset(&self) {
98    self.request_count.store(0, Ordering::Relaxed);
99    self.error_count.store(0, Ordering::Relaxed);
100    self.total_latency_ms.store(0, Ordering::Relaxed);
101  }
102}
103
104/// Service method handler
105pub type ServiceMethodHandler = fn(&dyn Service, Bytes, RequestContext) -> Result<Bytes>;
106
107/// Service registry with automatic statistics tracking
108pub struct ServiceRegistry {
109  services: HashMap<String, Arc<dyn Service>>,
110  // Automatically track statistics for each service by name
111  stats: Arc<DashMap<String, ServiceStats>>,
112}
113
114impl ServiceRegistry {
115  pub fn new() -> Self {
116    Self {
117      services: HashMap::new(),
118      stats: Arc::new(DashMap::new()),
119    }
120  }
121
122  pub fn register(&mut self, service: Arc<dyn Service>) {
123    let name = service.service_name().to_string();
124    // Initialize statistics tracker for this service
125    self.stats.insert(name.clone(), ServiceStats::new());
126    self.services.insert(name, service);
127  }
128
129  pub fn get(&self, name: &str) -> Option<&Arc<dyn Service>> {
130    self.services.get(name)
131  }
132
133  pub fn services(&self) -> &HashMap<String, Arc<dyn Service>> {
134    &self.services
135  }
136
137  /// Get statistics for a specific service
138  pub fn get_statistics(&self, service_name: &str) -> Option<ServiceStatistics> {
139    self.stats.get(service_name).map(|s| s.get_snapshot())
140  }
141
142  /// Get statistics for all services
143  pub fn get_all_statistics(&self) -> HashMap<String, ServiceStatistics> {
144    self
145      .stats
146      .iter()
147      .map(|entry| (entry.key().clone(), entry.value().get_snapshot()))
148      .collect()
149  }
150
151  /// Record a request for a service (called automatically by handler)
152  pub fn record_request(&self, service_name: &str) {
153    if let Some(stats) = self.stats.get(service_name) {
154      stats.record_request();
155    }
156  }
157
158  /// Record a response for a service (called automatically by handler)
159  pub fn record_response(&self, service_name: &str, latency_ms: u64, success: bool) {
160    if let Some(stats) = self.stats.get(service_name) {
161      stats.record_response(latency_ms, success);
162    }
163  }
164
165  /// Reset statistics for a specific service
166  pub fn reset_statistics(&self, service_name: &str) {
167    if let Some(stats) = self.stats.get(service_name) {
168      stats.reset();
169    }
170  }
171
172  /// Reset statistics for all services
173  pub fn reset_all_statistics(&self) {
174    for entry in self.stats.iter() {
175      entry.value().reset();
176    }
177  }
178}
179
180impl Default for ServiceRegistry {
181  fn default() -> Self {
182    Self::new()
183  }
184}