Skip to main content

atomr_patterns/acl/
mod.rs

1//! Anti-Corruption Layer pattern.
2//!
3//! Translates events / commands between two bounded contexts. The user
4//! provides a [`Translator`] mapping `External -> Option<Internal>`
5//! (returning `None` drops the message).
6//!
7//! v1 implementation: a tokio task that reads from an
8//! [`tokio::sync::mpsc::UnboundedReceiver<External>`] input, applies
9//! the translator, and pushes survivors to an output
10//! [`tokio::sync::mpsc::UnboundedSender<Internal>`].
11
12use std::marker::PhantomData;
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use atomr_core::actor::ActorSystem;
17use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
18
19use crate::topology::Topology;
20use crate::PatternError;
21
22/// Translate a value from one bounded context's vocabulary to
23/// another's. Return `None` to drop the value.
24pub trait Translator: Send + Sync + 'static {
25    type External: Send + 'static;
26    type Internal: Send + 'static;
27    fn translate(&self, ext: Self::External) -> Option<Self::Internal>;
28}
29
30/// Public handle to the ACL pattern.
31pub struct AntiCorruption<X, I>(PhantomData<(X, I)>);
32
33impl<X: Send + 'static, I: Send + 'static> AntiCorruption<X, I> {
34    pub fn builder<T>(translator: T) -> AclBuilder<T>
35    where
36        T: Translator<External = X, Internal = I>,
37    {
38        AclBuilder { name: None, translator: Arc::new(translator) }
39    }
40}
41
42pub struct AclBuilder<T: Translator> {
43    name: Option<String>,
44    translator: Arc<T>,
45}
46
47impl<T: Translator> AclBuilder<T> {
48    pub fn name(mut self, n: impl Into<String>) -> Self {
49        self.name = Some(n.into());
50        self
51    }
52
53    pub fn build(self) -> AclTopology<T> {
54        AclTopology { name: self.name.unwrap_or_else(|| "acl".into()), translator: self.translator }
55    }
56}
57
58pub struct AclTopology<T: Translator> {
59    #[allow(dead_code)]
60    name: String,
61    translator: Arc<T>,
62}
63
64/// Handles handed back after [`Topology::materialize`].
65pub struct AclHandles<X, I> {
66    pub input: UnboundedSender<X>,
67    pub output: UnboundedReceiver<I>,
68}
69
70#[async_trait]
71impl<T: Translator> Topology for AclTopology<T> {
72    type Handles = AclHandles<T::External, T::Internal>;
73
74    async fn materialize(self, _system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
75        let (in_tx, mut in_rx) = unbounded_channel::<T::External>();
76        let (out_tx, out_rx) = unbounded_channel::<T::Internal>();
77        let translator = self.translator.clone();
78        tokio::spawn(async move {
79            while let Some(ext) = in_rx.recv().await {
80                if let Some(int) = translator.translate(ext) {
81                    if out_tx.send(int).is_err() {
82                        break;
83                    }
84                }
85            }
86        });
87        Ok(AclHandles { input: in_tx, output: out_rx })
88    }
89}