use core::time::Duration;
use ockam_core::compat::collections::HashMap;
use ockam_core::compat::time::now;
use ockam_core::compat::{boxed::Box, sync::Arc, sync::RwLock};
use ockam_core::flow_control::FlowControls;
#[cfg(feature = "std")]
use ockam_core::OpenTelemetryContext;
use ockam_core::{
errcode::{Kind, Origin},
Address, AsyncTryClone, DenyAll, Error, IncomingAccessControl, Mailboxes,
OutgoingAccessControl, Result, TransportType,
};
use ockam_transport_core::Transport;
use crate::async_drop::AsyncDrop;
use crate::channel_types::{message_channel, small_channel, SmallReceiver, SmallSender};
use crate::tokio::{self, runtime::Handle};
use crate::{debugger, Context};
use crate::{error::*, relay::CtrlSignal, router::SenderPair, NodeMessage};
/// Alias of [`Context`] used as the return type of the `new_detached*`
/// constructors, so that signatures make it visible that the returned
/// context was created detached.
pub type DetachedContext = Context;
/// One-shot sender through which a [`Context`] reports its own address when
/// it is dropped (see the `Drop` implementation for `Context`).
pub type AsyncDropSender = crate::tokio::sync::oneshot::Sender<Address>;
impl Drop for Context {
    /// Notify the async-drop handler, if this context has one.
    ///
    /// Contexts created without an `async_drop_sender` have nothing to do
    /// here. Otherwise the sender is taken out of the context (so it can only
    /// ever fire once) and the context's address is sent through it. A failed
    /// send is only logged, because `drop` has no way to report an error.
    fn drop(&mut self) {
        let drop_tx = match self.async_drop_sender.take() {
            Some(drop_tx) => drop_tx,
            // No async-drop sender attached: nothing to notify.
            None => return,
        };

        trace!("De-allocated detached context {}", self.address());

        if let Err(e) = drop_tx.send(self.address()) {
            warn!("Encountered error while dropping detached context: {}", e);
        }
    }
}
#[ockam_core::async_trait]
impl AsyncTryClone for Context {
    /// Clone this context by creating a new detached context from it.
    ///
    /// The clone gets a freshly generated random address tagged
    /// `Context.async_try_clone.detached`. Both its incoming and outgoing
    /// access control are `DenyAll`, i.e. the access control of `self` is
    /// not carried over to the clone.
    ///
    /// # Errors
    ///
    /// Returns whatever error `new_detached` reports.
    async fn async_try_clone(&self) -> Result<Self> {
        let clone_address = Address::random_tagged("Context.async_try_clone.detached");
        self.new_detached(clone_address, DenyAll, DenyAll).await
    }
}
impl Context {
    /// Create a new context together with the channel ends that feed it.
    ///
    /// A fresh message channel and a fresh control channel are created:
    /// the context keeps the message receiver, while the returned
    /// [`SenderPair`] holds the message sender and the control sender, and
    /// the returned [`SmallReceiver`] is the control-signal receiver.
    ///
    /// The new context starts with a mailbox count of zero and with its own
    /// clone of `flow_controls`.
    pub(crate) fn new(
        rt: Handle,
        sender: SmallSender<NodeMessage>,
        mailboxes: Mailboxes,
        async_drop_sender: Option<AsyncDropSender>,
        transports: Arc<RwLock<HashMap<TransportType, Arc<dyn Transport>>>>,
        flow_controls: &FlowControls,
        #[cfg(feature = "std")] tracing_context: OpenTelemetryContext,
    ) -> (Self, SenderPair, SmallReceiver<CtrlSignal>) {
        let (mailbox_tx, receiver) = message_channel();
        let (ctrl_tx, ctrl_rx) = small_channel();
        (
            Self {
                rt,
                sender,
                mailboxes,
                receiver,
                async_drop_sender,
                mailbox_count: Arc::new(0.into()),
                transports,
                flow_controls: flow_controls.clone(),
                #[cfg(feature = "std")]
                tracing_context,
            },
            SenderPair {
                msgs: mailbox_tx,
                ctrl: ctrl_tx,
            },
            ctrl_rx,
        )
    }

    /// Create a context that shares this context's runtime handle, node
    /// sender, transports, flow controls and (with the `std` feature)
    /// tracing context, but uses the given `mailboxes`.
    ///
    /// The copy has no async-drop sender, so dropping it does not notify
    /// anyone.
    pub(crate) fn copy_with_mailboxes(
        &self,
        mailboxes: Mailboxes,
    ) -> (Context, SenderPair, SmallReceiver<CtrlSignal>) {
        Context::new(
            self.runtime().clone(),
            self.sender().clone(),
            mailboxes,
            None,
            self.transports.clone(),
            &self.flow_controls,
            #[cfg(feature = "std")]
            self.tracing_context(),
        )
    }

    /// Same as [`Context::copy_with_mailboxes`], except that the copy carries
    /// `drop_sender` and therefore reports its address through it when it is
    /// dropped.
    pub(crate) fn copy_with_mailboxes_detached(
        &self,
        mailboxes: Mailboxes,
        drop_sender: AsyncDropSender,
    ) -> (Context, SenderPair, SmallReceiver<CtrlSignal>) {
        Context::new(
            self.runtime().clone(),
            self.sender().clone(),
            mailboxes,
            Some(drop_sender),
            self.transports.clone(),
            &self.flow_controls,
            // Inherit the parent's tracing context, exactly like
            // `copy_with_mailboxes` does, instead of whatever context happens
            // to be ambient on the calling task. Otherwise the detached
            // context can lose its link to the parent's trace.
            #[cfg(feature = "std")]
            self.tracing_context(),
        )
    }

    /// Wait for the given `duration` using the runtime's timer.
    #[doc(hidden)]
    pub async fn sleep(&self, duration: Duration) {
        tokio::time::sleep(duration).await;
    }

    /// Sleep until the clock returned by `now()` reaches
    /// `deadline_timestamp_seconds`.
    ///
    /// The deadline is expressed in seconds, in the same time base as
    /// `ockam_core::compat::time::now` (presumably seconds since the Unix
    /// epoch; confirm against that function).
    ///
    /// * If the deadline is already reached, returns immediately.
    /// * If fewer than 5 seconds remain, logs a warning and performs one
    ///   plain sleep for the remaining whole seconds.
    /// * Otherwise sleeps in one-second steps and re-reads the clock after
    ///   each step, returning as soon as the deadline is reached.
    ///
    /// # Panics
    ///
    /// Panics if `now()` returns an error.
    #[doc(hidden)]
    pub async fn sleep_long_until(&self, deadline_timestamp_seconds: u64) {
        let n = now().unwrap();
        if deadline_timestamp_seconds <= n {
            return;
        }
        // Cannot underflow: the deadline is strictly greater than `n` here.
        let duration = deadline_timestamp_seconds - n;
        if duration < 5 {
            warn!(
                "Low precision sleeping for less than 5 seconds. Duration: {:?}",
                duration
            );
            self.sleep(Duration::from_secs(duration)).await;
            return;
        }
        // Poll the clock once per second rather than issuing one long sleep,
        // so the wake-up is decided by the clock reading itself.
        loop {
            self.sleep(Duration::from_secs(1)).await;
            if now().unwrap() >= deadline_timestamp_seconds {
                return;
            }
        }
    }

    /// Create a new detached context that uses the given `mailboxes`.
    ///
    /// # Errors
    ///
    /// Fails if the registration message cannot be sent to the node, if no
    /// reply is received, or if the reply itself is an error.
    pub async fn new_detached_with_mailboxes(
        &self,
        mailboxes: Mailboxes,
    ) -> Result<DetachedContext> {
        let ctx = self.new_detached_impl(mailboxes).await?;
        debugger::log_inherit_context("DETACHED_WITH_MB", self, &ctx);
        Ok(ctx)
    }

    /// Create a new detached context with a single main mailbox at `address`,
    /// guarded by the given `incoming` and `outgoing` access controls.
    ///
    /// # Errors
    ///
    /// Fails if the registration message cannot be sent to the node, if no
    /// reply is received, or if the reply itself is an error.
    pub async fn new_detached(
        &self,
        address: impl Into<Address>,
        incoming: impl IncomingAccessControl,
        outgoing: impl OutgoingAccessControl,
    ) -> Result<DetachedContext> {
        let mailboxes = Mailboxes::main(address.into(), Arc::new(incoming), Arc::new(outgoing));
        let ctx = self.new_detached_impl(mailboxes).await?;
        debugger::log_inherit_context("DETACHED", self, &ctx);
        Ok(ctx)
    }

    /// Shared implementation behind the `new_detached*` constructors.
    async fn new_detached_impl(&self, mailboxes: Mailboxes) -> Result<DetachedContext> {
        // `Drop` cannot await, so a separate `AsyncDrop` task is spawned on
        // the runtime; the new context's `Drop` impl signals it through
        // `drop_sender`.
        let (async_drop, drop_sender) = AsyncDrop::new(self.sender.clone());
        self.rt.spawn(async_drop.run());

        // Build the context itself. The control receiver is not needed here
        // and is discarded.
        let addresses = mailboxes.addresses();
        let (ctx, sender, _) = self.copy_with_mailboxes_detached(mailboxes, drop_sender);

        // Register the addresses with the node and wait for its reply.
        // NOTE(review): the `true` argument presumably marks the worker as
        // detached; confirm against `NodeMessage::start_worker`.
        let (msg, mut rx) =
            NodeMessage::start_worker(addresses, sender, true, Arc::clone(&self.mailbox_count));
        self.sender
            .send(msg)
            .await
            .map_err(|e| Error::new(Origin::Node, Kind::Invalid, e))?;
        // First `?`: the reply channel closed without an answer.
        // Second `?`: the node answered with an error.
        rx.recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
        Ok(ctx)
    }
}
#[cfg(test)]
mod tests {
    use ockam_core::{async_trait, Mailbox};

    use super::*;

    /// Minimal transport stub: fixed transport type, identity address
    /// resolution and a no-op disconnect.
    struct SomeTransport();

    #[async_trait]
    impl Transport for SomeTransport {
        fn transport_type(&self) -> TransportType {
            TransportType::new(0)
        }

        async fn resolve_address(&self, address: Address) -> Result<Address> {
            Ok(address)
        }

        async fn disconnect(&self, _address: Address) -> Result<()> {
            Ok(())
        }
    }

    // A transport registered on a context must still be registered on copies
    // made with `copy_with_mailboxes` and `copy_with_mailboxes_detached`.
    #[ockam_macros::test(crate = "crate")]
    async fn test_copy(ctx: &mut Context) -> Result<()> {
        let registered = Arc::new(SomeTransport());
        ctx.register_transport(registered.clone());

        let deny_all_mailboxes = Mailboxes::new(Mailbox::deny_all("address"), vec![]);

        // Plain copy.
        let (plain_copy, ..) = ctx.copy_with_mailboxes(deny_all_mailboxes.clone());
        assert!(plain_copy.is_transport_registered(registered.transport_type()));

        // Detached copy; only the drop sender of the `AsyncDrop` pair is kept.
        let (_, drop_sender) = AsyncDrop::new(ctx.sender.clone());
        let (detached_copy, ..) =
            ctx.copy_with_mailboxes_detached(deny_all_mailboxes, drop_sender);
        assert!(detached_copy.is_transport_registered(registered.transport_type()));

        Ok(())
    }
}