1use std::sync::Arc;
2use tokio::sync::RwLock;
3use lapin::message::Delivery;
4use tracing::{info, error, debug, warn};
5use uuid::Uuid;
6use futures_util::StreamExt;
7
8use crate::connection::{ConnectionManager, ConnectionConfig};
9use crate::rpc::{RpcFramework, RpcHandler};
10use crate::message::Message;
11use crate::error::Result;
12
13#[derive(Debug, Clone)]
15pub struct ServiceConfig {
16 pub service_name: String,
18 pub connection: ConnectionConfig,
20 pub max_concurrent_messages: usize,
22 pub health_check_interval_seconds: u64,
24}
25
26impl ServiceConfig {
27 pub fn new(service_name: impl Into<String>, amqp_url: impl Into<String>) -> Self {
29 let mut connection_config = ConnectionConfig::default();
30 connection_config.url = amqp_url.into();
31
32 Self {
33 service_name: service_name.into(),
34 connection: connection_config,
35 max_concurrent_messages: 100,
36 health_check_interval_seconds: 30,
37 }
38 }
39}
40
41#[derive(Debug)]
49pub struct MicroService {
50 config: ServiceConfig,
52 connection: Arc<ConnectionManager>,
54 rpc: Arc<RpcFramework>,
56 status: Arc<RwLock<ServiceStatus>>,
58}
59
/// Lifecycle state of a `MicroService`.
///
/// `PartialEq`/`Eq` are derived so callers can compare states directly
/// (`status == ServiceStatus::Running`, `assert_eq!`) instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServiceStatus {
    /// Initial state assigned when the service is constructed.
    Starting,
    /// Set by `MicroService::start`; `MicroService::is_healthy` requires this state.
    Running,
    /// Presumably marks a shutdown in progress — TODO confirm against the shutdown path.
    ShuttingDown,
    /// The service is no longer running.
    Stopped,
    /// The service failed; the payload is a human-readable description.
    Error(String),
}
74
75impl MicroService {
76 pub async fn new(config: ServiceConfig) -> Result<Self> {
78 info!("๐ Creating microservice: {}", config.service_name);
79
80 let connection = Arc::new(ConnectionManager::with_config(config.connection.clone()));
81 let rpc = Arc::new(RpcFramework::new(connection.clone(), config.service_name.clone()));
82
83 Ok(Self {
84 config,
85 connection,
86 rpc,
87 status: Arc::new(RwLock::new(ServiceStatus::Starting)),
88 })
89 }
90
91 pub async fn new_simple(service_name: impl Into<String>, amqp_url: impl Into<String>) -> Result<Self> {
93 let config = ServiceConfig::new(service_name, amqp_url);
94 Self::new(config).await
95 }
96
97 pub async fn register_handler<H>(&self, method: impl Into<String>, handler: H)
101 where
102 H: RpcHandler + 'static,
103 {
104 self.rpc.register_handler(method, handler).await;
105 }
106
107 pub async fn register_function<F, Fut>(&self, method: impl Into<String>, handler: F)
109 where
110 F: Fn(Message) -> Fut + Send + Sync + 'static,
111 Fut: std::future::Future<Output = Result<crate::message::RpcResponse>> + Send + 'static,
112 {
113 self.rpc.register_function(method, handler).await;
114 }
115
116 pub async fn start(&self) -> Result<()> {
118 info!("๐ Starting microservice: {}", self.config.service_name);
119
120 *self.status.write().await = ServiceStatus::Running;
122
123 self.connection.connect().await?;
125 info!("โ
Connected to RabbitMQ");
126
127 let request_queue = format!("rabbitmesh.{}", self.config.service_name);
129 let response_queue = format!("rabbitmesh.{}.responses", self.config.service_name);
130
131 self.connection.declare_queue(&request_queue).await?;
132 self.connection.declare_queue(&response_queue).await?;
133 info!("โ
Declared queues: {}, {}", request_queue, response_queue);
134
135 let request_processor = self.start_request_processor().await?;
137 let response_processor = self.start_response_processor().await?;
138 let cleanup_task = self.start_cleanup_task().await?;
139 let health_check_task = self.start_health_check_task().await?;
140
141 info!("๐ฏ Microservice {} is running and ready to process messages", self.config.service_name);
142 info!("๐ Max concurrent messages: {}", self.config.max_concurrent_messages);
143
144 tokio::try_join!(
146 request_processor,
147 response_processor,
148 cleanup_task,
149 health_check_task,
150 )?;
151
152 Ok(())
153 }
154
155 async fn start_request_processor(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
157 let queue_name = format!("rabbitmesh.{}", self.config.service_name);
158 let consumer_tag = format!("{}-requests-{}", self.config.service_name, Uuid::new_v4());
159
160 let consumer = self.connection.create_consumer(&queue_name, &consumer_tag).await?;
161 let rpc = self.rpc.clone();
162 let service_name = self.config.service_name.clone();
163 let max_concurrent = self.config.max_concurrent_messages;
164
165 let handle = tokio::spawn(async move {
166 info!("๐ฅ Request processor started for {}", service_name);
167
168 let mut stream = consumer;
170 while let Some(delivery_result) = stream.next().await {
171 match delivery_result {
172 Ok(delivery) => {
173 let rpc_clone = rpc.clone();
174 tokio::spawn(async move {
175 if let Err(e) = Self::process_request_message(delivery, rpc_clone).await {
176 error!("Error processing request: {}", e);
177 }
178 });
179 }
180 Err(e) => {
181 error!("Error receiving message: {}", e);
182 }
183 }
184 }
185
186 warn!("Request processor stopped for {}", service_name);
187 Ok(())
188 });
189
190 info!("โ
Request processor spawned for queue: {}", queue_name);
191 Ok(handle)
192 }
193
194 async fn start_response_processor(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
196 let queue_name = format!("rabbitmesh.{}.responses", self.config.service_name);
197 let consumer_tag = format!("{}-responses-{}", self.config.service_name, Uuid::new_v4());
198
199 let consumer = self.connection.create_consumer(&queue_name, &consumer_tag).await?;
200 let rpc = self.rpc.clone();
201 let service_name = self.config.service_name.clone();
202
203 let handle = tokio::spawn(async move {
204 info!("๐ค Response processor started for {}", service_name);
205
206 let mut stream = consumer;
208 while let Some(delivery_result) = stream.next().await {
209 match delivery_result {
210 Ok(delivery) => {
211 let rpc_clone = rpc.clone();
212 tokio::spawn(async move {
213 if let Err(e) = Self::process_response_message(delivery, rpc_clone).await {
214 error!("Error processing response: {}", e);
215 }
216 });
217 }
218 Err(e) => {
219 error!("Error receiving response: {}", e);
220 }
221 }
222 }
223
224 warn!("Response processor stopped for {}", service_name);
225 Ok(())
226 });
227
228 info!("โ
Response processor spawned for queue: {}", queue_name);
229 Ok(handle)
230 }
231
232 async fn process_request_message(
234 delivery: Delivery,
235 rpc: Arc<RpcFramework>,
236 ) -> Result<()> {
237 let message = Message::from_bytes(&delivery.data)?;
238
239 debug!("๐จ Processing request: {} from {}", message.method, message.from);
240
241 let result = rpc.handle_request(message).await;
243
244 match result {
246 Ok(_) => {
247 delivery.ack(lapin::options::BasicAckOptions::default()).await?;
248 debug!("โ
Request processed and acknowledged");
249 }
250 Err(e) => {
251 error!("โ Request processing failed: {}", e);
252 delivery.nack(lapin::options::BasicNackOptions {
253 multiple: false,
254 requeue: true, }).await?;
256 }
257 }
258
259 Ok(())
260 }
261
262 async fn process_response_message(
264 delivery: Delivery,
265 rpc: Arc<RpcFramework>,
266 ) -> Result<()> {
267 let message = Message::from_bytes(&delivery.data)?;
268
269 debug!("๐จ Processing response for correlation_id: {:?}", message.correlation_id);
270
271 let result = rpc.handle_response(message).await;
272
273 delivery.ack(lapin::options::BasicAckOptions::default()).await?;
275
276 if let Err(e) = result {
277 warn!("Response processing warning: {}", e);
278 }
279
280 Ok(())
281 }
282
283 async fn start_cleanup_task(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
285 let rpc = self.rpc.clone();
286 let service_name = self.config.service_name.clone();
287
288 let handle = tokio::spawn(async move {
289 info!("๐งน Cleanup task started for {}", service_name);
290
291 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
292
293 loop {
294 interval.tick().await;
295 rpc.cleanup_expired_calls().await;
296 }
297 });
298
299 Ok(handle)
300 }
301
302 async fn start_health_check_task(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
304 let connection = self.connection.clone();
305 let service_name = self.config.service_name.clone();
306 let interval_seconds = self.config.health_check_interval_seconds;
307
308 let handle = tokio::spawn(async move {
309 info!("๐ Health check task started for {}", service_name);
310
311 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_seconds));
312
313 loop {
314 interval.tick().await;
315
316 let stats = connection.get_stats().await;
317 if !stats.is_connected {
318 warn!("โ ๏ธ Service {} lost connection to RabbitMQ", service_name);
319 } else {
321 debug!("๐ Health check OK for {}", service_name);
322 }
323 }
324 });
325
326 Ok(handle)
327 }
328
329 pub fn get_client(&self) -> ServiceClient {
331 ServiceClient::new(self.rpc.clone())
332 }
333
334 pub async fn get_stats(&self) -> ServiceStats {
336 let connection_stats = self.connection.get_stats().await;
337 let rpc_stats = self.rpc.get_stats().await;
338 let status = self.status.read().await.clone();
339
340 ServiceStats {
341 service_name: self.config.service_name.clone(),
342 status,
343 connection_stats,
344 rpc_stats,
345 }
346 }
347
348 pub async fn is_healthy(&self) -> bool {
350 matches!(*self.status.read().await, ServiceStatus::Running)
351 && self.connection.is_connected().await
352 }
353}
354
355#[derive(Debug, Clone)]
357pub struct ServiceClient {
358 rpc: Arc<RpcFramework>,
359}
360
361impl ServiceClient {
362 pub fn new(rpc: Arc<RpcFramework>) -> Self {
363 Self { rpc }
364 }
365
366 pub async fn call_service(
368 &self,
369 target_service: impl Into<String>,
370 method: impl Into<String>,
371 params: impl serde::Serialize,
372 ) -> Result<crate::message::RpcResponse> {
373 self.rpc.call_service(target_service, method, params).await
374 }
375
376 pub async fn call_service_with_timeout(
378 &self,
379 target_service: impl Into<String>,
380 method: impl Into<String>,
381 params: impl serde::Serialize,
382 timeout: tokio::time::Duration,
383 ) -> Result<crate::message::RpcResponse> {
384 self.rpc.call_service_with_timeout(target_service, method, params, timeout).await
385 }
386}
387
388#[derive(Debug, Clone)]
390pub struct ServiceStats {
391 pub service_name: String,
392 pub status: ServiceStatus,
393 pub connection_stats: crate::connection::ConnectionStats,
394 pub rpc_stats: crate::rpc::RpcStats,
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::RpcResponse;

    /// Builds the service instance shared by the tests below; panics if construction fails.
    async fn make_service() -> MicroService {
        MicroService::new_simple("test-service", "amqp://localhost:5672")
            .await
            .unwrap()
    }

    /// A freshly constructed service reports `Starting`.
    #[tokio::test]
    async fn test_service_creation() {
        let svc = make_service().await;
        let current = svc.status.read().await;
        assert!(matches!(*current, ServiceStatus::Starting));
    }

    /// Registering one function handler shows up as one registered handler in the stats.
    #[tokio::test]
    async fn test_handler_registration() {
        let svc = make_service().await;

        svc.register_function("test_method", |_msg| async {
            Ok(RpcResponse::success("test result", 10).unwrap())
        })
        .await;

        let snapshot = svc.get_stats().await;
        assert_eq!(snapshot.rpc_stats.registered_handlers, 1);
    }
}