modality_network_node/
node.rs1use crate::config::Config;
2use anyhow::Result;
3use libp2p::swarm::SwarmEvent;
4use libp2p::{Multiaddr, PeerId};
5
6use std::path::PathBuf;
7use libp2p::multiaddr::Protocol;
8use futures::future::{select, Either};
9use libp2p::futures::StreamExt;
10use std::time::Duration;
11use libp2p::request_response;
12
13pub struct Node {
14 pub peerid: libp2p_identity::PeerId,
15 pub node_keypair: libp2p_identity::Keypair,
16 pub listeners: Vec<Multiaddr>,
17 pub bootstrappers: Vec<Multiaddr>,
18 pub swarm: crate::swarm::NodeSwarm
19}
20
21impl Node {
22 pub async fn from_config_filepath(config_filepath: PathBuf) -> Result<Node> {
23 let config = Config::from_filepath(&config_filepath)?;
24 Node::from_config(config).await
25 }
26
27 pub async fn from_config(config: Config) -> Result<Node> {
28 let node_keypair = config.get_libp2p_keypair().await?;
29 let peerid = node_keypair.public().to_peer_id();
30 let listeners = config.listeners.unwrap();
31 let bootstrappers = exclude_multiaddresses_with_peerid(config.bootstrappers.unwrap(), peerid);
32 let swarm = crate::swarm::create_swarm(node_keypair.clone()).await?;
33 let node = Self { peerid, node_keypair, listeners, bootstrappers, swarm };
34 Ok(node)
35 }
36
37 pub async fn setup(&mut self) -> Result<()> {
38 for listener in self.listeners.clone() {
40 self.swarm.listen_on(listener)?;
41 }
42 Ok(())
43 }
44
45 pub async fn run(&mut self) -> Result<()> {
46 let tick_interval: Duration = Duration::from_secs(15);
47 let mut tick = futures_timer::Delay::new(tick_interval);
48
49 loop {
50 match select(self.swarm.next(), &mut tick).await {
51 Either::Left((event, _)) => match event.unwrap() {
52 SwarmEvent::NewListenAddr { address, .. } => {
53 let address_with_p2p = address.clone().with(libp2p::multiaddr::Protocol::P2p(self.peerid));
54 log::info!("Listening on {address_with_p2p:?}")
55 }
56 SwarmEvent::ConnectionEstablished { .. } => {
57 }
64 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
65 if let Some(peer_id) = peer_id {
66 log::error!("Failed to dial peer {:?}", peer_id);
67 log::error!("Error: {:?}", error);
68 anyhow::bail!("Failed to dial peer");
69 }
70 }
71 SwarmEvent::Behaviour(crate::swarm::NodeBehaviourEvent::Reqres(
72 request_response::Event::Message { message, .. },
73 )) => match message {
74 request_response::Message::Request {
75 request,
76 channel,
77 .. } => {
79 log::info!("reqres request");
80 let res = crate::reqres::handle_request(request).await?;
81 self.swarm.behaviour_mut().reqres.send_response(channel, res).expect("failed to respond")
82 }
83 request_response::Message::Response {
84 ..
85 } => {
88 log::info!("reqres response")
89 }
90 },
91 event => {
112 log::info!("Other Node Event {:?}", event)
113 },
114 },
115 Either::Right(_) => {
116 log::debug!("tick");
117 tick = futures_timer::Delay::new(tick_interval);
118 }
119 }
120 }
121 }
122}
123
124pub fn exclude_multiaddresses_with_peerid(ma: Vec<Multiaddr>, peerid: PeerId) -> Vec<Multiaddr> {
125 ma.into_iter()
126 .filter(|addr| {
127 if let Some(Protocol::P2p(addr_peerid)) = addr.iter().last() {
128 addr_peerid != peerid
129 } else {
130 true
131 }
132 })
133 .collect()
134}