use std::{convert::Infallible, sync::Arc, time::Duration};
use futures::{FutureExt, future::BoxFuture};
use tokio::task::JoinHandle;
use tracing::Instrument;
use super::{
NetEventRegister, PeerId,
network_bridge::{
EventLoopNotificationsReceiver, event_loop_notification_channel, p2p_protoc::P2pConnManager,
},
};
use crate::{
client_events::client_event_handling,
ring::{ConnectionManager, Location},
};
use crate::{
client_events::{BoxedClient, combinator::ClientEventsCombinator},
config::GlobalExecutor,
contract::{
self, ContractHandler, ContractHandlerChannel, ExecutorToEventLoopChannel,
NetworkEventListenerHalve, WaitingResolution, mediator_channels, run_op_request_mediator,
},
message::NodeEvent,
node::NodeConfig,
operations::connect,
};
use super::{OpManager, background_task_monitor::BackgroundTaskMonitor};
/// A peer-to-peer node together with the long-lived tasks and channels it needs to run.
///
/// Instances are assembled by `NodeP2P::build` and consumed by `NodeP2P::run_node`, which
/// takes `self` by value and moves most of these fields into the event listener or into
/// the select that supervises the node's tasks.
pub(crate) struct NodeP2P {
/// Shared operation manager; cloned into the event listener and the connection helper tasks.
pub(crate) op_manager: Arc<OpManager>,
/// Connection manager whose `run_event_listener` future drives the node in `run_node`.
pub(super) conn_manager: P2pConnManager,
/// Identity of this peer. `build` always initialises it to `None`.
pub(super) peer_id: Option<PeerId>,
/// Copied from `config.is_gateway` in `build`.
pub(super) is_gateway: bool,
/// Copied from `config.location` in `build`.
pub(super) location: Option<Location>,
/// Receiving side returned by `event_loop_notification_channel()`; handed to the event listener.
notification_channel: EventLoopNotificationsReceiver,
/// Third value returned by `contract::contract_handler_channel()`; handed to the event listener.
client_wait_for_transaction: ContractHandlerChannel<WaitingResolution>,
/// Listener half returned by `mediator_channels`; handed to the event listener.
executor_listener: ExecutorToEventLoopChannel<NetworkEventListenerHalve>,
/// Receiver for `NodeEvent`s. The matching sender is given to client event handling and a
/// clone of it is returned to the caller of `build`.
node_controller: tokio::sync::mpsc::Receiver<NodeEvent>,
/// Copied from `config.should_connect`; gates the initial join procedure in `run_node`.
should_try_connect: bool,
/// Resolves to an error once the spawned client-event-handling task finishes for any reason.
client_events_task: BoxFuture<'static, anyhow::Error>,
/// Resolves to an error once the spawned contract-handling task finishes for any reason.
contract_executor_task: BoxFuture<'static, anyhow::Error>,
/// Handle returned by `connect::initial_join_procedure`. `None` until `run_node` sets it
/// (only when `should_try_connect` is true); aborted when `run_node` winds down.
initial_join_task: Option<JoinHandle<()>>,
/// Handle of the session actor task spawned in `build`; its exit terminates `run_node`.
session_actor_task: JoinHandle<()>,
/// Handle of the result router task spawned in `build`; its exit terminates `run_node`.
result_router_task: JoinHandle<()>,
/// Handle of the op-request mediator task spawned in `build`; its exit terminates `run_node`.
op_mediator_task: JoinHandle<()>,
/// Monitor that was passed to `OpManager::new`; `run_node` awaits its `wait_for_any_exit()`.
background_task_monitor: BackgroundTaskMonitor,
}
impl NodeP2P {
/// Watches the ring's open-connection count during startup and logs its progress.
///
/// The count reported by `op_manager.ring.open_connections()` is polled until it reaches
/// `min_connections` or a fixed 10-second budget is used up, whichever happens first.
/// Polling happens every 250 ms during the first 3 seconds and every 500 ms afterwards.
///
/// This function only observes and logs; nothing in its body initiates a connection.
async fn aggressive_initial_connections_impl(
    op_manager: &Arc<OpManager>,
    min_connections: usize,
) {
    // Total time this phase is allowed to keep polling.
    const PHASE_BUDGET: Duration = Duration::from_secs(10);
    // While younger than this, poll at the fast interval; afterwards back off.
    const FAST_POLL_WINDOW: Duration = Duration::from_secs(3);
    const FAST_POLL_INTERVAL: Duration = Duration::from_millis(250);
    const SLOW_POLL_INTERVAL: Duration = Duration::from_millis(500);

    tracing::info!(
        "Starting aggressive connection acquisition phase (target: {} connections)",
        min_connections
    );

    let started_at = tokio::time::Instant::now();
    // Count observed on the previous iteration; used to pick the log level below.
    let mut previously_seen = 0;

    loop {
        if started_at.elapsed() >= PHASE_BUDGET {
            break;
        }

        // Give other tasks a chance to run before sampling the count.
        tokio::task::yield_now().await;

        let open_now = op_manager.ring.open_connections();
        if open_now >= min_connections {
            tracing::info!(
                "Reached minimum connections target: {}/{}",
                open_now,
                min_connections
            );
            break;
        }

        if open_now == previously_seen {
            // Nothing changed since the last sample: keep this at debug level.
            tracing::debug!(
                "Current connections: {}/{}, waiting for more peers (elapsed: {}s)",
                open_now,
                min_connections,
                started_at.elapsed().as_secs()
            );
        } else {
            tracing::info!(
                "Connection progress: {}/{} (elapsed: {}s)",
                open_now,
                min_connections,
                started_at.elapsed().as_secs()
            );
            previously_seen = open_now;
        }

        let pause = if started_at.elapsed() < FAST_POLL_WINDOW {
            FAST_POLL_INTERVAL
        } else {
            SLOW_POLL_INTERVAL
        };
        tokio::time::sleep(pause).await;
    }

    // Re-sample so the summary reflects the count at the moment the phase ends.
    let settled = op_manager.ring.open_connections();
    tracing::info!(
        "Aggressive connection phase complete. Final connections: {}/{} (took {}s)",
        settled,
        min_connections,
        started_at.elapsed().as_secs()
    );
}
/// Runs the node until one of its supervised tasks terminates, then tears down and
/// reports why.
///
/// Steps, in order:
/// 1. Initialise `network_status` and emit a peer-startup event (if one is produced).
/// 2. When `should_try_connect` is set, run the initial join procedure and spawn a task
///    that watches the connection count.
/// 3. Race the network event listener, the client-events task, the contract-executor
///    task, the infrastructure tasks (session actor, result router, op mediator) and the
///    background task monitor. Every arm evaluates to `Err`.
/// 4. Abort the helper tasks, emit a peer-shutdown event (if one is produced), and
///    return the result of the race.
///
/// # Errors
///
/// Returns the error of whichever supervised task finished first. Also returns early,
/// via `?`, if `connect::initial_join_procedure` fails.
///
/// NOTE(review): on that early return the abort calls and the shutdown event further
/// down are skipped — confirm this is intended.
pub(super) async fn run_node(mut self) -> anyhow::Result<Infallible> {
// Captured first so the shutdown event and log can report uptime.
let start_time = tokio::time::Instant::now();
// Gateways for which `socket_addr()` returns `None` are skipped by `filter_map`.
let gateway_addrs: std::collections::HashSet<std::net::SocketAddr> = self
.conn_manager
.gateways
.iter()
.filter_map(|g| g.socket_addr())
.collect();
super::network_status::init(
self.conn_manager.listening_port(),
gateway_addrs,
crate::config::PCK_VERSION.to_string(),
);
// `peer_startup` returns an `Option`; the event is registered and logged only when
// one is produced.
if let Some(event) = crate::tracing::NetEventLog::peer_startup(
&self.op_manager.ring,
crate::config::PCK_VERSION.to_string(),
None, None, ) {
use either::Either;
self.op_manager
.ring
.register_events(Either::Left(event))
.await;
tracing::info!(
version = crate::config::PCK_VERSION,
is_gateway = self.op_manager.ring.is_gateway(),
"Peer startup event emitted"
);
}
// The join procedure is awaited here; the handle it yields is kept so it can be
// aborted at shutdown.
if self.should_try_connect {
let join_handle = connect::initial_join_procedure(
self.op_manager.clone(),
&self.conn_manager.gateways,
)
.await?;
self.initial_join_task = Some(join_handle);
}
// Spawned as a separate task so connection-count polling does not delay the event
// listener below.
let aggressive_conn_task = if self.should_try_connect {
let op_manager = self.op_manager.clone();
let min_connections = op_manager.ring.connection_manager.min_connections;
Some(GlobalExecutor::spawn(async move {
Self::aggressive_initial_connections_impl(&op_manager, min_connections).await;
}))
} else {
None
};
// Building the listener future moves the channel fields out of `self`.
let f = self.conn_manager.run_event_listener(
self.op_manager.clone(),
self.client_wait_for_transaction,
self.notification_channel,
self.executor_listener,
self.node_controller,
);
// Abort handles must be taken before the join handles are moved into `infra_monitor`,
// otherwise there would be no way to abort those tasks after the race.
let session_abort = self.session_actor_task.abort_handle();
let router_abort = self.result_router_task.abort_handle();
let mediator_abort = self.op_mediator_task.abort_handle();
// Future that resolves to an error as soon as any of the three infrastructure tasks
// finishes, whether by panic, other join failure, or a plain return.
let infra_monitor = {
let mut session_handle = self.session_actor_task;
let mut router_handle = self.result_router_task;
let mut mediator_handle = self.op_mediator_task;
async move {
// Converts a task's join result into an error; a clean `Ok(())` exit is also
// an error because these tasks are not expected to finish.
fn join_result_to_error(
name: &str,
r: Result<(), tokio::task::JoinError>,
) -> anyhow::Error {
match r {
Err(e) if e.is_panic() => anyhow::anyhow!("{name} panicked: {e}"),
Err(e) => anyhow::anyhow!("{name} task failed: {e}"),
Ok(()) => anyhow::anyhow!("{name} exited unexpectedly"),
}
}
// `biased;` makes `tokio::select!` poll the arms in the order written.
let e: anyhow::Error = tokio::select! {
biased;
r = &mut session_handle => join_result_to_error("Session actor", r),
r = &mut router_handle => join_result_to_error("Result router", r),
r = &mut mediator_handle => join_result_to_error("Op mediator", r),
};
e
}
};
let background_monitor = self.background_task_monitor.wait_for_any_exit();
// Taken out of `self` so the handle is still available after the race.
let join_task = self.initial_join_task.take();
// `deterministic_select!` is a project macro; presumably it selects among the arms
// in a reproducible order — confirm against its definition.
let result = crate::deterministic_select! {
r = f => {
// Irrefutable pattern: the listener has no reachable `Ok` value here
// (presumably its success type is `Infallible` — confirm).
let Err(e) = r;
eprintln!("CRITICAL: Network event listener exited: {e}");
tracing::error!("Network event listener exited: {}", e);
Err(e)
},
e = self.client_events_task => {
eprintln!("CRITICAL: Client events task exited: {e}");
tracing::error!("Client events task exited: {:?}", e);
Err(e)
},
e = self.contract_executor_task => {
eprintln!("CRITICAL: Contract executor task exited: {e}");
tracing::error!("Contract executor task exited: {:?}", e);
Err(e)
},
e = infra_monitor => {
eprintln!("CRITICAL: Infrastructure task exited: {e}");
tracing::error!("Infrastructure task exited: {:?}", e);
Err(e)
},
e = background_monitor => {
eprintln!("CRITICAL: Background task exited: {e}");
tracing::error!("Background task exited: {:?}", e);
Err(e)
},
};
// Teardown: stop the helper tasks that this function started or supervised.
if let Some(handle) = join_task {
handle.abort();
}
if let Some(handle) = aggressive_conn_task {
handle.abort();
}
session_abort.abort();
router_abort.abort();
mediator_abort.abort();
// Every select arm above yields `Err`, so the `Ok` case is only here to keep the
// match exhaustive.
let (graceful, reason) = match &result {
Ok(_) => (true, None),
Err(e) => (false, Some(e.to_string())),
};
// As with startup, the shutdown event is registered only when one is produced.
if let Some(event) = crate::tracing::NetEventLog::peer_shutdown(
&self.op_manager.ring,
graceful,
reason.clone(),
start_time,
) {
use either::Either;
self.op_manager
.ring
.register_events(Either::Left(event))
.await;
tracing::info!(
graceful,
reason = reason.as_deref().unwrap_or("clean exit"),
uptime_secs = start_time.elapsed().as_secs(),
"Peer shutdown event emitted"
);
}
result
}
/// Assembles a [`NodeP2P`]: creates its channels, spawns its supporting tasks and builds
/// the operation manager, contract handler and connection manager.
///
/// # Type parameters
///
/// * `CH` - contract handler implementation, built from `ch_builder`.
/// * `CLIENTS` - number of client event sources in `clients`.
/// * `ER` - network event register; one clone goes to the `OpManager`, the original to
///   the `P2pConnManager`.
///
/// # Returns
///
/// The node, plus a sender for `NodeEvent`s whose receiver is the node's
/// `node_controller`. The sender is a clone of the one given to client event handling.
///
/// # Errors
///
/// Fails if `OpManager::new`, `CH::build` or `P2pConnManager::build` fails.
///
/// NOTE(review): the session actor, result router and (for the later failures) the op
/// mediator are already spawned when these errors can occur, and nothing here aborts
/// them on early return. A dropped tokio `JoinHandle` detaches its task rather than
/// cancelling it — confirm whether cleanup is needed.
pub(crate) async fn build<CH, const CLIENTS: usize, ER>(
config: NodeConfig,
clients: [BoxedClient; CLIENTS],
event_register: ER,
ch_builder: CH::Builder,
) -> anyhow::Result<(Self, tokio::sync::mpsc::Sender<NodeEvent>)>
where
CH: ContractHandler + Send + 'static,
ER: NetEventRegister + Clone,
{
let (notification_channel, notification_tx) = event_loop_notification_channel();
let (mut ch_outbound, ch_inbound, wait_for_event) = contract::contract_handler_channel();
let (client_responses, cli_response_sender) = contract::client_responses_channel();
// Bounded channel feeding the session actor; the contract handler's outbound side
// and the result router each get a sender.
let (session_tx, session_rx) = tokio::sync::mpsc::channel(1000);
ch_outbound.with_session_adapter(session_tx.clone());
// Bounded channel feeding the result router; the sender goes to the `OpManager`.
let (result_router_tx, result_router_rx) = tokio::sync::mpsc::channel(1000);
use crate::client_events::session_actor::SessionActor;
let session_actor = SessionActor::new(session_rx, cli_response_sender.clone());
// The "stopped" warning is logged when `run()` returns; `run_node` treats that exit
// as a failure.
let session_actor_task = GlobalExecutor::spawn(async move {
tracing::info!("Session actor starting");
session_actor.run().await;
tracing::warn!("Session actor stopped");
});
use crate::client_events::result_router::ResultRouter;
let router = ResultRouter::new(result_router_rx, session_tx.clone());
let result_router_task = GlobalExecutor::spawn(async move {
tracing::info!("Result router starting");
router.run().await;
tracing::warn!("Result router stopped");
});
tracing::info!("Actor-based client management infrastructure installed with result router");
// Created before the `OpManager`, which receives a reference to it.
let background_task_monitor = BackgroundTaskMonitor::new();
let connection_manager = ConnectionManager::new(&config);
let op_manager = Arc::new(OpManager::new(
notification_tx,
ch_outbound,
&config,
event_register.clone(),
connection_manager,
result_router_tx,
&background_task_monitor,
)?);
// Done after wrapping in `Arc` because the ring is handed a reference to that `Arc`.
op_manager.ring.attach_op_manager(&op_manager);
let (op_request_receiver, op_sender) = contract::op_request_channel();
let (executor_listener, to_event_loop_tx, from_event_loop_rx) =
mediator_channels(op_manager.clone());
// The mediator sits between the op-request channel and the event-loop channels
// created just above.
let op_mediator_task = GlobalExecutor::spawn({
let mediator_task =
run_op_request_mediator(op_request_receiver, to_event_loop_tx, from_event_loop_rx);
mediator_task.instrument(tracing::info_span!("op_request_mediator"))
});
let contract_handler = CH::build(ch_inbound, op_sender, op_manager.clone(), ch_builder)
.await
.map_err(|e| anyhow::anyhow!(e))?;
let conn_manager =
P2pConnManager::build(&config, op_manager.clone(), event_register).await?;
// Captured once so both spawned tasks below are instrumented as children of the
// span that is current when `build` runs.
let parent_span = tracing::Span::current();
// The join handle is mapped so the resulting future always yields an error: the
// task's own error, an "exited unexpectedly" error on `Ok`, or the join error.
let contract_executor_task = GlobalExecutor::spawn({
let task = async move {
tracing::info!("Contract executor task starting");
let result = contract::contract_handling(
contract_handler,
crate::contract::user_input::DashboardPrompter::new(
crate::contract::user_input::pending_prompts(),
),
)
.await;
match &result {
Ok(_) => tracing::warn!("Contract executor task exiting normally (unexpected)"),
Err(e) => tracing::error!("Contract executor task exiting with error: {e}"),
}
result
};
task.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling"))
})
.map(|r| match r {
Ok(Err(e)) => anyhow::anyhow!("Error in contract handling task: {e}"),
Ok(Ok(_)) => anyhow::anyhow!("Contract handling task exited unexpectedly"),
Err(e) => anyhow::anyhow!(e),
})
.boxed();
// NOTE(review): the slot names are fixed to two entries regardless of `CLIENTS` —
// confirm how `with_slot_names` treats a different client count.
let clients = ClientEventsCombinator::new(clients).with_slot_names(&["http", "websocket"]);
// Capacity-1 channel for node control events.
let (node_controller_tx, node_controller_rx) = tokio::sync::mpsc::channel(1);
// Cloned before `node_controller_tx` is moved into the client events task; this
// clone is what the caller receives.
let shutdown_tx = node_controller_tx.clone();
// As with the contract executor, any completion of this task is surfaced as an error.
let client_events_task = GlobalExecutor::spawn({
let op_manager_clone = op_manager.clone();
let task = async move {
tracing::info!("Client events task starting");
let result = client_event_handling(
op_manager_clone,
clients,
client_responses,
node_controller_tx,
)
.await;
tracing::warn!("Client events task exiting (unexpected)");
result
};
task.instrument(tracing::info_span!(parent: parent_span, "client_event_handling"))
})
.map(|r| match r {
Ok(_) => anyhow::anyhow!("Client event handling task exited unexpectedly"),
Err(e) => anyhow::anyhow!(e),
})
.boxed();
Ok((
NodeP2P {
conn_manager,
notification_channel,
client_wait_for_transaction: wait_for_event,
op_manager,
executor_listener,
node_controller: node_controller_rx,
should_try_connect: config.should_connect,
peer_id: None, is_gateway: config.is_gateway,
location: config.location,
client_events_task,
contract_executor_task,
// Filled in by `run_node` when the initial join procedure is run.
initial_join_task: None,
session_actor_task,
result_router_task,
op_mediator_task,
background_task_monitor,
},
shutdown_tx,
))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawns a task that just sleeps for a minute, standing in for a healthy
    /// long-running task.
    fn spawn_sleeper() -> JoinHandle<()> {
        tokio::spawn(async {
            tokio::time::sleep(Duration::from_secs(60)).await;
        })
    }

    /// Turns the join result of a task into the error reported for it, tagging the
    /// message with `label` and distinguishing panic, other join failure and plain exit.
    fn describe_exit(label: &str, outcome: Result<(), tokio::task::JoinError>) -> anyhow::Error {
        match outcome {
            Ok(()) => anyhow::anyhow!("{label} exited"),
            Err(e) if e.is_panic() => anyhow::anyhow!("{label} panicked: {e}"),
            Err(e) => anyhow::anyhow!("{label} failed: {e}"),
        }
    }

    /// Awaiting the handle of a panicking task yields a `JoinError` flagged as a panic.
    #[tokio::test]
    async fn test_join_handle_detects_panic() {
        let panicking: JoinHandle<()> = tokio::spawn(async {
            panic!("intentional test panic");
        });

        let outcome = panicking.await;
        assert!(outcome.is_err());

        let err = outcome.unwrap_err();
        assert!(
            err.is_panic(),
            "JoinError should indicate a panic, got: {err}"
        );
    }

    /// Awaiting the handle of a task that simply returns yields `Ok`.
    #[tokio::test]
    async fn test_join_handle_detects_clean_exit() {
        let finished: JoinHandle<()> = tokio::spawn(async {});

        let outcome = finished.await;
        assert!(outcome.is_ok(), "Clean task exit should produce Ok");
    }

    /// A biased select over three handles reports the one that panics, even though it
    /// is listed last and the other two are still running.
    #[tokio::test]
    async fn test_select_catches_first_panicked_task() {
        let mut h1 = spawn_sleeper();
        let mut h2 = spawn_sleeper();
        let mut h3: JoinHandle<()> = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            panic!("task 3 panicked");
        });

        let result: anyhow::Result<()> = tokio::select! {
            biased;
            r = &mut h1 => Err(describe_exit("task 1", r)),
            r = &mut h2 => Err(describe_exit("task 2", r)),
            r = &mut h3 => Err(describe_exit("task 3", r)),
        };

        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("task 3 panicked"),
            "Should catch the panicking task, got: {err_msg}"
        );

        // The sleepers would otherwise keep running for a minute.
        h1.abort();
        h2.abort();
    }
}