modality_network_node/actions/
request.rs1use crate::swarm;
2use crate::reqres;
3use crate::node::Node;
4
5use anyhow::{Result};
6use futures::prelude::*;
7use libp2p::multiaddr::Multiaddr;
8use libp2p::swarm::SwarmEvent;
9use libp2p::request_response;
10
11pub async fn run(node: &mut Node, target: String, path: String, data: String) -> Result<()> {
12 let ma = target.parse::<Multiaddr>().unwrap();
13
14 let Some(libp2p::multiaddr::Protocol::P2p(target_peer_id)) = ma.iter().last() else {
15 anyhow::bail!("Provided address must end in `/p2p` and include PeerID");
16 };
17
18 node.swarm.dial(ma.clone())?;
19
20 loop {
21 match node.swarm.select_next_some().await {
22 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
23 if peer_id == target_peer_id {
24 log::debug!("Connected to peer {:?}", peer_id);
25 break;
28 }
29 }
30 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
31 if let Some(peer_id) = peer_id {
32 log::error!("Failed to dial peer {:?}", peer_id);
33 log::error!("Error: {:?}", error);
34 anyhow::bail!("Failed to dial peer");
35 }
36 }
37 event => {
38 log::debug!("Other Event {:?}", event)
39 }
40 }
41 }
42
43 let request = reqres::Request {
44 path: path.clone().to_string(),
45 data: Some(serde_json::json!(data.clone())),
46 };
47 let target_request_id = node.swarm
48 .behaviour_mut()
49 .reqres
50 .send_request(&target_peer_id, request);
51
52 let _channel = loop {
53 futures::select!(
54 event = node.swarm.select_next_some() => match event {
55 SwarmEvent::Behaviour(swarm::NodeBehaviourEvent::Reqres(
56 request_response::Event::Message {
57 message: request_response::Message::Response { response, request_id, .. },
58 ..
59 }
60 )) => {
61 if target_request_id == request_id {
62 log::debug!("response: {}", serde_json::to_string_pretty(&response).unwrap());
63 break;
64 }
65 }
66 _ => {}
67 }
68 )
69 };
70
71 let _ = node.swarm.disconnect_peer_id(target_peer_id);
72
73 loop {
74 match node.swarm.select_next_some().await {
75 SwarmEvent::ConnectionClosed { peer_id, .. } => {
76 if peer_id == target_peer_id {
77 log::debug!("Connection closed with peer {:?}", peer_id);
78 break;
79 }
80 }
81 _ => {}
82 }
83 }
84
85 Ok(())
86}