freenet/node/testing_impl/
network.rs1use crate::client_events::BoxedClient;
2use crate::contract::MemoryContractHandler;
3use crate::dev_tool::TransportPublicKey;
4use crate::node::p2p_impl::NodeP2P;
5use crate::node::{Node, ShutdownHandle};
6use crate::tracing::EventRegister;
7use anyhow::Error;
8use futures::SinkExt;
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::watch::{Receiver, Sender};
13use tokio::sync::Mutex;
14use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
15
16pub struct NetworkPeer {
17 pub id: String,
18 pub config: crate::node::NodeConfig,
19 pub ws_client: Option<Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
20 pub user_ev_controller: Arc<Sender<(u32, TransportPublicKey)>>,
21 pub receiver_ch: Arc<Receiver<(u32, TransportPublicKey)>>,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
25pub enum PeerStatus {
26 PeerStarted(usize),
27 GatewayStarted(usize),
28 Error(String),
29}
30
31#[derive(Debug, Serialize, Deserialize)]
32pub enum PeerMessage {
33 Event(Vec<u8>),
34 Status(PeerStatus),
35 Info(String),
36}
37
38type PeerEventSender = Sender<(u32, TransportPublicKey)>;
39type PeerEventReceiver = Receiver<(u32, TransportPublicKey)>;
40
41impl NetworkPeer {
42 pub async fn new(peer_id: String) -> Result<Self, Error> {
43 let (ws_stream, _) = tokio_tungstenite::connect_async("ws://localhost:3000/v1/ws")
44 .await
45 .expect("Failed to connect to supervisor");
46
47 let config_url = format!("http://127.0.0.1:3000/v1/config/{peer_id}");
48 let response = reqwest::get(&config_url).await?;
49 let peer_config = response.json::<crate::node::NodeConfig>().await?;
50
51 tracing::debug!(?peer_config.network_listener_port, %peer_config.is_gateway, key = ?peer_config.key_pair.public(), "Received peer config");
52
53 let (user_ev_controller, receiver_ch): (PeerEventSender, PeerEventReceiver) =
54 tokio::sync::watch::channel((0, peer_config.key_pair.public().clone()));
55
56 Ok(NetworkPeer {
57 id: peer_id,
58 config: peer_config,
59 ws_client: Some(Arc::new(Mutex::new(ws_stream))),
60 user_ev_controller: Arc::new(user_ev_controller),
61 receiver_ch: Arc::new(receiver_ch),
62 })
63 }
64
65 pub async fn build<const CLIENTS: usize>(
67 &self,
68 identifier: String,
69 clients: [BoxedClient; CLIENTS],
70 ) -> anyhow::Result<Node> {
71 let event_register = {
72 #[cfg(feature = "trace-ot")]
73 {
74 use crate::tracing::OTEventRegister;
75 crate::tracing::CombinedRegister::new([
76 Box::new(EventRegister::new(self.config.config.event_log())),
77 Box::new(OTEventRegister::new()),
78 ])
79 }
80 #[cfg(not(feature = "trace-ot"))]
81 {
82 EventRegister::new(self.config.config.event_log())
83 }
84 };
85 let (node_inner, shutdown_tx) = NodeP2P::build::<MemoryContractHandler, CLIENTS, _>(
86 self.config.clone(),
87 clients,
88 event_register,
89 identifier,
90 )
91 .await?;
92 let shutdown_handle = ShutdownHandle { tx: shutdown_tx };
93 Ok(Node {
94 inner: node_inner,
95 shutdown_handle,
96 })
97 }
98
99 pub async fn send_peer_msg(&self, msg: PeerMessage) {
100 let serialized_msg: Vec<u8> = bincode::serialize(&msg).unwrap();
101 if let Some(ws_client) = self.ws_client.as_deref() {
102 ws_client
103 .lock()
104 .await
105 .send(tokio_tungstenite::tungstenite::protocol::Message::Binary(
106 serialized_msg.into(),
107 ))
108 .await
109 .unwrap();
110 }
111 }
112}