use crate::channel_types::{small_channel, MessageSender, SmallReceiver, SmallSender};
use crate::{
error::{NodeError, NodeReason, RouterReason, WorkerReason},
router::SenderPair,
};
use core::{fmt, sync::atomic::AtomicUsize};
use ockam_core::compat::{string::String, sync::Arc, vec::Vec};
use ockam_core::{
Address, AddressAndMetadata, AddressMetadata, Error, RelayMessage, Result, TransportType,
};
#[derive(Debug)]
pub enum NodeMessage {
StartWorker {
addrs: Vec<Address>,
senders: SenderPair,
detached: bool,
mailbox_count: Arc<AtomicUsize>,
reply: SmallSender<NodeReplyResult>,
addresses_metadata: Vec<AddressAndMetadata>,
},
ListWorkers(SmallSender<NodeReplyResult>),
SetCluster(Address, String, SmallSender<NodeReplyResult>),
StopWorker(Address, bool, SmallSender<NodeReplyResult>),
StartProcessor {
addrs: Vec<Address>,
senders: SenderPair,
reply: SmallSender<NodeReplyResult>,
addresses_metadata: Vec<AddressAndMetadata>,
},
StopProcessor(Address, SmallSender<NodeReplyResult>),
StopNode(ShutdownType, SmallSender<NodeReplyResult>),
AbortNode,
StopAck(Address),
SenderReq(Address, SmallSender<NodeReplyResult>),
Router(TransportType, Address, SmallSender<NodeReplyResult>),
SetReady(Address),
CheckReady(Address, SmallSender<NodeReplyResult>),
FindTerminalAddress(Vec<Address>, SmallSender<NodeReplyResult>),
GetMetadata(Address, SmallSender<NodeReplyResult>),
}
impl fmt::Display for NodeMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
NodeMessage::StartWorker { .. } => write!(f, "StartWorker"),
NodeMessage::ListWorkers(_) => write!(f, "ListWorkers"),
NodeMessage::SetCluster(_, _, _) => write!(f, "SetCluster"),
NodeMessage::StopWorker(_, _, _) => write!(f, "StopWorker"),
NodeMessage::StartProcessor { .. } => write!(f, "StartProcessor"),
NodeMessage::StopProcessor(_, _) => write!(f, "StopProcessor"),
NodeMessage::StopNode(_, _) => write!(f, "StopNode"),
NodeMessage::AbortNode => write!(f, "AbortNode"),
NodeMessage::StopAck(_) => write!(f, "StopAck"),
NodeMessage::SenderReq(_, _) => write!(f, "SenderReq"),
NodeMessage::Router(_, _, _) => write!(f, "Router"),
NodeMessage::SetReady(_) => write!(f, "SetReady"),
NodeMessage::CheckReady(_, _) => write!(f, "CheckReady"),
NodeMessage::FindTerminalAddress(_, _) => write!(f, "FindTerminalAddress"),
NodeMessage::GetMetadata(_, _) => write!(f, "ReadMetadata"),
}
}
}
impl NodeMessage {
pub fn start_worker(
addrs: Vec<Address>,
senders: SenderPair,
detached: bool,
mailbox_count: Arc<AtomicUsize>,
metadata: Vec<AddressAndMetadata>,
) -> (Self, SmallReceiver<NodeReplyResult>) {
let (reply, rx) = small_channel();
(
Self::StartWorker {
addrs,
senders,
detached,
mailbox_count,
reply,
addresses_metadata: metadata,
},
rx,
)
}
pub fn start_processor(
addrs: Vec<Address>,
senders: SenderPair,
metadata: Vec<AddressAndMetadata>,
) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(
Self::StartProcessor {
addrs,
senders,
reply: tx,
addresses_metadata: metadata,
},
rx,
)
}
pub fn stop_processor(address: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::StopProcessor(address, tx), rx)
}
pub fn list_workers() -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::ListWorkers(tx), rx)
}
pub fn set_cluster(addr: Address, label: String) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::SetCluster(addr, label, tx), rx)
}
pub fn stop_worker(address: Address, detached: bool) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::StopWorker(address, detached, tx), rx)
}
pub fn stop_node(tt: ShutdownType) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::StopNode(tt, tx), rx)
}
pub fn sender_request(route: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::SenderReq(route, tx), rx)
}
pub fn set_ready(addr: Address) -> Self {
Self::SetReady(addr)
}
pub fn get_ready(addr: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::CheckReady(addr, tx), rx)
}
pub fn find_terminal_address(addrs: Vec<Address>) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::FindTerminalAddress(addrs, tx), rx)
}
pub fn read_metadata(address: Address) -> (Self, SmallReceiver<NodeReplyResult>) {
let (tx, rx) = small_channel();
(Self::GetMetadata(address, tx), rx)
}
}
pub type NodeReplyResult = core::result::Result<RouterReply, Error>;
#[derive(Debug)]
pub enum RouterReply {
Ok,
Workers(Vec<Address>),
Sender {
addr: Address,
sender: MessageSender<RelayMessage>,
},
State(bool),
TerminalAddress(Option<AddressAndMetadata>),
Metadata(Option<AddressMetadata>),
}
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum ShutdownType {
Graceful(u8),
Immediate,
}
impl Default for ShutdownType {
fn default() -> Self {
Self::Graceful(1)
}
}
impl RouterReply {
pub fn ok() -> NodeReplyResult {
Ok(RouterReply::Ok)
}
pub fn state(b: bool) -> NodeReplyResult {
Ok(RouterReply::State(b))
}
#[track_caller]
pub fn no_such_address(a: Address) -> NodeReplyResult {
Err(NodeError::Address(a).not_found())
}
#[track_caller]
pub fn worker_exists(a: Address) -> NodeReplyResult {
Err(NodeError::Address(a).already_exists())
}
#[track_caller]
pub fn router_exists() -> NodeReplyResult {
Err(NodeError::RouterState(RouterReason::Duplicate).already_exists())
}
#[track_caller]
pub fn node_rejected(reason: NodeReason) -> NodeReplyResult {
Err(NodeError::NodeState(reason).conflict())
}
#[track_caller]
pub fn worker_rejected(reason: WorkerReason) -> NodeReplyResult {
Err(NodeError::WorkerState(reason).conflict())
}
pub fn workers(v: Vec<Address>) -> NodeReplyResult {
Ok(Self::Workers(v))
}
pub fn terminal_address(address: Option<AddressAndMetadata>) -> NodeReplyResult {
Ok(Self::TerminalAddress(address))
}
pub fn metadata(value: Option<AddressMetadata>) -> NodeReplyResult {
Ok(Self::Metadata(value))
}
pub fn sender(addr: Address, sender: MessageSender<RelayMessage>) -> NodeReplyResult {
Ok(RouterReply::Sender { addr, sender })
}
pub fn take_sender(self) -> Result<(Address, MessageSender<RelayMessage>)> {
match self {
Self::Sender { addr, sender } => Ok((addr, sender)),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
pub fn take_workers(self) -> Result<Vec<Address>> {
match self {
Self::Workers(w) => Ok(w),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
pub fn take_state(self) -> Result<bool> {
match self {
Self::State(b) => Ok(b),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
pub fn take_terminal_address(self) -> Result<Option<AddressAndMetadata>> {
match self {
Self::TerminalAddress(addr) => Ok(addr),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
pub fn take_metadata(self) -> Result<Option<AddressMetadata>> {
match self {
Self::Metadata(value) => Ok(value),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
pub fn is_ok(self) -> Result<()> {
match self {
Self::Ok => Ok(()),
_ => Err(NodeError::NodeState(NodeReason::Unknown).internal()),
}
}
}