1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
use crate::channel_types::SmallReceiver;
use crate::relay::CtrlSignal;
use crate::tokio::runtime::Handle;
use crate::{parser, Context};
use ockam_core::{Message, RelayMessage, Result, Routed, Worker};

/// Worker relay machinery
///
/// Every worker in the Ockam runtime needs a certain amount of logic
/// and state attached to the lifecycle of the user's worker code.
/// The relay manages this state and runtime behaviour.
pub struct WorkerRelay<W> {
    worker: W,
    ctx: Context,
}

impl<W: Worker> WorkerRelay<W> {
    pub fn new(worker: W, ctx: Context) -> Self {
        Self { worker, ctx }
    }
}

impl<W, M> WorkerRelay<W>
where
    W: Worker<Context = Context, Message = M>,
    M: Message + Send + 'static,
{
    /// Convenience function to parse an incoming direct message and
    /// wrap it in a [`Routed`]
    ///
    /// This provides return route information for workers via a
    /// composition side-channel.
    ///
    /// This is currently called twice, once when the message is
    /// dispatched to the worker for authorization and again for
    /// handling. Two unpleasant ways to avoid this are:
    ///
    /// 1. Introduce a Sync bound on the Worker trait that allows us
    ///    to pass the message by reference.
    ///
    /// 2. Introduce a Clone bound on the Message trait that allows us
    ///    to perform a cheaper clone on the message.
    ///
    fn wrap_direct_message(relay_msg: RelayMessage) -> Result<Routed<M>> {
        let payload = relay_msg.local_message().transport().payload.as_slice();
        let msg = parser::message::<M>(payload).map_err(|e| {
            error!("Failed to decode message payload for worker" /* FIXME */);
            e
        })?;
        let routed = Routed::new(
            msg,
            relay_msg.destination().clone(),
            relay_msg.source().clone(),
            relay_msg.into_local_message(),
        );
        Ok(routed)
    }

    /// Receive and handle a single message
    ///
    /// Report errors as they occur, and signal whether the loop should
    /// continue running or not
    async fn recv_message(&mut self) -> Result<bool> {
        let relay_msg = match self.ctx.receiver_next().await? {
            Some(msg) => msg,
            None => {
                trace!("No more messages for worker {}", self.ctx.address());
                return Ok(false);
            }
        };

        // Call the worker handle function - pass errors up
        let routed = Self::wrap_direct_message(relay_msg)?;
        self.worker.handle_message(&mut self.ctx, routed).await?;

        // Signal to the outer loop that we would like to run again
        Ok(true)
    }

    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
    #[cfg_attr(not(feature = "std"), allow(unused_variables))]
    async fn run(mut self, mut ctrl_rx: SmallReceiver<CtrlSignal>) {
        match self.worker.initialize(&mut self.ctx).await {
            Ok(()) => {}
            Err(e) => {
                error!(
                    "Failure during '{}' worker initialisation: {}",
                    self.ctx.address(),
                    e
                );
            }
        }

        let address = self.ctx.address();

        if let Err(e) = self.ctx.set_ready().await {
            error!("Failed to mark worker '{}' as 'ready': {}", address, e);
        }

        #[cfg(feature = "std")]
        loop {
            crate::tokio::select! {
                result = self.recv_message() => {
                    match result {
                        // Successful message handling -- keep running
                        Ok(true) => {},
                        // Successful message handling -- stop now
                        Ok(false) => {
                            break;
                        },
                        // An error occurred -- log and continue
                        Err(e) => {
                            #[cfg(feature = "debugger")]
                            error!("Error encountered during '{}' message handling: {:?}", address, e);
                            #[cfg(not(feature = "debugger"))]
                            error!("Error encountered during '{}' message handling: {}", address, e);
                        }
                    }
                },
                result = ctrl_rx.recv() => {
                    if result.is_some() {
                        debug!("Relay received shutdown signal, terminating!");
                        break;
                    }

                    // We are stopping
                }
            };
        }
        #[cfg(not(feature = "std"))]
        loop {
            match self.recv_message().await {
                // Successful message handling -- keep running
                Ok(true) => {}
                // Successful message handling -- stop now
                Ok(false) => {
                    break;
                }
                // An error occurred -- log and continue
                Err(e) => error!(
                    "Error encountered during '{}' message handling: {}",
                    address, e
                ),
            }
        }

        // Run the shutdown hook for this worker
        match self.worker.shutdown(&mut self.ctx).await {
            Ok(()) => {}
            Err(e) => {
                error!(
                    "Failure during '{}' worker shutdown: {}",
                    self.ctx.address(),
                    e
                );
            }
        }

        // Finally send the router a stop ACK -- log errors
        trace!("Sending shutdown ACK");
        if let Err(e) = self.ctx.send_stop_ack().await {
            error!("Error occurred during stop ACK sending: {}", e);
        }
    }

    /// Build and spawn a new worker relay, returning a send handle to it
    pub(crate) fn init(rt: &Handle, worker: W, ctx: Context, ctrl_rx: SmallReceiver<CtrlSignal>) {
        let relay = WorkerRelay::new(worker, ctx);
        rt.spawn(relay.run(ctrl_rx));
    }
}