use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use atomr_core::actor::ActorSystem;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::topology::Topology;
use crate::PatternError;
/// Maps values of an external message type onto an internal message type.
///
/// Implementors must be `Send + Sync + 'static`; `AclTopology::materialize`
/// relies on this to move a shared translator into a spawned task.
pub trait Translator
where
    Self: Send + Sync + 'static,
{
    /// Input type handed to [`Translator::translate`].
    type External: Send + 'static;

    /// Output type produced by [`Translator::translate`].
    type Internal: Send + 'static;

    /// Translates a single external value.
    ///
    /// Returns `Some(internal)` when the value maps to an internal value and
    /// `None` otherwise. `AclTopology::materialize` forwards nothing for
    /// inputs that yield `None`.
    fn translate(&self, ext: Self::External) -> Option<Self::Internal>;
}
/// Entry point for configuring an anti-corruption layer between an external
/// type `X` and an internal type `I`.
///
/// The single field is private and this block defines no constructor; the
/// type only carries the associated function [`AntiCorruption::builder`].
pub struct AntiCorruption<X, I>(PhantomData<(X, I)>);

impl<X, I> AntiCorruption<X, I>
where
    X: Send + 'static,
    I: Send + 'static,
{
    /// Starts an [`AclBuilder`] around `translator`.
    ///
    /// The translator is wrapped in an `Arc` and the builder starts out
    /// without a name. `T` must translate from `X` to `I`.
    pub fn builder<T: Translator<External = X, Internal = I>>(translator: T) -> AclBuilder<T> {
        let translator = Arc::new(translator);
        AclBuilder { name: None, translator }
    }
}
/// Builder that collects the settings for an [`AclTopology`].
pub struct AclBuilder<T: Translator> {
    /// Optional name; [`AclBuilder::build`] substitutes `"acl"` when unset.
    name: Option<String>,
    /// Translator handed on to the built topology.
    translator: Arc<T>,
}

impl<T: Translator> AclBuilder<T> {
    /// Sets the topology name, replacing any name set earlier.
    pub fn name(self, n: impl Into<String>) -> Self {
        Self { name: Some(n.into()), ..self }
    }

    /// Consumes the builder and produces the [`AclTopology`].
    ///
    /// When no name was supplied the topology is named `"acl"`.
    pub fn build(self) -> AclTopology<T> {
        let Self { name, translator } = self;
        let name = name.unwrap_or_else(|| String::from("acl"));
        AclTopology { name, translator }
    }
}
/// Configured anti-corruption layer, produced by `AclBuilder::build`.
///
/// Its `Topology` implementation turns it into a pair of channel endpoints
/// connected through the translator.
pub struct AclTopology<T: Translator> {
// The name is assigned in `AclBuilder::build`, but no code visible in this
// file reads it, so the dead-code lint is silenced for this field.
#[allow(dead_code)]
name: String,
/// Shared translator applied to every value received on the input channel.
translator: Arc<T>,
}
/// Channel endpoints returned by `AclTopology::materialize`.
///
/// `X` is the external (input) type and `I` the internal (output) type.
pub struct AclHandles<X, I> {
/// Sender for external values; as built by `materialize`, the spawned
/// translation task reads from the receiving end of this channel.
pub input: UnboundedSender<X>,
/// Receiver yielding the values for which translation returned `Some`.
pub output: UnboundedReceiver<I>,
}
#[async_trait]
impl<T: Translator> Topology for AclTopology<T> {
type Handles = AclHandles<T::External, T::Internal>;
async fn materialize(self, _system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
let (in_tx, mut in_rx) = unbounded_channel::<T::External>();
let (out_tx, out_rx) = unbounded_channel::<T::Internal>();
let translator = self.translator.clone();
tokio::spawn(async move {
while let Some(ext) = in_rx.recv().await {
if let Some(int) = translator.translate(ext) {
if out_tx.send(int).is_err() {
break;
}
}
}
});
Ok(AclHandles { input: in_tx, output: out_rx })
}
}