use crate::channel_types::{SmallReceiver, SmallSender};
use crate::tokio::runtime::Handle;
use crate::{error::*, AsyncDropSender, NodeMessage};
use core::sync::atomic::AtomicUsize;
use ockam_core::compat::collections::HashMap;
use ockam_core::compat::sync::{Arc, RwLock};
use ockam_core::compat::{string::String, vec::Vec};
use ockam_core::flow_control::FlowControls;
use ockam_core::{async_trait, Address, Mailboxes, RelayMessage, Result, TransportType};
#[cfg(feature = "std")]
use core::fmt::{Debug, Formatter};
use ockam_transport_core::Transport;
pub const DEFAULT_TIMEOUT: u64 = 30;
pub struct Context {
pub(super) mailboxes: Mailboxes,
pub(super) sender: SmallSender<NodeMessage>,
pub(super) rt: Handle,
pub(super) receiver: SmallReceiver<RelayMessage>,
pub(super) async_drop_sender: Option<AsyncDropSender>,
pub(super) mailbox_count: Arc<AtomicUsize>,
pub(super) transports: Arc<RwLock<HashMap<TransportType, Arc<dyn Transport>>>>,
pub(super) flow_controls: FlowControls,
}
#[async_trait]
pub trait HasContext {
fn get_context(&self) -> &Context;
}
#[cfg(feature = "std")]
impl Debug for Context {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Context")
.field("mailboxes", &self.mailboxes)
.field("sender", &self.sender)
.field("runtime", &self.rt)
.finish()
}
}
impl Context {
pub fn runtime(&self) -> &Handle {
&self.rt
}
pub(crate) fn mailbox_count(&self) -> Arc<AtomicUsize> {
self.mailbox_count.clone()
}
pub(crate) fn sender(&self) -> &SmallSender<NodeMessage> {
&self.sender
}
pub fn address(&self) -> Address {
self.mailboxes.main_address()
}
pub fn addresses(&self) -> Vec<Address> {
self.mailboxes.addresses()
}
pub fn mailboxes(&self) -> &Mailboxes {
&self.mailboxes
}
pub fn flow_controls(&self) -> &FlowControls {
&self.flow_controls
}
}
impl Context {
pub async fn set_cluster<S: Into<String>>(&self, label: S) -> Result<()> {
let (msg, mut rx) = NodeMessage::set_cluster(self.address(), label.into());
self.sender
.send(msg)
.await
.map_err(NodeError::from_send_err)?;
rx.recv()
.await
.ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
.is_ok()
}
pub async fn list_workers(&self) -> Result<Vec<Address>> {
let (msg, mut reply_rx) = NodeMessage::list_workers();
self.sender
.send(msg)
.await
.map_err(NodeError::from_send_err)?;
reply_rx
.recv()
.await
.ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
.take_workers()
}
pub(crate) async fn send_stop_ack(&self) -> Result<()> {
self.sender
.send(NodeMessage::StopAck(self.address()))
.await
.map_err(NodeError::from_send_err)?;
Ok(())
}
pub(crate) async fn set_ready(&mut self) -> Result<()> {
self.sender
.send(NodeMessage::set_ready(self.address()))
.await
.map_err(NodeError::from_send_err)?;
Ok(())
}
pub async fn wait_for<A: Into<Address>>(&self, addr: A) -> Result<()> {
let (msg, mut reply) = NodeMessage::get_ready(addr.into());
self.sender
.send(msg)
.await
.map_err(NodeError::from_send_err)?;
reply
.recv()
.await
.ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
Ok(())
}
}