modality_network_node/actions/
request.rs

1use crate::swarm;
2use crate::reqres;
3use crate::node::Node;
4
5use anyhow::{Result};
6use futures::prelude::*;
7use libp2p::multiaddr::Multiaddr;
8use libp2p::swarm::SwarmEvent;
9use libp2p::request_response;
10
11pub async fn run(node: &mut Node, target: String, path: String, data: String) -> Result<()> {
12    let ma = target.parse::<Multiaddr>().unwrap();
13
14    let Some(libp2p::multiaddr::Protocol::P2p(target_peer_id)) = ma.iter().last() else {
15        anyhow::bail!("Provided address must end in `/p2p` and include PeerID");
16    };
17
18    node.swarm.dial(ma.clone())?;
19
20    loop {
21        match node.swarm.select_next_some().await {
22            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
23                if peer_id == target_peer_id {
24                    log::debug!("Connected to peer {:?}", peer_id);
25                    // do we ever need to wait for correct transport upgrade event?
26                    // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
27                    break;
28                }
29            }
30            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
31                if let Some(peer_id) = peer_id {
32                    log::error!("Failed to dial peer {:?}", peer_id);
33                    log::error!("Error: {:?}", error);
34                    anyhow::bail!("Failed to dial peer");
35                }
36            }
37            event => {
38                log::debug!("Other Event {:?}", event)
39            }
40        }
41    }
42
43    let request = reqres::Request {
44        path: path.clone().to_string(),
45        data: Some(serde_json::json!(data.clone())),
46    };
47    let target_request_id = node.swarm
48        .behaviour_mut()
49        .reqres
50        .send_request(&target_peer_id, request);
51
52    let _channel = loop {
53        futures::select!(
54            event = node.swarm.select_next_some() => match event {
55              SwarmEvent::Behaviour(swarm::NodeBehaviourEvent::Reqres(
56                request_response::Event::Message {
57                  message: request_response::Message::Response { response, request_id, .. },
58                  ..
59                }
60              )) => {
61                if target_request_id == request_id {
62                  log::debug!("response: {}", serde_json::to_string_pretty(&response).unwrap());
63                  break;
64                }
65              }
66              _ => {}
67            }
68        )
69    };
70
71    let _ = node.swarm.disconnect_peer_id(target_peer_id);
72
73    loop {
74        match node.swarm.select_next_some().await {
75            SwarmEvent::ConnectionClosed { peer_id, .. } => {
76                if peer_id == target_peer_id {
77                    log::debug!("Connection closed with peer {:?}", peer_id);
78                    break;
79                }
80            }
81            _ => {}
82        }
83    }
84
85    Ok(())
86}