use std::sync::Arc;
use tokio::sync::RwLock;
use lapin::message::Delivery;
use tracing::{info, error, debug, warn};
use uuid::Uuid;
use futures_util::StreamExt;
use crate::connection::{ConnectionManager, ConnectionConfig};
use crate::rpc::{RpcFramework, RpcHandler};
use crate::message::Message;
use crate::error::Result;
#[derive(Debug, Clone)]
pub struct ServiceConfig {
pub service_name: String,
pub connection: ConnectionConfig,
pub max_concurrent_messages: usize,
pub health_check_interval_seconds: u64,
}
impl ServiceConfig {
pub fn new(service_name: impl Into<String>, amqp_url: impl Into<String>) -> Self {
let mut connection_config = ConnectionConfig::default();
connection_config.url = amqp_url.into();
Self {
service_name: service_name.into(),
connection: connection_config,
max_concurrent_messages: 100,
health_check_interval_seconds: 30,
}
}
}
#[derive(Debug)]
pub struct MicroService {
config: ServiceConfig,
connection: Arc<ConnectionManager>,
rpc: Arc<RpcFramework>,
status: Arc<RwLock<ServiceStatus>>,
}
#[derive(Debug, Clone)]
pub enum ServiceStatus {
Starting,
Running,
ShuttingDown,
Stopped,
Error(String),
}
impl MicroService {
pub async fn new(config: ServiceConfig) -> Result<Self> {
info!("🚀 Creating microservice: {}", config.service_name);
let connection = Arc::new(ConnectionManager::with_config(config.connection.clone()));
let rpc = Arc::new(RpcFramework::new(connection.clone(), config.service_name.clone()));
Ok(Self {
config,
connection,
rpc,
status: Arc::new(RwLock::new(ServiceStatus::Starting)),
})
}
pub async fn new_simple(service_name: impl Into<String>, amqp_url: impl Into<String>) -> Result<Self> {
let config = ServiceConfig::new(service_name, amqp_url);
Self::new(config).await
}
pub async fn register_handler<H>(&self, method: impl Into<String>, handler: H)
where
H: RpcHandler + 'static,
{
self.rpc.register_handler(method, handler).await;
}
pub async fn register_function<F, Fut>(&self, method: impl Into<String>, handler: F)
where
F: Fn(Message) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<crate::message::RpcResponse>> + Send + 'static,
{
self.rpc.register_function(method, handler).await;
}
pub async fn start(&self) -> Result<()> {
info!("🏁 Starting microservice: {}", self.config.service_name);
*self.status.write().await = ServiceStatus::Running;
self.connection.connect().await?;
info!("✅ Connected to RabbitMQ");
let request_queue = format!("rabbitmesh.{}", self.config.service_name);
let response_queue = format!("rabbitmesh.{}.responses", self.config.service_name);
self.connection.declare_queue(&request_queue).await?;
self.connection.declare_queue(&response_queue).await?;
info!("✅ Declared queues: {}, {}", request_queue, response_queue);
let request_processor = self.start_request_processor().await?;
let response_processor = self.start_response_processor().await?;
let cleanup_task = self.start_cleanup_task().await?;
let health_check_task = self.start_health_check_task().await?;
info!("🎯 Microservice {} is running and ready to process messages", self.config.service_name);
info!("📊 Max concurrent messages: {}", self.config.max_concurrent_messages);
tokio::try_join!(
request_processor,
response_processor,
cleanup_task,
health_check_task,
)?;
Ok(())
}
async fn start_request_processor(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
let queue_name = format!("rabbitmesh.{}", self.config.service_name);
let consumer_tag = format!("{}-requests-{}", self.config.service_name, Uuid::new_v4());
let consumer = self.connection.create_consumer(&queue_name, &consumer_tag).await?;
let rpc = self.rpc.clone();
let service_name = self.config.service_name.clone();
let max_concurrent = self.config.max_concurrent_messages;
let handle = tokio::spawn(async move {
info!("📥 Request processor started for {}", service_name);
let mut stream = consumer;
while let Some(delivery_result) = stream.next().await {
match delivery_result {
Ok(delivery) => {
let rpc_clone = rpc.clone();
tokio::spawn(async move {
if let Err(e) = Self::process_request_message(delivery, rpc_clone).await {
error!("Error processing request: {}", e);
}
});
}
Err(e) => {
error!("Error receiving message: {}", e);
}
}
}
warn!("Request processor stopped for {}", service_name);
Ok(())
});
info!("✅ Request processor spawned for queue: {}", queue_name);
Ok(handle)
}
async fn start_response_processor(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
let queue_name = format!("rabbitmesh.{}.responses", self.config.service_name);
let consumer_tag = format!("{}-responses-{}", self.config.service_name, Uuid::new_v4());
let consumer = self.connection.create_consumer(&queue_name, &consumer_tag).await?;
let rpc = self.rpc.clone();
let service_name = self.config.service_name.clone();
let handle = tokio::spawn(async move {
info!("📤 Response processor started for {}", service_name);
let mut stream = consumer;
while let Some(delivery_result) = stream.next().await {
match delivery_result {
Ok(delivery) => {
let rpc_clone = rpc.clone();
tokio::spawn(async move {
if let Err(e) = Self::process_response_message(delivery, rpc_clone).await {
error!("Error processing response: {}", e);
}
});
}
Err(e) => {
error!("Error receiving response: {}", e);
}
}
}
warn!("Response processor stopped for {}", service_name);
Ok(())
});
info!("✅ Response processor spawned for queue: {}", queue_name);
Ok(handle)
}
async fn process_request_message(
delivery: Delivery,
rpc: Arc<RpcFramework>,
) -> Result<()> {
let message = Message::from_bytes(&delivery.data)?;
debug!("📨 Processing request: {} from {}", message.method, message.from);
let result = rpc.handle_request(message).await;
match result {
Ok(_) => {
delivery.ack(lapin::options::BasicAckOptions::default()).await?;
debug!("✅ Request processed and acknowledged");
}
Err(e) => {
error!("❌ Request processing failed: {}", e);
delivery.nack(lapin::options::BasicNackOptions {
multiple: false,
requeue: true, }).await?;
}
}
Ok(())
}
async fn process_response_message(
delivery: Delivery,
rpc: Arc<RpcFramework>,
) -> Result<()> {
let message = Message::from_bytes(&delivery.data)?;
debug!("📨 Processing response for correlation_id: {:?}", message.correlation_id);
let result = rpc.handle_response(message).await;
delivery.ack(lapin::options::BasicAckOptions::default()).await?;
if let Err(e) = result {
warn!("Response processing warning: {}", e);
}
Ok(())
}
async fn start_cleanup_task(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
let rpc = self.rpc.clone();
let service_name = self.config.service_name.clone();
let handle = tokio::spawn(async move {
info!("🧹 Cleanup task started for {}", service_name);
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
loop {
interval.tick().await;
rpc.cleanup_expired_calls().await;
}
});
Ok(handle)
}
async fn start_health_check_task(&self) -> Result<tokio::task::JoinHandle<Result<()>>> {
let connection = self.connection.clone();
let service_name = self.config.service_name.clone();
let interval_seconds = self.config.health_check_interval_seconds;
let handle = tokio::spawn(async move {
info!("💓 Health check task started for {}", service_name);
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_seconds));
loop {
interval.tick().await;
let stats = connection.get_stats().await;
if !stats.is_connected {
warn!("⚠️ Service {} lost connection to RabbitMQ", service_name);
} else {
debug!("💓 Health check OK for {}", service_name);
}
}
});
Ok(handle)
}
pub fn get_client(&self) -> ServiceClient {
ServiceClient::new(self.rpc.clone())
}
pub async fn get_stats(&self) -> ServiceStats {
let connection_stats = self.connection.get_stats().await;
let rpc_stats = self.rpc.get_stats().await;
let status = self.status.read().await.clone();
ServiceStats {
service_name: self.config.service_name.clone(),
status,
connection_stats,
rpc_stats,
}
}
pub async fn is_healthy(&self) -> bool {
matches!(*self.status.read().await, ServiceStatus::Running)
&& self.connection.is_connected().await
}
}
#[derive(Debug, Clone)]
pub struct ServiceClient {
rpc: Arc<RpcFramework>,
}
impl ServiceClient {
pub fn new(rpc: Arc<RpcFramework>) -> Self {
Self { rpc }
}
pub async fn call_service(
&self,
target_service: impl Into<String>,
method: impl Into<String>,
params: impl serde::Serialize,
) -> Result<crate::message::RpcResponse> {
self.rpc.call_service(target_service, method, params).await
}
pub async fn call_service_with_timeout(
&self,
target_service: impl Into<String>,
method: impl Into<String>,
params: impl serde::Serialize,
timeout: tokio::time::Duration,
) -> Result<crate::message::RpcResponse> {
self.rpc.call_service_with_timeout(target_service, method, params, timeout).await
}
}
#[derive(Debug, Clone)]
pub struct ServiceStats {
pub service_name: String,
pub status: ServiceStatus,
pub connection_stats: crate::connection::ConnectionStats,
pub rpc_stats: crate::rpc::RpcStats,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::RpcResponse;
#[tokio::test]
async fn test_service_creation() {
let service = MicroService::new_simple("test-service", "amqp://localhost:5672").await.unwrap();
assert!(matches!(*service.status.read().await, ServiceStatus::Starting));
}
#[tokio::test]
async fn test_handler_registration() {
let service = MicroService::new_simple("test-service", "amqp://localhost:5672").await.unwrap();
service.register_function("test_method", |_msg| async {
Ok(RpcResponse::success("test result", 10).unwrap())
}).await;
let stats = service.get_stats().await;
assert_eq!(stats.rpc_stats.registered_handlers, 1);
}
}