use std::sync::Arc;
use async_trait::async_trait;
use mockforge_core::config::AmqpConfig;
use mockforge_core::protocol_abstraction::Protocol;
use mockforge_core::protocol_server::MockProtocolServer;
use crate::broker::AmqpBroker;
use crate::spec_registry::AmqpSpecRegistry;
pub struct AmqpMockServer {
config: AmqpConfig,
}
impl AmqpMockServer {
pub fn new(config: AmqpConfig) -> Self {
Self { config }
}
}
#[async_trait]
impl MockProtocolServer for AmqpMockServer {
fn protocol(&self) -> Protocol {
Protocol::Amqp
}
async fn start(
&self,
mut shutdown: tokio::sync::watch::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let spec_registry = Arc::new(AmqpSpecRegistry::new(self.config.clone()).await?);
let broker = AmqpBroker::new(self.config.clone(), spec_registry);
tokio::select! {
result = broker.start() => {
result.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
_ = shutdown.changed() => {
tracing::info!("Shutting down AMQP broker on port {}", self.config.port);
Ok(())
}
}
}
fn port(&self) -> u16 {
self.config.port
}
fn description(&self) -> String {
format!("AMQP broker on {}:{}", self.config.host, self.config.port)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_amqp_mock_server_new() {
let config = AmqpConfig::default();
let server = AmqpMockServer::new(config.clone());
assert_eq!(server.port(), config.port);
}
#[test]
fn test_amqp_mock_server_protocol() {
let server = AmqpMockServer::new(AmqpConfig::default());
assert_eq!(server.protocol(), Protocol::Amqp);
}
#[test]
fn test_amqp_mock_server_description() {
let config = AmqpConfig {
host: "127.0.0.1".to_string(),
port: 5672,
..Default::default()
};
let server = AmqpMockServer::new(config);
assert_eq!(server.description(), "AMQP broker on 127.0.0.1:5672");
}
}