use async_trait::async_trait;
use bytes::Bytes;
use coreon_core::{
message::{Body, Message},
uri::CamelUri,
CamelError, Component, Consumer, Endpoint, Exchange, Processor, Producer, Result,
};
use dashmap::DashMap;
use futures::StreamExt;
use lapin::{
options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
QueueDeclareOptions,
},
types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{debug, warn};
pub struct AmqpComponent {
endpoints: DashMap<String, Arc<AmqpEndpoint>>,
}
impl AmqpComponent {
pub fn new() -> Arc<Self> {
Arc::new(Self {
endpoints: DashMap::new(),
})
}
}
impl Default for AmqpComponent {
fn default() -> Self {
Self {
endpoints: DashMap::new(),
}
}
}
#[async_trait]
impl Component for AmqpComponent {
fn scheme(&self) -> &str {
"amqp"
}
async fn create_endpoint(&self, uri: &CamelUri) -> Result<Arc<dyn Endpoint>> {
let key = uri.as_str().to_owned();
if let Some(ep) = self.endpoints.get(&key) {
return Ok(ep.clone());
}
let url = uri
.get_param("url")
.ok_or_else(|| CamelError::InvalidUri {
uri: key.clone(),
reason: "missing 'url' param".into(),
})?
.to_owned();
let exchange = uri.get_param("exchange").unwrap_or("").to_owned();
let prefetch: u16 = uri
.get_param("prefetch")
.map(str::parse::<u16>)
.transpose()
.map_err(|e| CamelError::InvalidUri {
uri: key.clone(),
reason: format!("prefetch: {e}"),
})?
.unwrap_or(32);
let ep = Arc::new(AmqpEndpoint {
uri: key.clone(),
queue: uri.path.clone(),
url,
exchange,
prefetch,
});
self.endpoints.insert(key, ep.clone());
Ok(ep)
}
}
/// Settings for one AMQP endpoint, as parsed from its URI by
/// `AmqpComponent::create_endpoint`.
pub struct AmqpEndpoint {
/// Full URI string; returned verbatim by `Endpoint::uri`.
uri: String,
/// Queue name, taken from the URI path. Consumers declare and consume this
/// queue; producers use it as the routing key.
queue: String,
/// Broker connection URL (the `url` URI parameter).
url: String,
/// Exchange producers publish to (the `exchange` URI parameter, `""` if absent).
exchange: String,
/// Prefetch count passed to `basic_qos` when a consumer is created.
prefetch: u16,
}
/// Connects to the broker at `url` and opens a single channel on that
/// connection.
///
/// # Errors
///
/// Returns [`CamelError::Endpoint`] when either the connection or the channel
/// cannot be established.
async fn open_channel(url: &str) -> Result<lapin::Channel> {
    let connection = match Connection::connect(url, ConnectionProperties::default()).await {
        Ok(connection) => connection,
        Err(e) => return Err(CamelError::Endpoint(format!("amqp connect: {e}"))),
    };
    // NOTE(review): `connection` goes out of scope when this function returns
    // and only the channel is handed back. This presumably relies on the
    // channel keeping the underlying connection open; confirm for the lapin
    // version in use.
    match connection.create_channel().await {
        Ok(channel) => Ok(channel),
        Err(e) => Err(CamelError::Endpoint(format!("amqp create_channel: {e}"))),
    }
}
#[async_trait]
impl Endpoint for AmqpEndpoint {
    /// The URI string this endpoint was created from.
    fn uri(&self) -> &str {
        self.uri.as_str()
    }

    /// Opens a fresh channel and wraps it in a producer that publishes to the
    /// configured exchange, using the queue name as routing key.
    async fn create_producer(&self) -> Result<Arc<dyn Producer>> {
        let producer = AmqpProducer {
            channel: open_channel(&self.url).await?,
            exchange: self.exchange.clone(),
            routing_key: self.queue.clone(),
        };
        Ok(Arc::new(producer))
    }

    /// Opens a fresh channel, declares the queue with default options, applies
    /// the prefetch limit, and returns a consumer that is not yet started.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::Endpoint`] if the channel cannot be opened or if
    /// `queue_declare` / `basic_qos` fail.
    async fn create_consumer(&self, pipeline: Arc<dyn Processor>) -> Result<Arc<dyn Consumer>> {
        let channel = open_channel(&self.url).await?;

        let declared = channel
            .queue_declare(
                self.queue.as_str(),
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await;
        if let Err(e) = declared {
            return Err(CamelError::Endpoint(format!("amqp queue_declare: {e}")));
        }

        let qos_options = lapin::options::BasicQosOptions::default();
        if let Err(e) = channel.basic_qos(self.prefetch, qos_options).await {
            return Err(CamelError::Endpoint(format!("amqp basic_qos: {e}")));
        }

        let consumer = AmqpConsumer {
            channel,
            queue: self.queue.clone(),
            pipeline,
            task: Mutex::new(None),
        };
        Ok(Arc::new(consumer))
    }
}
/// Publishes exchange bodies to an AMQP broker over a dedicated channel.
pub struct AmqpProducer {
/// Channel used for every `basic_publish` call.
channel: lapin::Channel,
/// Name of the exchange messages are published to.
exchange: String,
/// Routing key attached to every published message.
routing_key: String,
}
#[async_trait]
impl Producer for AmqpProducer {
    /// Publishes the `in` body of `exchange` to the configured exchange and
    /// routing key, then awaits the value returned by `basic_publish`.
    ///
    /// `Body::Empty` is sent as an empty payload, `Body::Text` as its UTF-8
    /// bytes and `Body::Bytes` unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::Processor`] for `Body::Custom`, or when the
    /// publish or the subsequent confirmation wait fails.
    async fn send(&self, exchange: &mut Exchange) -> Result<()> {
        let payload = match &exchange.r#in.body {
            Body::Bytes(raw) => raw.clone(),
            Body::Text(text) => Bytes::from(text.as_bytes().to_vec()),
            Body::Empty => Bytes::new(),
            Body::Custom(_) => {
                return Err(CamelError::Processor(
                    "amqp: cannot serialize Body::Custom".into(),
                ));
            }
        };

        // Step 1: hand the frame to the channel.
        let confirmation = self
            .channel
            .basic_publish(
                self.exchange.as_str(),
                self.routing_key.as_str(),
                BasicPublishOptions::default(),
                &payload,
                BasicProperties::default(),
            )
            .await
            .map_err(|e| CamelError::Processor(format!("amqp publish: {e}")))?;

        // Step 2: wait on the confirmation future returned by the publish.
        confirmation
            .await
            .map_err(|e| CamelError::Processor(format!("amqp confirm: {e}")))?;

        debug!(
            exchange = %self.exchange,
            routing_key = %self.routing_key,
            bytes = payload.len(),
            "amqp: published"
        );
        Ok(())
    }
}
/// Consumes deliveries from one queue and runs each through a processor pipeline.
pub struct AmqpConsumer {
/// Channel on which `basic_consume` is issued.
channel: lapin::Channel,
/// Name of the queue to consume from; also copied into the `CamelAmqpQueue` header.
queue: String,
/// Processor invoked once per delivery.
pipeline: Arc<dyn Processor>,
/// Handle of the background task spawned by `start`; `None` until started
/// and again after `stop` takes it.
task: Mutex<Option<JoinHandle<()>>>,
}
#[async_trait]
impl Consumer for AmqpConsumer {
async fn start(&self) -> Result<()> {
let mut stream = self
.channel
.basic_consume(
&self.queue,
"camel-rs",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| CamelError::Endpoint(format!("amqp basic_consume: {e}")))?;
let pipeline = self.pipeline.clone();
let queue = self.queue.clone();
let handle = tokio::spawn(async move {
while let Some(delivery_res) = stream.next().await {
match delivery_res {
Err(e) => warn!(queue = %queue, error = %e, "amqp: delivery error"),
Ok(delivery) => {
let msg = Message {
headers: std::iter::once((
"CamelAmqpQueue".to_owned(),
queue.clone(),
))
.collect(),
body: Body::Bytes(Bytes::from(delivery.data.clone())),
};
let mut ex = Exchange {
r#in: msg,
..Exchange::default()
};
match pipeline.process(&mut ex).await {
Ok(()) => {
if let Err(e) =
delivery.ack(BasicAckOptions::default()).await
{
warn!(queue = %queue, error = %e, "amqp: ack failed");
}
}
Err(e) => {
warn!(queue = %queue, error = %e, "amqp: pipeline failed, NACK");
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
}
}
}
}
}
});
*self.task.lock().await = Some(handle);
Ok(())
}
async fn stop(&self) -> Result<()> {
if let Some(h) = self.task.lock().await.take() {
h.abort();
}
Ok(())
}
}