//! Pipe behavior modifiers

mod resend;
use dyn_clone::DynClone;
pub use resend::{ReceiverConfirm, SenderConfirm};

mod ordering;
pub use ordering::ReceiverOrdering;

mod handshake;
pub use handshake::HandshakeInit;

use crate::{
    protocols::pipe::{internal::InternalCmd, PipeMessage},
    Context,
};
use ockam_core::compat::{boxed::Box, vec::Vec};
use ockam_core::{async_trait, Address, Result, Route};

/// Define the behavior of a pipe
///
/// A hook exposes two entry points: [`on_external`](BehaviorHook::on_external)
/// for user messages and [`on_internal`](BehaviorHook::on_internal) for
/// internal commands.  Implementors must be `Send` and clonable through
/// `DynClone`, which lets boxed hooks be duplicated.
#[async_trait]
pub trait BehaviorHook: DynClone + Send {
    /// This function MUST be run for every incoming user message
    ///
    /// * Access to mutable self
    /// * Access to own internal address
    /// * Access to peer internal route
    /// * Access to mutable context
    /// * Access to incoming or outgoing PipeMessage
    ///
    /// Returns a [`PipeModifier`] indicating whether the pipe should
    /// deviate from its default behaviour for this message, or an error.
    async fn on_external(
        &mut self,
        this: Address,
        peer: Route,
        ctx: &mut Context,
        msg: &PipeMessage,
    ) -> Result<PipeModifier>;

    /// This function MUST be run for every incoming internal message
    ///
    /// An internal message is one sent to the private API address of
    /// this worker to issue commands, such as re-sending payloads
    ///
    /// Receives the same address, route and context arguments as
    /// `on_external`, but the message is an [`InternalCmd`] and no
    /// modifier is returned.
    async fn on_internal(
        &mut self,
        this: Address,
        peer: Route,
        ctx: &mut Context,
        msg: &InternalCmd,
    ) -> Result<()>;
}

// Generate `Clone` for boxed `BehaviorHook` trait objects.
dyn_clone::clone_trait_object!(BehaviorHook);

/// Indicate to the pipe whether to modify its default behaviour
///
/// Variants carry no data, so the type is `Copy` and can be compared
/// directly with `==` / `assert_eq!`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PipeModifier {
    /// No behaviour modification required
    None,
    /// Drop the currently handled message
    Drop,
}

/// Structure to combine a set of pipe BehaviorHooks
///
/// Hooks are kept in the order they were attached and are run in that
/// same order by `external_all` and `internal_all`.
pub struct PipeBehavior {
    /// Boxed hooks, in attachment order
    hooks: Vec<Box<dyn BehaviorHook + Send + Sync + 'static>>,
}

impl Clone for PipeBehavior {
    /// Create an independent copy of this behavior by cloning every
    /// attached hook, preserving their order.
    fn clone(&self) -> Self {
        Self {
            hooks: self
                .hooks
                .iter()
                // `&**hook` is the trait object itself, so `clone_box`
                // yields the final `Box<dyn BehaviorHook + ..>` directly.
                // Passing the `&Box<..>` instead would allocate a
                // `Box<Box<..>>` that is immediately unwrapped again.
                .map(|hook| dyn_clone::clone_box(&**hook))
                .collect(),
        }
    }
}

impl<T: BehaviorHook + Send + Sync + 'static> From<T> for PipeBehavior {
    fn from(hook: T) -> Self {
        Self::with(hook)
    }
}

impl PipeBehavior {
    pub fn with<T: BehaviorHook + Send + Sync + 'static>(t: T) -> Self {
        Self {
            hooks: vec![Box::new(t)],
        }
    }

    pub fn empty() -> Self {
        Self { hooks: vec![] }
    }

    /// Attach a new BehaviorHook in a chainable manner
    pub fn attach<T: BehaviorHook + Send + Sync + 'static>(mut self, t: T) -> Self {
        self.insert(t);
        self
    }

    /// Insert a new BehaviorHook in place
    pub fn insert<T: BehaviorHook + Send + Sync + 'static>(&mut self, t: T) {
        self.hooks.push(Box::new(t));
    }

    /// Run all external message hooks associated with this pipe
    pub async fn external_all(
        &mut self,
        this: Address,
        peer: Route,
        ctx: &mut Context,
        msg: &PipeMessage,
    ) -> Result<PipeModifier> {
        let mut acc = Vec::with_capacity(self.hooks.len());
        for hook in self.hooks.iter_mut() {
            acc.push(
                hook.on_external(this.clone(), peer.clone(), ctx, msg)
                    .await?,
            );
        }

        // Propagate any drop behaviour
        use PipeModifier as Pm;
        Ok(acc
            .into_iter()
            .fold(Pm::None, |acc, _mod| match (acc, _mod) {
                (Pm::None, Pm::None) => Pm::None,
                (_, Pm::Drop) => Pm::Drop,
                (Pm::Drop, _) => Pm::Drop,
            }))
    }

    /// Run all internal message hooks associated with this pipe
    pub async fn internal_all(
        &mut self,
        this: Address,
        peer: Route,
        ctx: &mut Context,
        msg: &InternalCmd,
    ) -> Result<()> {
        for hook in self.hooks.iter_mut() {
            hook.on_internal(this.clone(), peer.clone(), ctx, msg)
                .await?;
        }

        Ok(())
    }
}