//! Messages and reply types exchanged between a node and its executor/router.
use crate::channel_types::{small_channel, MessageSender, SmallReceiver, SmallSender};
use crate::{
error::{NodeError, NodeReason, RouterReason, WorkerReason},
router::SenderPair,
};
use core::{fmt, sync::atomic::AtomicUsize};
use ockam_core::compat::{string::String, sync::Arc, vec::Vec};
use ockam_core::{Address, Error, RelayMessage, Result, TransportType};
/// Messages sent from the Node to the Executor
///
/// Variants that expect an answer carry a
/// `SmallSender<NodeReplyResult>` on which the executor/router sends
/// back its confirmation or result.
#[derive(Debug)]
pub enum NodeMessage {
    /// Start a new worker and store the send handle
    StartWorker {
        /// The set of addresses in use by this worker
        addrs: Vec<Address>,
        /// Pair of senders to the worker relay (msgs and ctrl)
        senders: SenderPair,
        /// A detached context / "worker" runs no relay state
        detached: bool,
        /// A mechanism to read channel fill-state for a worker
        mailbox_count: Arc<AtomicUsize>,
        /// Reply channel for command confirmation
        reply: SmallSender<NodeReplyResult>,
    },
    /// Return a list of all worker addresses
    ListWorkers(SmallSender<NodeReplyResult>),
    /// Add an existing address to a cluster
    SetCluster(Address, String, SmallSender<NodeReplyResult>),
    /// Stop an existing worker
    StopWorker(Address, bool, SmallSender<NodeReplyResult>),
    /// Start a new processor
    StartProcessor(Address, SenderPair, SmallSender<NodeReplyResult>),
    /// Stop an existing processor
    StopProcessor(Address, SmallSender<NodeReplyResult>),
    /// Stop the node (and all workers)
    StopNode(ShutdownType, SmallSender<NodeReplyResult>),
    /// Immediately stop the node runtime
    AbortNode,
    /// Let the router know a particular address has stopped
    StopAck(Address),
    /// Request the sender for a worker address
    SenderReq(Address, SmallSender<NodeReplyResult>),
    /// Register a new router for a route id type
    Router(TransportType, Address, SmallSender<NodeReplyResult>),
    /// Message the router to set an address as "ready"
    SetReady(Address),
    /// Check whether an address has been marked as "ready"
    CheckReady(Address, SmallSender<NodeReplyResult>),
}
impl fmt::Display for NodeMessage {
    /// Write the bare variant name; message payloads are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> core::fmt::Result {
        let name = match self {
            NodeMessage::StartWorker { .. } => "StartWorker",
            NodeMessage::ListWorkers(..) => "ListWorkers",
            NodeMessage::SetCluster(..) => "SetCluster",
            NodeMessage::StopWorker(..) => "StopWorker",
            NodeMessage::StartProcessor(..) => "StartProcessor",
            NodeMessage::StopProcessor(..) => "StopProcessor",
            NodeMessage::StopNode(..) => "StopNode",
            NodeMessage::AbortNode => "AbortNode",
            NodeMessage::StopAck(..) => "StopAck",
            NodeMessage::SenderReq(..) => "SenderReq",
            NodeMessage::Router(..) => "Router",
            NodeMessage::SetReady(..) => "SetReady",
            NodeMessage::CheckReady(..) => "CheckReady",
        };
        f.write_str(name)
    }
}
impl NodeMessage {
    /// Create a start worker message
    ///
    /// * `senders`: message and command senders for the relay
    ///
    /// * `detached`: a detached address is a context without a full
    ///   relay behind it and cannot respond to shutdown commands.
    ///   Setting this to `true` disables stop ACK support in the
    ///   router
    pub fn start_worker(
        addrs: Vec<Address>,
        senders: SenderPair,
        detached: bool,
        mailbox_count: Arc<AtomicUsize>,
    ) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply, reply_rx) = small_channel();
        let msg = Self::StartWorker {
            addrs,
            senders,
            detached,
            mailbox_count,
            reply,
        };
        (msg, reply_rx)
    }
    /// Create a start processor message and reply receiver
    pub fn start_processor(
        address: Address,
        senders: SenderPair,
    ) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::StartProcessor(address, senders, reply_tx), reply_rx)
    }
    /// Create a stop processor message and reply receiver
    pub fn stop_processor(address: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::StopProcessor(address, reply_tx), reply_rx)
    }
    /// Create a list workers message and reply receiver
    pub fn list_workers() -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::ListWorkers(reply_tx), reply_rx)
    }
    /// Create a set cluster message and reply receiver
    pub fn set_cluster(addr: Address, label: String) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::SetCluster(addr, label, reply_tx), reply_rx)
    }
    /// Create a stop worker message and reply receiver
    pub fn stop_worker(address: Address, detached: bool) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::StopWorker(address, detached, reply_tx), reply_rx)
    }
    /// Create a stop node message and reply receiver
    pub fn stop_node(tt: ShutdownType) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::StopNode(tt, reply_tx), reply_rx)
    }
    /// Create a sender request message and reply receiver
    pub fn sender_request(route: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::SenderReq(route, reply_tx), reply_rx)
    }
    /// Create a SetReady message (fire-and-forget: no reply receiver)
    pub fn set_ready(addr: Address) -> Self {
        Self::SetReady(addr)
    }
    /// Create a CheckReady message and reply receiver
    pub fn get_ready(addr: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
        let (reply_tx, reply_rx) = small_channel();
        (Self::CheckReady(addr, reply_tx), reply_rx)
    }
}
/// The reply/result of a Node: either a successful [`RouterReply`] or an [`Error`]
pub type NodeReplyResult = core::result::Result<RouterReply, Error>;
/// Successful return values from a router command
///
/// See [`NodeReplyResult`] for the fallible wrapper around this type.
#[derive(Debug)]
pub enum RouterReply {
    /// Success with no payload
    Ok,
    /// A list of worker addresses
    Workers(Vec<Address>),
    /// Message sender to a specific worker
    Sender {
        /// The address a message is being sent to
        addr: Address,
        /// The relay sender
        sender: MessageSender<RelayMessage>,
    },
    /// Indicate the 'ready' state of an address
    State(bool),
}
/// Specify the type of node shutdown
///
/// For most users `ShutdownType::Graceful(_)` is recommended. The
/// `Default` implementation uses a 1 second timeout.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum ShutdownType {
    /// Execute a graceful shutdown given a maximum timeout in seconds
    ///
    /// The following steps will be taken by the internal router
    /// during graceful shutdown procedure:
    ///
    /// * Signal clusterless workers to stop
    /// * Wait for shutdown ACK hooks from worker set
    /// * Signal worker clusters in reverse-creation order to stop
    /// * Wait for shutdown ACK hooks from each cluster before moving onto the
    ///   next
    /// * All shutdown-signaled workers may process their entire mailbox,
    ///   while not allowing new messages to be queued
    ///
    /// Graceful shutdown procedure will be prematurely terminated
    /// when reaching the timeout (failover into `Immediate`
    /// strategy). **A given timeout of `0` will wait forever!**
    Graceful(u8),
    /// Immediately shutdown workers and run shutdown hooks
    ///
    /// This strategy can lead to data loss:
    ///
    /// * Unhandled mailbox messages will be dropped
    /// * Shutdown hooks may not be able to send messages
    ///
    /// This strategy is not recommended for general use, but will be
    /// selected as a failover, if the `Graceful` strategy reaches its
    /// timeout limit.
    Immediate,
}
impl Default for ShutdownType {
fn default() -> Self {
Self::Graceful(1)
}
}
impl RouterReply {
/// Return [RouterReply::Ok]
pub fn ok() -> NodeReplyResult {
Ok(RouterReply::Ok)
}
/// Return [RouterReply::State]
pub fn state(b: bool) -> NodeReplyResult {
Ok(RouterReply::State(b))
}
/// Return [NodeError::Address] not found
#[track_caller]
pub fn no_such_address(a: Address) -> NodeReplyResult {
Err(NodeError::Address(a).not_found())
}
/// Return [NodeError::Address] already exists for the given address
#[track_caller]
pub fn worker_exists(a: Address) -> NodeReplyResult {
Err(NodeError::Address(a).already_exists())
}
/// Return [NodeError::RouterState] already exists
#[track_caller]
pub fn router_exists() -> NodeReplyResult {
Err(NodeError::RouterState(RouterReason::Duplicate).already_exists())
}
/// Return [NodeError::NodeState] conflict
#[track_caller]
pub fn node_rejected(reason: NodeReason) -> NodeReplyResult {
Err(NodeError::NodeState(reason).conflict())
}
/// Return [NodeError::WorkerState] conflict
#[track_caller]
pub fn worker_rejected(reason: WorkerReason) -> NodeReplyResult {
Err(NodeError::WorkerState(reason).conflict())
}
/// Return [RouterReply::Workers] for the given addresses
pub fn workers(v: Vec<Address>) -> NodeReplyResult {
Ok(Self::Workers(v))
}
/// Return [RouterReply::Sender] for the given information
pub fn sender(addr: Address, sender: MessageSender<RelayMessage>) -> NodeReplyResult {
Ok(RouterReply::Sender { addr, sender })
}
/// Consume the wrapper and return [RouterReply::Sender]
pub fn take_sender(self) -> Result<(Address, MessageSender<RelayMessage>)> {
match self {
Self::Sender { addr, sender } => Ok((addr, sender)),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
/// Consume the wrapper and return [RouterReply::Workers]
pub fn take_workers(self) -> Result<Vec<Address>> {
match self {
Self::Workers(w) => Ok(w),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
/// Consume the wrapper and return [RouterReply::State]
pub fn take_state(self) -> Result<bool> {
match self {
Self::State(b) => Ok(b),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
/// Returns Ok if self is [RouterReply::Ok]
pub fn is_ok(self) -> Result<()> {
match self {
Self::Ok => Ok(()),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
}