use std::{collections::HashMap, sync::Arc};
use freenet_stdlib::prelude::*;
use tracing::Instrument;
use crate::{
client_events::ClientEventsProxy,
config::GlobalExecutor,
contract::{
self, ContractHandler, MemoryContractHandler, MockWasmContractHandler,
MockWasmHandlerBuilder, SimulationContractHandler, SimulationHandlerBuilder,
mediator_channels, op_request_channel, run_op_request_mediator,
},
node::{
EventLoopExitReason, NetEventRegister,
background_task_monitor::BackgroundTaskMonitor,
network_bridge::{event_loop_notification_channel, p2p_protoc::P2pConnManager},
op_state_manager::OpManager,
},
operations::connect,
ring::{ConnectionManager, PeerKeyLocation},
transport::in_memory_socket::{SimulationSocket, register_address_network},
wasm_runtime::MockStateStorage,
};
use super::Builder;
fn handle_event_loop_result(
result: Result<std::convert::Infallible, anyhow::Error>,
) -> anyhow::Result<()> {
match result {
Ok(_infallible) => Ok(()),
Err(e) => {
if e.downcast_ref::<EventLoopExitReason>()
.map(|r| matches!(r, EventLoopExitReason::GracefulShutdown))
.unwrap_or(false)
{
tracing::info!("Node exited via graceful shutdown");
Ok(())
} else {
Err(e)
}
}
}
}
impl<ER> Builder<ER> {
#[allow(dead_code)]
pub async fn run_node<UsrEv>(
self,
user_events: UsrEv,
parent_span: tracing::Span,
) -> anyhow::Result<()>
where
UsrEv: ClientEventsProxy + Send + 'static,
ER: NetEventRegister + Clone,
{
let gateways = self.config.get_gateways()?;
let (notification_channel, notification_tx) = event_loop_notification_channel();
let (ops_ch_channel, ch_channel, wait_for_event) = contract::contract_handler_channel();
let _guard = parent_span.enter();
let connection_manager = ConnectionManager::new(&self.config);
let (result_router_tx, _result_router_rx) = tokio::sync::mpsc::channel(100);
let task_monitor = BackgroundTaskMonitor::new();
let op_manager = Arc::new(OpManager::new(
notification_tx,
ops_ch_channel,
&self.config,
self.event_register.clone(),
connection_manager.clone(),
result_router_tx.clone(),
&task_monitor,
)?);
op_manager.ring.attach_op_manager(&op_manager);
std::mem::drop(_guard);
let (op_request_receiver, op_sender) = op_request_channel();
let (executor_listener, to_event_loop_tx, from_event_loop_rx) =
mediator_channels(op_manager.clone());
GlobalExecutor::spawn({
let mediator_task =
run_op_request_mediator(op_request_receiver, to_event_loop_tx, from_event_loop_rx);
mediator_task.instrument(tracing::info_span!("op_request_mediator"))
});
let contract_handler = MemoryContractHandler::build(
ch_channel,
op_sender,
op_manager.clone(),
self.contract_handler_name,
)
.await
.map_err(|e| anyhow::anyhow!(e))?;
GlobalExecutor::spawn(
contract::contract_handling(
contract_handler,
crate::contract::user_input::AutoApprovePrompter,
)
.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")),
);
let conn_manager = P2pConnManager::build(
&self.config,
op_manager.clone(),
self.event_register.clone(),
)
.await?;
append_contracts(&op_manager, self.contracts, self.contract_subscribers).await?;
let join_task = if !gateways.is_empty() && !self.config.is_gateway {
Some(connect::initial_join_procedure(op_manager.clone(), &gateways).await?)
} else {
None
};
let (client_responses, _cli_response_sender) = contract::client_responses_channel();
let (node_controller_tx, node_controller_rx) = tokio::sync::mpsc::channel(1);
GlobalExecutor::spawn({
let op_manager = op_manager.clone();
crate::client_events::client_event_handling(
op_manager,
user_events,
client_responses,
node_controller_tx,
)
.instrument(tracing::info_span!(parent: parent_span.clone(), "client_event_handling"))
});
let local_addr = std::net::SocketAddr::new(
self.config.network_listener_ip,
self.config.network_listener_port,
);
register_address_network(local_addr, &self.network_name);
let result = conn_manager
.run_event_listener_with_socket::<SimulationSocket>(
op_manager,
wait_for_event,
notification_channel,
executor_listener,
node_controller_rx,
)
.instrument(parent_span)
.await;
if let Some(handle) = join_task {
handle.abort();
let _join_result = handle.await;
}
handle_event_loop_result(result)
}
pub async fn run_node_with_shared_storage<UsrEv>(
self,
user_events: UsrEv,
parent_span: tracing::Span,
shared_storage: MockStateStorage,
) -> anyhow::Result<()>
where
UsrEv: ClientEventsProxy + Send + 'static,
ER: NetEventRegister + Clone,
{
let gateways = self.config.get_gateways()?;
let (notification_channel, notification_tx) = event_loop_notification_channel();
let (ops_ch_channel, ch_channel, wait_for_event) = contract::contract_handler_channel();
let _guard = parent_span.enter();
let connection_manager = ConnectionManager::new(&self.config);
if let Some(out) = &self.shared_cm {
*out.lock() = Some(connection_manager.clone());
}
let (result_router_tx, _result_router_rx) = tokio::sync::mpsc::channel(100);
let task_monitor = BackgroundTaskMonitor::new();
let op_manager = Arc::new(OpManager::new(
notification_tx,
ops_ch_channel,
&self.config,
self.event_register.clone(),
connection_manager.clone(),
result_router_tx.clone(),
&task_monitor,
)?);
op_manager.ring.attach_op_manager(&op_manager);
std::mem::drop(_guard);
let (op_request_receiver, op_sender) = op_request_channel();
let (executor_listener, to_event_loop_tx, from_event_loop_rx) =
mediator_channels(op_manager.clone());
GlobalExecutor::spawn({
let mediator_task =
run_op_request_mediator(op_request_receiver, to_event_loop_tx, from_event_loop_rx);
mediator_task.instrument(tracing::info_span!("op_request_mediator"))
});
let contract_handler = SimulationContractHandler::build(
ch_channel,
op_sender,
op_manager.clone(),
SimulationHandlerBuilder {
identifier: self.contract_handler_name,
shared_storage,
},
)
.await
.map_err(|e| anyhow::anyhow!(e))?;
GlobalExecutor::spawn(
contract::contract_handling(
contract_handler,
crate::contract::user_input::AutoApprovePrompter,
)
.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")),
);
let conn_manager = P2pConnManager::build(
&self.config,
op_manager.clone(),
self.event_register.clone(),
)
.await?;
append_contracts(&op_manager, self.contracts, self.contract_subscribers).await?;
let join_task = if !gateways.is_empty() && !self.config.is_gateway {
Some(connect::initial_join_procedure(op_manager.clone(), &gateways).await?)
} else {
None
};
let (client_responses, _cli_response_sender) = contract::client_responses_channel();
let (node_controller_tx, node_controller_rx) = tokio::sync::mpsc::channel(1);
GlobalExecutor::spawn({
let op_manager = op_manager.clone();
crate::client_events::client_event_handling(
op_manager,
user_events,
client_responses,
node_controller_tx,
)
.instrument(tracing::info_span!(parent: parent_span.clone(), "client_event_handling"))
});
let local_addr = std::net::SocketAddr::new(
self.config.network_listener_ip,
self.config.network_listener_port,
);
register_address_network(local_addr, &self.network_name);
let result = conn_manager
.run_event_listener_with_socket::<SimulationSocket>(
op_manager,
wait_for_event,
notification_channel,
executor_listener,
node_controller_rx,
)
.instrument(parent_span)
.await;
if let Some(handle) = join_task {
handle.abort();
let _join_result = handle.await;
}
handle_event_loop_result(result)
}
#[allow(dead_code)]
pub async fn run_node_with_mock_wasm<UsrEv>(
self,
user_events: UsrEv,
parent_span: tracing::Span,
shared_storage: MockStateStorage,
contract_store: Option<crate::wasm_runtime::InMemoryContractStore>,
) -> anyhow::Result<()>
where
UsrEv: ClientEventsProxy + Send + 'static,
ER: NetEventRegister + Clone,
{
let gateways = self.config.get_gateways()?;
let (notification_channel, notification_tx) = event_loop_notification_channel();
let (ops_ch_channel, ch_channel, wait_for_event) = contract::contract_handler_channel();
let _guard = parent_span.enter();
let connection_manager = ConnectionManager::new(&self.config);
if let Some(out) = &self.shared_cm {
*out.lock() = Some(connection_manager.clone());
}
let (result_router_tx, _result_router_rx) = tokio::sync::mpsc::channel(100);
let task_monitor = BackgroundTaskMonitor::new();
let op_manager = Arc::new(OpManager::new(
notification_tx,
ops_ch_channel,
&self.config,
self.event_register.clone(),
connection_manager.clone(),
result_router_tx.clone(),
&task_monitor,
)?);
op_manager.ring.attach_op_manager(&op_manager);
std::mem::drop(_guard);
let (op_request_receiver, op_sender) = op_request_channel();
let (executor_listener, to_event_loop_tx, from_event_loop_rx) =
mediator_channels(op_manager.clone());
GlobalExecutor::spawn({
let mediator_task =
run_op_request_mediator(op_request_receiver, to_event_loop_tx, from_event_loop_rx);
mediator_task.instrument(tracing::info_span!("op_request_mediator"))
});
let contract_handler = MockWasmContractHandler::build(
ch_channel,
op_sender,
op_manager.clone(),
MockWasmHandlerBuilder {
identifier: self.contract_handler_name,
shared_storage,
contract_store,
},
)
.await
.map_err(|e| anyhow::anyhow!(e))?;
GlobalExecutor::spawn(
contract::contract_handling(
contract_handler,
crate::contract::user_input::AutoApprovePrompter,
)
.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")),
);
let conn_manager = P2pConnManager::build(
&self.config,
op_manager.clone(),
self.event_register.clone(),
)
.await?;
append_contracts(&op_manager, self.contracts, self.contract_subscribers).await?;
let join_task = if !gateways.is_empty() && !self.config.is_gateway {
Some(connect::initial_join_procedure(op_manager.clone(), &gateways).await?)
} else {
None
};
let (client_responses, _cli_response_sender) = contract::client_responses_channel();
let (node_controller_tx, node_controller_rx) = tokio::sync::mpsc::channel(1);
GlobalExecutor::spawn({
let op_manager = op_manager.clone();
crate::client_events::client_event_handling(
op_manager,
user_events,
client_responses,
node_controller_tx,
)
.instrument(tracing::info_span!(parent: parent_span.clone(), "client_event_handling"))
});
let local_addr = std::net::SocketAddr::new(
self.config.network_listener_ip,
self.config.network_listener_port,
);
register_address_network(local_addr, &self.network_name);
let result = conn_manager
.run_event_listener_with_socket::<SimulationSocket>(
op_manager,
wait_for_event,
notification_channel,
executor_listener,
node_controller_rx,
)
.instrument(parent_span)
.await;
if let Some(handle) = join_task {
handle.abort();
let _join_result = handle.await;
}
handle_event_loop_result(result)
}
}
async fn append_contracts(
op_manager: &Arc<OpManager>,
contracts: Vec<(ContractContainer, WrappedState, bool)>,
_contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
) -> anyhow::Result<()> {
use crate::contract::ContractHandlerEvent;
for (contract, state, subscription) in contracts {
let key: ContractKey = contract.key();
let state_size = state.size() as u64;
op_manager
.notify_contract_handler(ContractHandlerEvent::PutQuery {
key,
state,
related_contracts: RelatedContracts::default(),
contract: Some(contract),
})
.await?;
tracing::debug!(
"Appended contract {} to peer {}",
key,
op_manager.ring.connection_manager.get_own_addr().unwrap()
);
if subscription {
op_manager
.ring
.host_contract(key, state_size, crate::ring::AccessType::Put);
op_manager.ring.subscribe(key);
}
}
Ok(())
}