1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
use crate::channel_types::SmallReceiver;
use crate::relay::CtrlSignal;
use crate::tokio::runtime::Handle;
use crate::{parser, Context};
use ockam_core::{Message, RelayMessage, Result, Routed, Worker};
/// Worker relay machinery
///
/// Every worker in the Ockam runtime needs a certain amount of logic
/// and state attached to the lifecycle of the user's worker code.
/// The relay manages this state and runtime behaviour.
///
/// A relay owns exactly one worker together with the [`Context`] that
/// is handed to each of the worker's lifecycle hooks.
pub struct WorkerRelay<W> {
/// The user's worker; the relay invokes its `initialize`,
/// `handle_message` and `shutdown` hooks.
worker: W,
/// Context passed mutably to every worker hook. The relay also uses
/// it to pull the next incoming message, to mark the worker as ready
/// and to send the final stop ACK.
ctx: Context,
}
impl<W: Worker> WorkerRelay<W> {
pub fn new(worker: W, ctx: Context) -> Self {
Self { worker, ctx }
}
}
impl<W, M> WorkerRelay<W>
where
W: Worker<Context = Context, Message = M>,
M: Message + Send + 'static,
{
/// Decode the payload of an incoming relay message into the worker's
/// message type `M` and wrap the result in a [`Routed`].
///
/// The [`Routed`] wrapper is built from the decoded message, clones of
/// the relay message's destination and source, and the underlying local
/// message (taken by consuming `relay_msg`), so that the worker has
/// access to the routing information alongside the decoded value.
///
/// # Errors
///
/// If the payload cannot be decoded as `M`, an error is logged and the
/// parser's error is returned to the caller.
fn wrap_direct_message(relay_msg: RelayMessage) -> Result<Routed<M>> {
    // Decode first: the payload is only borrowed from `relay_msg`, which
    // is consumed further down once the decoded value is owned.
    let bytes = relay_msg.local_message().transport().payload.as_slice();
    let decoded = parser::message::<M>(bytes);
    if decoded.is_err() {
        error!("Failed to decode message payload for worker" /* FIXME */);
    }
    let msg = decoded?;

    let destination = relay_msg.destination().clone();
    let source = relay_msg.source().clone();
    let local_msg = relay_msg.into_local_message();

    Ok(Routed::new(msg, destination, source, local_msg))
}
/// Wait for the next message and hand it to the worker.
///
/// Returns `Ok(true)` when a message was received and handled, telling
/// the caller to keep looping, and `Ok(false)` when the context reports
/// that no further messages will arrive.
///
/// # Errors
///
/// Any error from receiving, decoding or handling the message is passed
/// up to the caller unchanged.
async fn recv_message(&mut self) -> Result<bool> {
    match self.ctx.receiver_next().await? {
        // Mailbox exhausted -- ask the outer loop to stop
        None => {
            trace!("No more messages for worker {}", self.ctx.address());
            Ok(false)
        }
        // Decode the message and run the worker's handler on it
        Some(relay_msg) => {
            let routed = Self::wrap_direct_message(relay_msg)?;
            self.worker.handle_message(&mut self.ctx, routed).await?;
            Ok(true)
        }
    }
}
/// Drive the worker through its whole lifecycle.
///
/// Steps, in order:
///
/// 1. Call the worker's `initialize` hook. A failure is logged and the
///    relay carries on regardless.
/// 2. Mark the context as ready via `set_ready`; a failure is only logged.
/// 3. Call `recv_message` in a loop until it reports `Ok(false)`. Errors
///    from message handling are logged and the loop continues. With the
///    `std` feature the loop also listens on `ctrl_rx` and terminates as
///    soon as a [`CtrlSignal`] arrives; without `std`, `ctrl_rx` is not
///    consulted at all.
/// 4. Call the worker's `shutdown` hook, logging any failure.
/// 5. Send the stop ACK through the context, logging any failure.
// `ctrl_rx` is only read in `std` builds, hence the conditional `allow`s.
#[cfg_attr(not(feature = "std"), allow(unused_mut))]
#[cfg_attr(not(feature = "std"), allow(unused_variables))]
async fn run(mut self, mut ctrl_rx: SmallReceiver<CtrlSignal>) {
    if let Err(e) = self.worker.initialize(&mut self.ctx).await {
        error!(
            "Failure during '{}' worker initialisation: {}",
            self.ctx.address(),
            e
        );
    }

    let address = self.ctx.address();

    if let Err(e) = self.ctx.set_ready().await {
        error!("Failed to mark worker '{}' as 'ready': {}", address, e);
    }

    // Tracks whether the control channel can still deliver a signal.
    // Once `recv()` has returned `None` the channel is closed and would
    // complete immediately on every later poll, so the branch is
    // disabled from then on. Otherwise the loop would spin and each
    // spurious wake-up would drop (cancel) an in-flight `recv_message`.
    #[cfg(feature = "std")]
    let mut ctrl_open = true;

    #[cfg(feature = "std")]
    loop {
        crate::tokio::select! {
            result = self.recv_message() => {
                match result {
                    // Successful message handling -- keep running
                    Ok(true) => {},
                    // Successful message handling -- stop now
                    Ok(false) => {
                        break;
                    },
                    // An error occurred -- log and continue
                    Err(e) => {
                        #[cfg(feature = "debugger")]
                        error!("Error encountered during '{}' message handling: {:?}", address, e);
                        #[cfg(not(feature = "debugger"))]
                        error!("Error encountered during '{}' message handling: {}", address, e);
                    }
                }
            },
            result = ctrl_rx.recv(), if ctrl_open => {
                if result.is_some() {
                    debug!("Relay received shutdown signal, terminating!");
                    break;
                }

                // The control channel was closed without a signal: stop
                // polling it and keep serving messages until the mailbox
                // itself reports that it is done.
                ctrl_open = false;
            }
        };
    }

    #[cfg(not(feature = "std"))]
    loop {
        match self.recv_message().await {
            // Successful message handling -- keep running
            Ok(true) => {}
            // Successful message handling -- stop now
            Ok(false) => {
                break;
            }
            // An error occurred -- log and continue
            Err(e) => error!(
                "Error encountered during '{}' message handling: {}",
                address, e
            ),
        }
    }

    // Run the shutdown hook for this worker
    if let Err(e) = self.worker.shutdown(&mut self.ctx).await {
        error!(
            "Failure during '{}' worker shutdown: {}",
            self.ctx.address(),
            e
        );
    }

    // Finally send the router a stop ACK -- log errors
    trace!("Sending shutdown ACK");
    if let Err(e) = self.ctx.send_stop_ack().await {
        error!("Error occurred during stop ACK sending: {}", e);
    }
}
/// Build and spawn a new worker relay, returning a send handle to it
pub(crate) fn init(rt: &Handle, worker: W, ctx: Context, ctrl_rx: SmallReceiver<CtrlSignal>) {
let relay = WorkerRelay::new(worker, ctx);
rt.spawn(relay.run(ctrl_rx));
}
}