// Node builder: configures and constructs the root `Context` and `Executor` of a node.
use crate::tokio::runtime::Runtime;
use crate::{debugger, Context, Executor};
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
#[cfg(feature = "std")]
use ockam_core::OpenTelemetryContext;
use ockam_core::{Address, AllowAll, Mailbox, Mailboxes, LATEST_PROTOCOL_VERSION};
/// A minimal worker implementation that does nothing.
///
/// The `Worker` impl below only fixes the two associated types and
/// overrides none of the trait's methods, so the trait's provided
/// defaults are what run for this worker.
pub struct NullWorker;
impl ockam_core::Worker for NullWorker {
// Context type handed to this worker: the node `Context` imported from this crate.
type Context = Context;
type Message = (); // This message type is never used
}
/// Start a node with a custom setup configuration
///
/// The `start_node()` function wraps this type and simply calls
/// `NodeBuilder::default()`. Varying use-cases should use the
/// builder API to customise the underlying node that is created.
pub struct NodeBuilder {
/// When `true` (the value set by `new`), `build` calls `setup_tracing()`.
logging: bool,
/// When `true` (the value set by `new`) and the `std` feature is enabled,
/// `build` installs a panic hook that logs the panic and exits the process.
exit_on_panic: bool,
/// Runtime supplied through `with_runtime`; when `None`, `build` creates one.
rt: Option<Arc<Runtime>>,
}
impl Default for NodeBuilder {
fn default() -> Self {
Self::new()
}
}
impl NodeBuilder {
    /// Create a builder with the initial configuration: logging enabled,
    /// exit-on-panic enabled, and no caller-supplied runtime.
    pub fn new() -> Self {
        Self {
            rt: None,
            exit_on_panic: true,
            logging: true,
        }
    }

    /// Turn off the tracing setup that `build` would otherwise perform.
    pub fn no_logging(self) -> Self {
        Self {
            logging: false,
            ..self
        }
    }

    /// Turn off installation of the exit-on-panic hook in `build`.
    pub fn no_exit_on_panic(self) -> Self {
        Self {
            exit_on_panic: false,
            ..self
        }
    }

    /// Make `build` use the given runtime instead of creating its own.
    pub fn with_runtime(self, rt: Arc<Runtime>) -> Self {
        Self {
            rt: Some(rt),
            ..self
        }
    }

    /// Consume this builder and yield the root `Context` together with
    /// the `Executor` of a new node.
    ///
    /// Steps, in order:
    /// 1. set up tracing, unless logging was disabled;
    /// 2. with the `std` feature, install the exit-on-panic hook, unless
    ///    it was disabled;
    /// 3. take the supplied runtime, or create one;
    /// 4. create the executor and the root context at address `"app"`,
    ///    then register that context's sender with the executor.
    ///
    /// # Panics
    ///
    /// Panics if no runtime was supplied and one cannot be initialized.
    #[inline]
    pub fn build(self) -> (Context, Executor) {
        if self.logging {
            setup_tracing();
        }

        // A node is meant to be built once per process. The Executor drives
        // async functions, and a panic inside one of them could leave it
        // blocked on the Router, so by default a panic terminates the process.
        #[cfg(feature = "std")]
        if self.exit_on_panic {
            Self::install_exit_on_panic_hook();
        }

        info!("Initializing ockam node");

        // One FlowControls instance shared by the executor and the root context
        let flow_controls = FlowControls::new();

        let rt = match self.rt {
            Some(provided) => provided,
            None => Self::default_runtime(),
        };

        let mut executor = Executor::new(Arc::clone(&rt), &flow_controls);

        let addr: Address = "app".into();

        // The root application worker needs a mailbox and relay so it can
        // accept messages from workers and buffer incoming transcoded data.
        let runtime_handle = rt.handle().clone();
        let router_sender = executor.sender();
        let app_mailbox = Mailbox::new(addr, Arc::new(AllowAll), Arc::new(AllowAll));
        let mailboxes = Mailboxes::new(app_mailbox, vec![]);
        let (ctx, sender, _) = Context::new(
            LATEST_PROTOCOL_VERSION,
            runtime_handle,
            router_sender,
            mailboxes,
            None,
            Default::default(),
            &flow_controls,
            #[cfg(feature = "std")]
            OpenTelemetryContext::current(),
        );

        debugger::log_inherit_context("NODE", &ctx, &ctx);

        // Hand the root mailbox sender to the executor under the "app" address
        executor.initialize_system("app", sender);

        // The caller receives the root context and the executor
        (ctx, executor)
    }

    /// Replace the process panic hook with one that logs and prints the
    /// panic plus a reporting hint, then exits with status 1.
    #[cfg(feature = "std")]
    fn install_exit_on_panic_hook() {
        std::panic::set_hook(Box::new(|panic_info| {
            // The local names double as the tracing field names below,
            // so they are part of the logged output.
            let message1 = format!("A fatal error occurred: {panic_info}.");
            let message2 = "Please report this issue, with a copy of your logs, to https://github.com/build-trust/ockam/issues.";
            error!(message1);
            error!(message2);
            println!("{}", message1);
            println!("{}", message2);
            std::process::exit(1);
        }));
    }

    /// Create the runtime used when the caller did not supply one.
    #[cfg(feature = "std")]
    fn default_runtime() -> Arc<Runtime> {
        let runtime = crate::tokio::runtime::Builder::new_multi_thread()
            // A 1 MiB thread stack, below the 2MB default: this helps the
            // cache hit ratio and trims the memory footprint.
            // Raise it if deeper stacks are needed.
            .thread_stack_size(1024 * 1024)
            .enable_all()
            .build()
            .expect("cannot initialize the tokio runtime");
        Arc::new(runtime)
    }

    /// Create the runtime used when the caller did not supply one.
    #[cfg(not(feature = "std"))]
    fn default_runtime() -> Arc<Runtime> {
        Arc::new(Runtime::new().expect("cannot initialize the tokio runtime"))
    }
}
/// Install a global tracing subscriber configured from the environment.
///
/// The filter is read from the `OCKAM_LOG` environment variable; if that
/// cannot be used, it falls back to the `INFO` level plus an
/// `ockam_node=info` directive. Installation is attempted at most once
/// per process, and a failed attempt is ignored because a subscriber may
/// have been initialized externally.
#[cfg(feature = "std")]
fn setup_tracing() {
    use tracing_subscriber::{filter::LevelFilter, fmt, prelude::*, EnvFilter};

    static ONCE: std::sync::Once = std::sync::Once::new();

    ONCE.call_once(|| {
        let filter = match EnvFilter::try_from_env("OCKAM_LOG") {
            Ok(from_env) => from_env,
            Err(_) => EnvFilter::default()
                .add_directive(LevelFilter::INFO.into())
                .add_directive("ockam_node=info".parse().unwrap()),
        };

        let subscriber = tracing_subscriber::registry()
            .with(filter)
            .with(tracing_error::ErrorLayer::default())
            .with(fmt::layer());

        // Ignore failure, since we may init externally.
        let _ = subscriber.try_init();
    });
}

/// Tracing setup placeholder for builds without the `std` feature.
///
/// Does nothing: the subscriber setup is only compiled with `std`.
#[cfg(not(feature = "std"))]
fn setup_tracing() {}