use crate::{
connection::{
net_connection::{NetHandler, NetSend, NetWorker, NetWorkerFactory},
net_connection_thread::NetConnectionThread,
NetResult,
},
in_memory::memory_worker::InMemoryWorker,
log_d, log_e,
p2p_config::*,
tweetlog::*,
};
use lib3h_protocol::{
protocol_client::Lib3hClientProtocol, protocol_server::Lib3hServerProtocol, Address,
};
use crate::sim2h_worker::Sim2hWorker;
use crossbeam_channel;
use holochain_conductor_lib_api::conductor_api::ConductorApi;
use holochain_json_api::json::JsonString;
use std::{convert::TryFrom, time::Duration};
pub type Lib3hClientProtocolWrapped = ht::EncodedSpanWrap<Lib3hClientProtocol>;
pub type Lib3hServerProtocolWrapped = ht::EncodedSpanWrap<Lib3hServerProtocol>;
const P2P_READY_TIMEOUT_MS: u64 = 5000;
#[derive(Clone)]
pub struct P2pNetwork {
connection: NetConnectionThread,
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_NET)]
impl P2pNetwork {
pub fn new(
mut handler: NetHandler,
p2p_config: P2pConfig,
agent_id: Option<Address>,
conductor_api: Option<ConductorApi>,
tracer: Option<ht::Tracer>,
) -> NetResult<Self> {
let backend_config_str = match &p2p_config.backend_config {
BackendConfig::Json(ref json) => JsonString::from_json(&json.to_string()),
_ => JsonString::from(""),
};
let p2p_config_str = p2p_config.clone().as_str();
let p2p_config2 = p2p_config.clone();
let worker_factory: NetWorkerFactory = match &p2p_config.clone().backend_kind {
P2pBackendKind::LIB3H => {
unimplemented!()
}
P2pBackendKind::GhostEngineMemory => Box::new(move |_h| {
unimplemented!()
}),
P2pBackendKind::LegacyInMemory => Box::new(move |h| {
Ok(Box::new(InMemoryWorker::new(h, &backend_config_str)?) as Box<dyn NetWorker>)
}),
P2pBackendKind::SIM2H => Box::new(move |h| {
let backend_config = match &p2p_config.clone().backend_config {
BackendConfig::Sim2h(config) => config.clone(),
_ => return Err(format_err!("mismatch backend type, expecting sim2h")),
};
Ok(Box::new(Sim2hWorker::new(
h,
backend_config,
agent_id
.clone()
.expect("Can't construct Sim2hWorker without agent ID"),
conductor_api
.clone()
.expect("Can't construct Sim2hWorker without conductor API"),
tracer.clone(),
)?) as Box<dyn NetWorker>)
}),
};
let (t, rx) = crossbeam_channel::unbounded();
let tx = t.clone();
let wrapped_handler = if Self::should_wait_for_p2p_ready(&p2p_config2.clone()) {
NetHandler::new(Box::new(move |message| {
let unwrapped = message.unwrap();
let message = unwrapped.clone();
match Lib3hServerProtocol::try_from(unwrapped.data.clone()) {
Ok(Lib3hServerProtocol::P2pReady) => {
tx.send(Lib3hServerProtocol::P2pReady).ok();
log_d!("net/p2p_network: sent P2pReady event")
}
Ok(_msg) => {}
Err(_protocol_error) => {
}
};
handler.handle(Ok(message))
}))
} else {
handler
};
let connection =
NetConnectionThread::new(wrapped_handler, worker_factory).map_err(|e| {
format_err!(
"Failed to obtain a connection to a p2p network module w/ config: {}: {} ",
p2p_config_str,
e
)
})?;
if Self::should_wait_for_p2p_ready(&p2p_config2.clone()) {
P2pNetwork::wait_p2p_ready(&rx);
}
Ok(P2pNetwork { connection })
}
fn should_wait_for_p2p_ready(p2p_config: &P2pConfig) -> bool {
match p2p_config.backend_kind {
P2pBackendKind::LIB3H
| P2pBackendKind::GhostEngineMemory
| P2pBackendKind::SIM2H
| P2pBackendKind::LegacyInMemory => false,
}
}
fn wait_p2p_ready(rx: &crossbeam_channel::Receiver<Lib3hServerProtocol>) {
let maybe_message = rx.recv_timeout(Duration::from_millis(P2P_READY_TIMEOUT_MS));
match maybe_message {
Ok(Lib3hServerProtocol::P2pReady) => log_d!("net/p2p_network: received P2pReady event"),
Ok(msg) => {
log_d!("net/p2p_network: received unexpected event: {:?}", msg);
}
Err(e) => {
log_e!("net/p2p_network: did not receive P2pReady: {:?}", e);
panic!(
"p2p network not ready within alloted time of {:?} ms",
P2P_READY_TIMEOUT_MS
);
}
};
}
pub fn stop(&mut self) {
self.connection.stop();
}
pub fn endpoint(&self) -> String {
self.connection.endpoint.clone()
}
pub fn p2p_endpoint(&self) -> url::Url {
self.connection.p2p_endpoint.clone()
}
}
impl std::fmt::Debug for P2pNetwork {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "P2pNetwork {{}}")
}
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_NET)]
impl NetSend for P2pNetwork {
fn send(&mut self, data: Lib3hClientProtocolWrapped) -> NetResult<()> {
self.connection.send(data)
}
}
#[cfg(test)]
mod tests {
use super::*;
use lib3h_protocol::{data_types::ConnectData, uri::Lib3hUri};
#[test]
fn it_should_create_memory_network() {
let p2p = P2pConfig::new_with_unique_memory_backend();
let handler = NetHandler::new(Box::new(|_r| Ok(())));
let mut res = P2pNetwork::new(handler.clone(), p2p, None, None, None).unwrap();
let connect_data = ConnectData {
request_id: "memory_network_req_id".into(),
peer_location: Lib3hUri::with_undefined(),
network_id: "test_net_id".into(),
};
handler
.to_owned()
.handle(Ok(ht::test_span()
.wrap(Lib3hServerProtocol::P2pReady)
.into()))
.unwrap();
res.send(
ht::test_span()
.wrap(Lib3hClientProtocol::Connect(connect_data))
.into(),
)
.unwrap();
res.stop();
}
}