rabbitmesh/
service.rs

1use std::sync::Arc;
2use tokio::sync::RwLock;
3use lapin::message::Delivery;
4use tracing::{info, error, debug, warn};
5use uuid::Uuid;
6use futures_util::StreamExt;
7
8use crate::connection::{ConnectionManager, ConnectionConfig};
9use crate::rpc::{RpcFramework, RpcHandler};
10use crate::message::Message;
11use crate::error::Result;
12
/// Configuration for a microservice.
///
/// Bundles the service identity, the AMQP connection settings and the tuning
/// values that [`MicroService`] reads when it is created and started.
#[derive(Debug, Clone)]
pub struct ServiceConfig {
    /// Service name. Queue names are derived from it as `rabbitmesh.<name>`
    /// (requests) and `rabbitmesh.<name>.responses` (responses), and it is
    /// used as the prefix of consumer tags.
    pub service_name: String,
    /// AMQP connection configuration handed to the connection manager.
    pub connection: ConnectionConfig,
    /// Intended upper bound on the number of messages processed at the same time.
    pub max_concurrent_messages: usize,
    /// Number of seconds between two health-check ticks.
    pub health_check_interval_seconds: u64,
}
25
26impl ServiceConfig {
27    /// Create new service configuration
28    pub fn new(service_name: impl Into<String>, amqp_url: impl Into<String>) -> Self {
29        let mut connection_config = ConnectionConfig::default();
30        connection_config.url = amqp_url.into();
31        
32        Self {
33            service_name: service_name.into(),
34            connection: connection_config,
35            max_concurrent_messages: 100,
36            health_check_interval_seconds: 30,
37        }
38    }
39}
40
/// Core microservice that processes messages via RabbitMQ
///
/// This is the heart of the framework - it:
/// - Connects to RabbitMQ through a shared [`ConnectionManager`]
/// - Consumes its request and response queues and hands every delivery to
///   its own spawned tokio task
/// - Handles RPC requests and responses through [`RpcFramework`]
/// - Provides health checking and monitoring (`is_healthy`, `get_stats`)
#[derive(Debug)]
pub struct MicroService {
    /// Service configuration
    config: ServiceConfig,
    /// AMQP connection manager, shared with the RPC framework and the
    /// background tasks
    connection: Arc<ConnectionManager>,
    /// RPC framework for handling requests and matching responses
    rpc: Arc<RpcFramework>,
    /// Current lifecycle status; starts out as `ServiceStatus::Starting`
    status: Arc<RwLock<ServiceStatus>>,
}
59
/// Service operational status
///
/// Implements `PartialEq`/`Eq` so a status can be compared directly
/// (`status == ServiceStatus::Running`, `assert_eq!`) in addition to being
/// pattern-matched.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServiceStatus {
    /// Service is starting up
    Starting,
    /// Service is running and processing messages
    Running,
    /// Service is shutting down
    ShuttingDown,
    /// Service has stopped
    Stopped,
    /// Service encountered an error; the payload is a human-readable description
    Error(String),
}
74
75impl MicroService {
76    /// Create a new microservice
77    pub async fn new(config: ServiceConfig) -> Result<Self> {
78        info!("๐Ÿš€ Creating microservice: {}", config.service_name);
79        
80        let connection = Arc::new(ConnectionManager::with_config(config.connection.clone()));
81        let rpc = Arc::new(RpcFramework::new(connection.clone(), config.service_name.clone()));
82        
83        Ok(Self {
84            config,
85            connection,
86            rpc,
87            status: Arc::new(RwLock::new(ServiceStatus::Starting)),
88        })
89    }
90
91    /// Convenience constructor with just service name and AMQP URL
92    pub async fn new_simple(service_name: impl Into<String>, amqp_url: impl Into<String>) -> Result<Self> {
93        let config = ServiceConfig::new(service_name, amqp_url);
94        Self::new(config).await
95    }
96
97    /// Register an RPC handler for a specific method
98    /// 
99    /// This is how you add business logic to your service
100    pub async fn register_handler<H>(&self, method: impl Into<String>, handler: H)
101    where
102        H: RpcHandler + 'static,
103    {
104        self.rpc.register_handler(method, handler).await;
105    }
106
107    /// Register a function-based handler (more convenient)
108    pub async fn register_function<F, Fut>(&self, method: impl Into<String>, handler: F)
109    where
110        F: Fn(Message) -> Fut + Send + Sync + 'static,
111        Fut: std::future::Future<Output = Result<crate::message::RpcResponse>> + Send + 'static,
112    {
113        self.rpc.register_function(method, handler).await;
114    }
115
116    /// Start the microservice (this is non-blocking and processes messages concurrently)
117    pub async fn start(&self) -> Result<()> {
118        info!("๐Ÿ Starting microservice: {}", self.config.service_name);
119        
120        // Update status
121        *self.status.write().await = ServiceStatus::Running;
122        
123        // Connect to RabbitMQ
124        self.connection.connect().await?;
125        info!("โœ… Connected to RabbitMQ");
126
127        // Set up queues
128        let request_queue = format!("rabbitmesh.{}", self.config.service_name);
129        let response_queue = format!("rabbitmesh.{}.responses", self.config.service_name);
130        
131        self.connection.declare_queue(&request_queue).await?;
132        self.connection.declare_queue(&response_queue).await?;
133        info!("โœ… Declared queues: {}, {}", request_queue, response_queue);
134
135        // Start message processors concurrently
136        let request_processor = self.start_request_processor().await?;
137        let response_processor = self.start_response_processor().await?;
138        let cleanup_task = self.start_cleanup_task().await?;
139        let health_check_task = self.start_health_check_task().await?;
140
141        info!("๐ŸŽฏ Microservice {} is running and ready to process messages", self.config.service_name);
142        info!("๐Ÿ“Š Max concurrent messages: {}", self.config.max_concurrent_messages);
143
144        // Wait for all tasks to complete (they run indefinitely)
145        tokio::try_join!(
146            request_processor,
147            response_processor, 
148            cleanup_task,
149            health_check_task,
150        )?;
151
152        Ok(())
153    }
154
155    /// Start processing incoming RPC requests
156    async fn start_request_processor(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
157        let queue_name = format!("rabbitmesh.{}", self.config.service_name);
158        let consumer_tag = format!("{}-requests-{}", self.config.service_name, Uuid::new_v4());
159        
160        let consumer = self.connection.create_consumer(&queue_name, &consumer_tag).await?;
161        let rpc = self.rpc.clone();
162        let service_name = self.config.service_name.clone();
163        let max_concurrent = self.config.max_concurrent_messages;
164
165        let handle = tokio::spawn(async move {
166            info!("๐Ÿ“ฅ Request processor started for {}", service_name);
167            
168            // Process messages from consumer stream
169            let mut stream = consumer;
170            while let Some(delivery_result) = stream.next().await {
171                match delivery_result {
172                    Ok(delivery) => {
173                        let rpc_clone = rpc.clone();
174                        tokio::spawn(async move {
175                            if let Err(e) = Self::process_request_message(delivery, rpc_clone).await {
176                                error!("Error processing request: {}", e);
177                            }
178                        });
179                    }
180                    Err(e) => {
181                        error!("Error receiving message: {}", e);
182                    }
183                }
184            }
185
186            warn!("Request processor stopped for {}", service_name);
187            Ok(())
188        });
189
190        info!("โœ… Request processor spawned for queue: {}", queue_name);
191        Ok(handle)
192    }
193
194    /// Start processing incoming RPC responses
195    async fn start_response_processor(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
196        let queue_name = format!("rabbitmesh.{}.responses", self.config.service_name);
197        let consumer_tag = format!("{}-responses-{}", self.config.service_name, Uuid::new_v4());
198        
199        let consumer = self.connection.create_consumer(&queue_name, &consumer_tag).await?;
200        let rpc = self.rpc.clone();
201        let service_name = self.config.service_name.clone();
202
203        let handle = tokio::spawn(async move {
204            info!("๐Ÿ“ค Response processor started for {}", service_name);
205            
206            // Process messages from consumer stream
207            let mut stream = consumer;
208            while let Some(delivery_result) = stream.next().await {
209                match delivery_result {
210                    Ok(delivery) => {
211                        let rpc_clone = rpc.clone();
212                        tokio::spawn(async move {
213                            if let Err(e) = Self::process_response_message(delivery, rpc_clone).await {
214                                error!("Error processing response: {}", e);
215                            }
216                        });
217                    }
218                    Err(e) => {
219                        error!("Error receiving response: {}", e);
220                    }
221                }
222            }
223            
224            warn!("Response processor stopped for {}", service_name);
225            Ok(())
226        });
227
228        info!("โœ… Response processor spawned for queue: {}", queue_name);
229        Ok(handle)
230    }
231
232    /// Process a single request message
233    async fn process_request_message(
234        delivery: Delivery,
235        rpc: Arc<RpcFramework>,
236    ) -> Result<()> {
237        let message = Message::from_bytes(&delivery.data)?;
238        
239        debug!("๐Ÿ“จ Processing request: {} from {}", message.method, message.from);
240        
241        // Handle the request (this spawns async task, never blocks)
242        let result = rpc.handle_request(message).await;
243        
244        // Acknowledge message
245        match result {
246            Ok(_) => {
247                delivery.ack(lapin::options::BasicAckOptions::default()).await?;
248                debug!("โœ… Request processed and acknowledged");
249            }
250            Err(e) => {
251                error!("โŒ Request processing failed: {}", e);
252                delivery.nack(lapin::options::BasicNackOptions {
253                    multiple: false,
254                    requeue: true, // Requeue for retry
255                }).await?;
256            }
257        }
258
259        Ok(())
260    }
261
262    /// Process a single response message
263    async fn process_response_message(
264        delivery: Delivery,
265        rpc: Arc<RpcFramework>,
266    ) -> Result<()> {
267        let message = Message::from_bytes(&delivery.data)?;
268        
269        debug!("๐Ÿ“จ Processing response for correlation_id: {:?}", message.correlation_id);
270        
271        let result = rpc.handle_response(message).await;
272        
273        // Always acknowledge responses
274        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
275        
276        if let Err(e) = result {
277            warn!("Response processing warning: {}", e);
278        }
279
280        Ok(())
281    }
282
283    /// Start cleanup task for expired RPC calls
284    async fn start_cleanup_task(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
285        let rpc = self.rpc.clone();
286        let service_name = self.config.service_name.clone();
287
288        let handle = tokio::spawn(async move {
289            info!("๐Ÿงน Cleanup task started for {}", service_name);
290            
291            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
292            
293            loop {
294                interval.tick().await;
295                rpc.cleanup_expired_calls().await;
296            }
297        });
298
299        Ok(handle)
300    }
301
302    /// Start health check task
303    async fn start_health_check_task(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
304        let connection = self.connection.clone();
305        let service_name = self.config.service_name.clone();
306        let interval_seconds = self.config.health_check_interval_seconds;
307
308        let handle = tokio::spawn(async move {
309            info!("๐Ÿ’“ Health check task started for {}", service_name);
310            
311            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_seconds));
312            
313            loop {
314                interval.tick().await;
315                
316                let stats = connection.get_stats().await;
317                if !stats.is_connected {
318                    warn!("โš ๏ธ  Service {} lost connection to RabbitMQ", service_name);
319                    // Connection manager will handle reconnection automatically
320                } else {
321                    debug!("๐Ÿ’“ Health check OK for {}", service_name);
322                }
323            }
324        });
325
326        Ok(handle)
327    }
328
329    /// Get RPC client for calling other services
330    pub fn get_client(&self) -> ServiceClient {
331        ServiceClient::new(self.rpc.clone())
332    }
333
334    /// Get service statistics
335    pub async fn get_stats(&self) -> ServiceStats {
336        let connection_stats = self.connection.get_stats().await;
337        let rpc_stats = self.rpc.get_stats().await;
338        let status = self.status.read().await.clone();
339
340        ServiceStats {
341            service_name: self.config.service_name.clone(),
342            status,
343            connection_stats,
344            rpc_stats,
345        }
346    }
347
348    /// Check if service is healthy
349    pub async fn is_healthy(&self) -> bool {
350        matches!(*self.status.read().await, ServiceStatus::Running) 
351            && self.connection.is_connected().await
352    }
353}
354
/// Client for making RPC calls to other services.
///
/// A thin handle around a shared [`RpcFramework`]; cloning the client clones
/// the `Arc`, not the framework.
#[derive(Debug, Clone)]
pub struct ServiceClient {
    /// RPC framework every call is forwarded to.
    rpc: Arc<RpcFramework>,
}
360
361impl ServiceClient {
362    pub fn new(rpc: Arc<RpcFramework>) -> Self {
363        Self { rpc }
364    }
365
366    /// Call another service method
367    pub async fn call_service(
368        &self,
369        target_service: impl Into<String>,
370        method: impl Into<String>,
371        params: impl serde::Serialize,
372    ) -> Result<crate::message::RpcResponse> {
373        self.rpc.call_service(target_service, method, params).await
374    }
375
376    /// Call with custom timeout
377    pub async fn call_service_with_timeout(
378        &self,
379        target_service: impl Into<String>,
380        method: impl Into<String>,
381        params: impl serde::Serialize,
382        timeout: tokio::time::Duration,
383    ) -> Result<crate::message::RpcResponse> {
384        self.rpc.call_service_with_timeout(target_service, method, params, timeout).await
385    }
386}
387
/// Service statistics for monitoring; a point-in-time snapshot produced by
/// `MicroService::get_stats`.
#[derive(Debug, Clone)]
pub struct ServiceStats {
    /// Name of the service the snapshot belongs to.
    pub service_name: String,
    /// Lifecycle status at the time of the snapshot.
    pub status: ServiceStatus,
    /// Statistics reported by the connection manager.
    pub connection_stats: crate::connection::ConnectionStats,
    /// Statistics reported by the RPC framework.
    pub rpc_stats: crate::rpc::RpcStats,
}
396
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::RpcResponse;

    /// A freshly constructed service reports `Starting` until it is started.
    #[tokio::test]
    async fn test_service_creation() {
        let svc = MicroService::new_simple("test-service", "amqp://localhost:5672")
            .await
            .unwrap();

        let status = svc.status.read().await;
        assert!(matches!(*status, ServiceStatus::Starting));
    }

    /// Registering one function handler shows up as one registered handler
    /// in the RPC statistics.
    #[tokio::test]
    async fn test_handler_registration() {
        let svc = MicroService::new_simple("test-service", "amqp://localhost:5672")
            .await
            .unwrap();

        svc.register_function("test_method", |_msg| async {
            Ok(RpcResponse::success("test result", 10).unwrap())
        })
        .await;

        let registered = svc.get_stats().await.rpc_stats.registered_handlers;
        assert_eq!(registered, 1);
    }
}