ant_protocol/
chunk_protocol.rs1use crate::chunk::{ChunkMessage, ChunkMessageBody, CHUNK_PROTOCOL_ID};
7use crate::logging::{debug, warn};
8use saorsa_core::identity::PeerId;
9use saorsa_core::{MultiAddr, P2PEvent, P2PNode};
10use std::time::Duration;
11use tokio::sync::broadcast::error::RecvError;
12use tokio::time::Instant;
13
14#[allow(clippy::too_many_arguments)]
31pub async fn send_and_await_chunk_response<T, E>(
32 node: &P2PNode,
33 target_peer: &PeerId,
34 message_bytes: Vec<u8>,
35 request_id: u64,
36 timeout: Duration,
37 peer_addrs: &[MultiAddr],
38 response_handler: impl Fn(ChunkMessageBody) -> Option<Result<T, E>>,
39 send_error: impl FnOnce(String) -> E,
40 timeout_error: impl FnOnce() -> E,
41) -> Result<T, E> {
42 let mut events = node.subscribe_events();
44
45 node.send_message(target_peer, CHUNK_PROTOCOL_ID, message_bytes, peer_addrs)
46 .await
47 .map_err(|e| send_error(e.to_string()))?;
48
49 let deadline = Instant::now()
53 .checked_add(timeout)
54 .unwrap_or_else(Instant::now);
55
56 while Instant::now() < deadline {
57 let remaining = deadline.saturating_duration_since(Instant::now());
58 match tokio::time::timeout(remaining, events.recv()).await {
59 Ok(Ok(P2PEvent::Message {
60 topic,
61 source: Some(source),
62 data,
63 })) if topic == CHUNK_PROTOCOL_ID && source == *target_peer => {
64 let response = match ChunkMessage::decode(&data) {
65 Ok(r) => r,
66 Err(e) => {
67 warn!("Failed to decode chunk message, skipping: {e}");
68 continue;
69 }
70 };
71 if response.request_id != request_id {
72 continue;
73 }
74 if let Some(result) = response_handler(response.body) {
75 return result;
76 }
77 }
78 Ok(Ok(_)) => {}
79 Ok(Err(RecvError::Lagged(skipped))) => {
80 debug!("Chunk protocol events lagged by {skipped} messages, continuing");
81 }
82 Ok(Err(RecvError::Closed)) | Err(_) => break,
83 }
84 }
85
86 Err(timeout_error())
87}