mod actors;
mod components;
mod formats;
mod host;
mod log_tracer;
pub mod migration;
mod node_api;
mod node_impl;
mod node_storage;
pub mod settings;
mod util;
pub(crate) mod version;
pub use components::swarm_observer::SwarmObserver;
pub use node_impl::NodeError;
pub use util::{init_shutdown_ceremony, shutdown_ceremony, spawn_with_name};
pub use formats::{node_settings, ShutdownReason};
#[cfg(not(target_os = "android"))]
pub use host::lock_working_dir;
use crate::util::formats::LogSeverity;
use crate::{
swarm::event_store_ref::{self, EventStoreRef},
util::{variable::Writer, SocketAddrHelper},
};
use acto::ActoRuntime;
use actors::Actors;
use anyhow::Context;
use ax_types::service::SwarmState;
use components::{
android::{Android, FfiMessage},
logging::Logging,
node_api::NodeApi,
store::{Store, StoreRequest},
swarm_observer::swarm_observer,
Component, ComponentRequest,
};
use crossbeam::channel::{bounded, Receiver, Sender};
use formats::ExternalEvent;
pub use host::initialize_repository;
use host::Host;
use node_impl::{ComponentChannel, NodeProcessResult, NodeWrapper};
use settings::SettingsRequest;
use std::{net::ToSocketAddrs, path::PathBuf, str::FromStr, thread};
use util::init_panic_hook;
#[cfg(all(target_env = "musl", target_pointer_width = "64"))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
pub enum Runtime {
Android { ffi_sink: Sender<FfiMessage> },
Windows,
Linux,
}
pub struct ApplicationState {
pub join_handles: Vec<thread::JoinHandle<()>>,
pub manager: NodeWrapper,
_actors: Actors,
#[allow(dead_code)]
#[cfg(not(target_os = "android"))]
_lock: fslock::LockFile,
}
fn bounded_channel<T>() -> (Sender<T>, Receiver<T>) {
bounded(256)
}
fn spawn(
working_dir: PathBuf,
runtime: Runtime,
bind_to: BindTo,
log_no_color: bool,
log_as_json: bool,
) -> anyhow::Result<ApplicationState> {
#[cfg(not(target_os = "android"))]
let _lock = host::lock_working_dir(&working_dir)?;
let mut join_handles = vec![];
let (store_tx, store_rx) = bounded_channel();
let (node_tx, node_rx) = bounded_channel();
let (logs_tx, logs_rx) = bounded_channel();
let (nodeapi_tx, nodeapi_rx) = bounded_channel();
let actors = Actors::new(node_tx.clone()).context("creating Actors")?;
let swarm_state_writer = Writer::new(SwarmState::default());
let swarm_state = swarm_state_writer.reader();
let swarm_observer = actors
.rt()
.spawn_actor("swarm_observer", |cell| swarm_observer(cell, swarm_state_writer));
let swarm_observer_ref = swarm_observer.me.clone();
actors.supervise(swarm_observer.contramap(SwarmObserver::from));
let tx = store_tx.clone();
let event_store = EventStoreRef::new(move |e| {
tx.try_send(ComponentRequest::Individual(StoreRequest::EventsV2(e)))
.map_err(event_store_ref::Error::from)
});
let mut components = vec![
(Store::get_type().into(), ComponentChannel::Store(store_tx.clone())),
(NodeApi::get_type().into(), ComponentChannel::NodeApi(nodeapi_tx)),
(Logging::get_type().into(), ComponentChannel::Logging(logs_tx)),
];
let logging = Logging::new(logs_rx, LogSeverity::default(), log_no_color, log_as_json);
log::set_boxed_logger(Box::new(log_tracer::LogTracer::new([
"yamux",
"libp2p_gossipsub",
"multistream_select",
"netlink_proto",
"libp2p_core::upgrade::apply",
])))
.ok();
log::set_max_level(log::LevelFilter::max());
migration::migrate_if_necessary(&working_dir)?;
let host = Host::new(working_dir.clone()).context("creating host interface")?;
logging.set_log_level(host.get_settings().admin.log_levels.node.clone())?;
join_handles.push(logging.spawn().context("spawning logger")?);
let node_id = host.get_or_create_node_id().context("getting node ID")?;
tracing::debug!("NodeID: {}", node_id);
match runtime {
Runtime::Android { ffi_sink } => {
let (runtime_tx, runtime_rx) = bounded_channel();
components.push((Android::get_type().into(), ComponentChannel::Android(runtime_tx)));
let android = Android::new(node_tx.clone(), runtime_rx, ffi_sink);
join_handles.push(android.spawn()?);
}
Runtime::Linux | Runtime::Windows => {}
};
let keystore = host.get_keystore();
let node_cycle_count = host.get_cycle_count().context("getting cycle count")?;
let node = NodeWrapper::new((node_tx, node_rx), components, host).context("creating node core")?;
let node_api = {
let keypair = keystore
.read()
.get_pair(node_id.into())
.context("No keypair registered for node")?;
NodeApi::new(
node_id,
keypair.into(),
node.tx.clone(),
bind_to.admin.clone(),
nodeapi_rx,
working_dir.join("store"),
store_tx,
)
};
join_handles.push(node_api.spawn().context("spawning node API")?);
let store = Store::new(
store_rx,
event_store,
working_dir.join("store"),
bind_to,
keystore,
node_id,
node_cycle_count,
swarm_observer_ref.contramap(SwarmObserver::from),
swarm_state,
)
.context("creating event store")?;
join_handles.push(store.spawn().context("spawning event store")?);
init_panic_hook(node.tx.clone());
Ok(ApplicationState {
join_handles,
manager: node,
_actors: actors,
#[cfg(not(target_os = "android"))]
_lock,
})
}
pub type NodeLifecycleResult = Receiver<NodeProcessResult<()>>;
#[derive(Debug, Clone)]
pub struct BindTo {
pub admin: SocketAddrHelper,
pub swarm: SocketAddrHelper,
pub api: SocketAddrHelper,
}
impl Default for BindTo {
fn default() -> Self {
Self {
admin: SocketAddrHelper::unspecified(4458).expect("unspecified can only fail for port 0"),
swarm: SocketAddrHelper::unspecified(4001).expect("unspecified can only fail for port 0"),
api: "localhost:4454".parse().unwrap(),
}
}
}
impl BindTo {
pub fn random() -> anyhow::Result<Self> {
Ok(Self {
admin: SocketAddrHelper::unspecified(0)?,
swarm: SocketAddrHelper::unspecified(0)?,
api: "localhost:0".parse()?,
})
}
}
#[derive(Debug, Clone)]
pub enum PortOrHostPort<const DEFAULT: u16> {
Port(u16),
HostPort(SocketAddrHelper),
}
impl<const N: u16> FromStr for PortOrHostPort<N> {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
parse_port_maybe_host::<N>(s)
}
}
fn parse_port_maybe_host<const N: u16>(src: &str) -> Result<PortOrHostPort<N>, String> {
let port = match src.parse::<u16>() {
Ok(p) => return Ok(PortOrHostPort::Port(p)),
Err(e) => e,
};
let host_string = match SocketAddrHelper::from_host_string(src) {
Ok(p) => return Ok(PortOrHostPort::HostPort(p)),
Err(e) => e,
};
let multiaddr = match SocketAddrHelper::parse_multiaddr(src) {
Ok(m) => return Ok(PortOrHostPort::HostPort(m)),
Err(e) => e,
};
let sock_addr = match (src, N).to_socket_addrs() {
Ok(i) => return Ok(PortOrHostPort::HostPort(i.collect())),
Err(e) => e,
};
Err(format!(
"cannot interpret `{}`:\n as port number: {:#}\n as <host:port>: {:#}\
\n as multiaddr: {:#}\n as IP or name: {:#}",
src, port, host_string, multiaddr, sock_addr
))
}
impl ApplicationState {
pub fn spawn(
base_dir: PathBuf,
runtime: Runtime,
bind_to: BindTo,
log_no_color: bool,
log_as_json: bool,
) -> anyhow::Result<Self> {
spawn(base_dir, runtime, bind_to, log_no_color, log_as_json).context("spawning core infrastructure")
}
pub fn handle_settings_request(&self, message: SettingsRequest) {
self.manager.tx.send(ExternalEvent::SettingsRequest(message)).unwrap()
}
pub fn shutdown(&mut self, reason: ShutdownReason) {
let _ = self.manager.tx.send(ExternalEvent::ShutdownRequested(reason));
for h in self.join_handles.drain(..) {
tracing::debug!(
"Waiting for thread (ID: \"{:?}\", Name: \"{}\") to join",
h.thread().id(),
h.thread().name().unwrap_or("Unknown")
);
h.join().unwrap();
}
}
}
impl Drop for ApplicationState {
fn drop(&mut self) {
self.shutdown(ShutdownReason::TriggeredByHost);
}
}