use actix::prelude::{Recipient, SendError};
use crate::ids::ErrorId;
use crate::peering::codec::MessageCodec;
use crate::peering::taberna::Taberna;
use crate::peering::BlobReceiver;
/// Envelope handed to an actix recipient for each request pulled off a
/// taberna by the bridge task in this module.
///
/// The bridge fills both fields from the parts of the incoming request; the
/// receiving actor takes ownership of them.
pub struct ActixTabernaDelivery<M: Send + 'static> {
    /// The application-level message carried by the request.
    pub message: M,
    /// Blob receiver that accompanied the request, if there was one.
    pub blob_receiver: Option<BlobReceiver>,
}
/// Makes [`ActixTabernaDelivery`] sendable through actix mailboxes.
impl<M: Send + 'static> actix::Message for ActixTabernaDelivery<M> {
    /// Handlers return nothing: the bridge settles the request's completion
    /// based on whether the send itself succeeded, not on a handler reply.
    type Result = ();
}
/// Handle that owns the background bridge task forwarding taberna requests
/// to an actix recipient.
///
/// The task is aborted when this handle is dropped (see the `Drop` impl), so
/// the handle must be kept alive for as long as forwarding should continue.
pub struct ActixTaberna {
/// Join handle of the task running `run_bridge`; used only to abort it.
bridge: tokio::task::JoinHandle<()>,
}
impl ActixTaberna {
pub(crate) fn new<Codec>(
taberna: Taberna<Codec>,
recipient: Recipient<ActixTabernaDelivery<Codec::AppMessage>>,
) -> Self
where
Codec: MessageCodec + 'static,
Codec::AppMessage: Send + Sync + 'static,
{
let bridge = actix::spawn(run_bridge(taberna, recipient));
Self { bridge }
}
}
impl Drop for ActixTaberna {
    /// Aborts the bridge task so it does not outlive its handle.
    fn drop(&mut self) {
        let Self { bridge } = self;
        bridge.abort();
    }
}
/// Pumps requests from `taberna` into `recipient` until one side goes away.
///
/// Each request is split into its parts; the message and blob receiver are
/// wrapped in an [`ActixTabernaDelivery`] and offered to the recipient with
/// `try_send`. The request's completion is then settled from the outcome:
///
/// * delivered: `accept()`
/// * `SendError::Full`: `busy()`, and the undelivered envelope is dropped
/// * `SendError::Closed`: `taberna_shutdown()`, after which the loop ends
///
/// A receive error whose kind is `ErrorId::ReceiveTimeout` is retried; any
/// other receive error ends the loop.
async fn run_bridge<Codec: MessageCodec + 'static>(
    taberna: Taberna<Codec>,
    recipient: Recipient<ActixTabernaDelivery<Codec::AppMessage>>,
) where
    Codec::AppMessage: Send + Sync + 'static,
{
    loop {
        // NOTE(review): `None` is presumably "no timeout" -- confirm against
        // `Taberna::next`.
        let incoming = taberna.next(None).await;
        let request = match incoming {
            Ok(request) => request,
            Err(err) => {
                if err.kind == ErrorId::ReceiveTimeout {
                    continue;
                }
                break;
            }
        };

        let parts = request.into_parts();
        let outcome = recipient.try_send(ActixTabernaDelivery {
            message: parts.message,
            blob_receiver: parts.blob_receiver,
        });

        match outcome {
            Err(SendError::Closed(_undelivered)) => {
                // The recipient is gone for good; report it and stop pumping.
                parts.completion.taberna_shutdown();
                break;
            }
            Err(SendError::Full(_undelivered)) => {
                parts.completion.busy();
            }
            Ok(()) => {
                parts.completion.accept();
            }
        }
    }
}
// Unit tests for this adapter; compiled only under `cfg(test)` and loaded
// from `tests/actix_adapter.rs` via the `#[path]` attribute.
#[cfg(test)]
#[path = "tests/actix_adapter.rs"]
mod tests;