freenet/node/testing_impl/
network.rs

1use crate::client_events::BoxedClient;
2use crate::contract::MemoryContractHandler;
3use crate::dev_tool::TransportPublicKey;
4use crate::node::p2p_impl::NodeP2P;
5use crate::node::{Node, ShutdownHandle};
6use crate::tracing::EventRegister;
7use anyhow::Error;
8use futures::SinkExt;
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::watch::{Receiver, Sender};
13use tokio::sync::Mutex;
14use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
15
16pub struct NetworkPeer {
17    pub id: String,
18    pub config: crate::node::NodeConfig,
19    pub ws_client: Option<Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
20    pub user_ev_controller: Arc<Sender<(u32, TransportPublicKey)>>,
21    pub receiver_ch: Arc<Receiver<(u32, TransportPublicKey)>>,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
25pub enum PeerStatus {
26    PeerStarted(usize),
27    GatewayStarted(usize),
28    Error(String),
29}
30
31#[derive(Debug, Serialize, Deserialize)]
32pub enum PeerMessage {
33    Event(Vec<u8>),
34    Status(PeerStatus),
35    Info(String),
36}
37
38type PeerEventSender = Sender<(u32, TransportPublicKey)>;
39type PeerEventReceiver = Receiver<(u32, TransportPublicKey)>;
40
41impl NetworkPeer {
42    pub async fn new(peer_id: String) -> Result<Self, Error> {
43        let (ws_stream, _) = tokio_tungstenite::connect_async("ws://localhost:3000/v1/ws")
44            .await
45            .expect("Failed to connect to supervisor");
46
47        let config_url = format!("http://127.0.0.1:3000/v1/config/{peer_id}");
48        let response = reqwest::get(&config_url).await?;
49        let peer_config = response.json::<crate::node::NodeConfig>().await?;
50
51        tracing::debug!(?peer_config.network_listener_port, %peer_config.is_gateway, key = ?peer_config.key_pair.public(), "Received peer config");
52
53        let (user_ev_controller, receiver_ch): (PeerEventSender, PeerEventReceiver) =
54            tokio::sync::watch::channel((0, peer_config.key_pair.public().clone()));
55
56        Ok(NetworkPeer {
57            id: peer_id,
58            config: peer_config,
59            ws_client: Some(Arc::new(Mutex::new(ws_stream))),
60            user_ev_controller: Arc::new(user_ev_controller),
61            receiver_ch: Arc::new(receiver_ch),
62        })
63    }
64
65    /// Builds a node using the default backend connection manager.
66    pub async fn build<const CLIENTS: usize>(
67        &self,
68        identifier: String,
69        clients: [BoxedClient; CLIENTS],
70    ) -> anyhow::Result<Node> {
71        let event_register = {
72            #[cfg(feature = "trace-ot")]
73            {
74                use crate::tracing::OTEventRegister;
75                crate::tracing::CombinedRegister::new([
76                    Box::new(EventRegister::new(self.config.config.event_log())),
77                    Box::new(OTEventRegister::new()),
78                ])
79            }
80            #[cfg(not(feature = "trace-ot"))]
81            {
82                EventRegister::new(self.config.config.event_log())
83            }
84        };
85        let (node_inner, shutdown_tx) = NodeP2P::build::<MemoryContractHandler, CLIENTS, _>(
86            self.config.clone(),
87            clients,
88            event_register,
89            identifier,
90        )
91        .await?;
92        let shutdown_handle = ShutdownHandle { tx: shutdown_tx };
93        Ok(Node {
94            inner: node_inner,
95            shutdown_handle,
96        })
97    }
98
99    pub async fn send_peer_msg(&self, msg: PeerMessage) {
100        let serialized_msg: Vec<u8> = bincode::serialize(&msg).unwrap();
101        if let Some(ws_client) = self.ws_client.as_deref() {
102            ws_client
103                .lock()
104                .await
105                .send(tokio_tungstenite::tungstenite::protocol::Message::Binary(
106                    serialized_msg.into(),
107                ))
108                .await
109                .unwrap();
110        }
111    }
112}