zus_rpc_server/
service.rs1use {
2 async_trait::async_trait,
3 bytes::Bytes,
4 dashmap::DashMap,
5 std::{
6 collections::HashMap,
7 sync::{
8 Arc,
9 atomic::{AtomicU64, Ordering},
10 },
11 },
12};
13
14use zus_common::Result;
15
16#[derive(Debug, Clone, Default)]
18pub struct RequestContext {
19 pub user_id: u64,
20 pub device_type: String,
21 pub client_ip: String,
22 pub app_id: String,
23 pub client_version: String,
24 pub session_id: String,
25}
26
27#[async_trait]
29pub trait Service: Send + Sync {
30 fn service_name(&self) -> &str;
32
33 async fn do_work(&self, method: &str, params: Bytes, context: RequestContext) -> Result<Bytes>;
35}
36
37#[derive(Debug, Clone, Default)]
39pub struct ServiceStatistics {
40 pub request_count: u64,
41 pub error_count: u64,
42 pub total_latency_ms: u64,
43 pub avg_latency_ms: u64,
44}
45
46#[derive(Debug, Default)]
48pub struct ServiceStats {
49 request_count: AtomicU64,
50 error_count: AtomicU64,
51 total_latency_ms: AtomicU64,
52}
53
54impl ServiceStats {
55 pub fn new() -> Self {
56 Self {
57 request_count: AtomicU64::new(0),
58 error_count: AtomicU64::new(0),
59 total_latency_ms: AtomicU64::new(0),
60 }
61 }
62
63 pub fn record_request(&self) {
65 self.request_count.fetch_add(1, Ordering::Relaxed);
66 }
67
68 pub fn record_response(&self, latency_ms: u64, success: bool) {
70 self.total_latency_ms.fetch_add(latency_ms, Ordering::Relaxed);
71 if !success {
72 self.error_count.fetch_add(1, Ordering::Relaxed);
73 }
74 }
75
76 pub fn get_snapshot(&self) -> ServiceStatistics {
78 let request_count = self.request_count.load(Ordering::Relaxed);
79 let error_count = self.error_count.load(Ordering::Relaxed);
80 let total_latency_ms = self.total_latency_ms.load(Ordering::Relaxed);
81
82 let avg_latency_ms = if request_count > 0 {
83 total_latency_ms / request_count
84 } else {
85 0
86 };
87
88 ServiceStatistics {
89 request_count,
90 error_count,
91 total_latency_ms,
92 avg_latency_ms,
93 }
94 }
95
96 pub fn reset(&self) {
98 self.request_count.store(0, Ordering::Relaxed);
99 self.error_count.store(0, Ordering::Relaxed);
100 self.total_latency_ms.store(0, Ordering::Relaxed);
101 }
102}
103
104pub type ServiceMethodHandler = fn(&dyn Service, Bytes, RequestContext) -> Result<Bytes>;
106
107pub struct ServiceRegistry {
109 services: HashMap<String, Arc<dyn Service>>,
110 stats: Arc<DashMap<String, ServiceStats>>,
112}
113
114impl ServiceRegistry {
115 pub fn new() -> Self {
116 Self {
117 services: HashMap::new(),
118 stats: Arc::new(DashMap::new()),
119 }
120 }
121
122 pub fn register(&mut self, service: Arc<dyn Service>) {
123 let name = service.service_name().to_string();
124 self.stats.insert(name.clone(), ServiceStats::new());
126 self.services.insert(name, service);
127 }
128
129 pub fn get(&self, name: &str) -> Option<&Arc<dyn Service>> {
130 self.services.get(name)
131 }
132
133 pub fn services(&self) -> &HashMap<String, Arc<dyn Service>> {
134 &self.services
135 }
136
137 pub fn get_statistics(&self, service_name: &str) -> Option<ServiceStatistics> {
139 self.stats.get(service_name).map(|s| s.get_snapshot())
140 }
141
142 pub fn get_all_statistics(&self) -> HashMap<String, ServiceStatistics> {
144 self
145 .stats
146 .iter()
147 .map(|entry| (entry.key().clone(), entry.value().get_snapshot()))
148 .collect()
149 }
150
151 pub fn record_request(&self, service_name: &str) {
153 if let Some(stats) = self.stats.get(service_name) {
154 stats.record_request();
155 }
156 }
157
158 pub fn record_response(&self, service_name: &str, latency_ms: u64, success: bool) {
160 if let Some(stats) = self.stats.get(service_name) {
161 stats.record_response(latency_ms, success);
162 }
163 }
164
165 pub fn reset_statistics(&self, service_name: &str) {
167 if let Some(stats) = self.stats.get(service_name) {
168 stats.reset();
169 }
170 }
171
172 pub fn reset_all_statistics(&self) {
174 for entry in self.stats.iter() {
175 entry.value().reset();
176 }
177 }
178}
179
180impl Default for ServiceRegistry {
181 fn default() -> Self {
182 Self::new()
183 }
184}