modality_network_node/
node.rs

1use crate::config::Config;
2use anyhow::Result;
3use libp2p::swarm::SwarmEvent;
4use libp2p::{Multiaddr, PeerId};
5
6use std::path::PathBuf;
7use libp2p::multiaddr::Protocol;
8use futures::future::{select, Either};
9use libp2p::futures::StreamExt;
10use std::time::Duration;
11use libp2p::request_response;
12
13pub struct Node {
14    pub peerid: libp2p_identity::PeerId,
15    pub node_keypair: libp2p_identity::Keypair,
16    pub listeners: Vec<Multiaddr>,
17    pub bootstrappers: Vec<Multiaddr>,
18    pub swarm: crate::swarm::NodeSwarm
19}
20
21impl Node {
22    pub async fn from_config_filepath(config_filepath: PathBuf) -> Result<Node> {
23        let config = Config::from_filepath(&config_filepath)?;
24        Node::from_config(config).await
25    }
26
27    pub async fn from_config(config: Config) -> Result<Node> {
28        let node_keypair = config.get_libp2p_keypair().await?;
29        let peerid = node_keypair.public().to_peer_id();
30        let listeners = config.listeners.unwrap();
31        let bootstrappers = exclude_multiaddresses_with_peerid(config.bootstrappers.unwrap(), peerid);
32        let swarm = crate::swarm::create_swarm(node_keypair.clone()).await?;
33        let node = Self { peerid, node_keypair, listeners, bootstrappers, swarm };
34        Ok(node)
35    }
36
37    pub async fn setup(&mut self) -> Result<()> {
38        // node.attach_storage(config.storage_path);
39        for listener in self.listeners.clone() {
40            self.swarm.listen_on(listener)?;
41        }
42        Ok(())
43    }
44
45    pub async fn run(&mut self) -> Result<()> {
46        let tick_interval: Duration = Duration::from_secs(15);
47        let mut tick = futures_timer::Delay::new(tick_interval);
48
49        loop {
50            match select(self.swarm.next(), &mut tick).await {
51                Either::Left((event, _)) => match event.unwrap() {
52                    SwarmEvent::NewListenAddr { address, .. } => {
53                        let address_with_p2p = address.clone().with(libp2p::multiaddr::Protocol::P2p(self.peerid));
54                        log::info!("Listening on {address_with_p2p:?}")
55                    }
56                    SwarmEvent::ConnectionEstablished { .. } => {
57                        // if peer_id == target_peer_id {
58                        //     log::debug!("Connected to peer {:?}", peer_id);
59                        //     // do we ever need to wait for correct transport upgrade event?
60                        //     // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
61                        //     break;
62                        // }
63                    }
64                    SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
65                        if let Some(peer_id) = peer_id {
66                            log::error!("Failed to dial peer {:?}", peer_id);
67                            log::error!("Error: {:?}", error);
68                            anyhow::bail!("Failed to dial peer");
69                        }
70                    }
71                    SwarmEvent::Behaviour(crate::swarm::NodeBehaviourEvent::Reqres(
72                        request_response::Event::Message { message, .. },
73                    )) => match message {
74                        request_response::Message::Request {
75                            request,
76                            channel,
77                            .. // request, channel, ..
78                        } => {
79                            log::info!("reqres request");
80                            let res = crate::reqres::handle_request(request).await?;
81                            self.swarm.behaviour_mut().reqres.send_response(channel, res).expect("failed to respond")
82                        }
83                        request_response::Message::Response {
84                            ..
85                            // request_id,
86                            // response,
87                        } => {
88                            log::info!("reqres response")
89                        }
90                    },
91                    // SwarmEvent::Behaviour(event) => {
92                    //     log::info!("SwarmEvent::Behaviour event {:?}", event);
93                    //     match event {
94                    //         swarm::BehaviourEvent::Identify(_) => {
95                    //             log::info!("Identify Behaviour event");
96                    //         }
97                    //         swarm::BehaviourEvent::Ping(_) => {
98                    //             log::info!("Ping Behaviour event");
99                    //         }
100                    //         swarm::BehaviourEvent::Stream(_) => {
101                    //             log::info!("Stream Behaviour event");
102                    //         }
103                    //         swarm::BehaviourEvent::Reqres(_) => {
104                    //             log::info!("Reqres Behaviour event");
105                    //         }
106                    //         // _ => {
107                    //         //     log::info!("Other Swarm Behaviour event {:?}", event);
108                    //         // }
109                    //     }
110                    // }
111                    event => {
112                        log::info!("Other Node Event {:?}", event)
113                    },
114                },
115                Either::Right(_) => {
116                    log::debug!("tick");
117                    tick = futures_timer::Delay::new(tick_interval);
118                }
119            }
120        }
121    }
122}
123
124pub fn exclude_multiaddresses_with_peerid(ma: Vec<Multiaddr>, peerid: PeerId) -> Vec<Multiaddr> {
125    ma.into_iter()
126        .filter(|addr| {
127            if let Some(Protocol::P2p(addr_peerid)) = addr.iter().last() {
128                addr_peerid != peerid
129            } else {
130                true
131            }
132        })
133        .collect()
134}