Skip to main content

ant_protocol/
chunk_protocol.rs

1//! Shared helper for the chunk protocol request/response pattern.
2//!
3//! Extracts the duplicated "subscribe → send → poll event loop" into a single
4//! generic function used by both `ant-client` and `ant-node` E2E helpers.
5
6use crate::chunk::{ChunkMessage, ChunkMessageBody, CHUNK_PROTOCOL_ID};
7use crate::logging::{debug, warn};
8use saorsa_core::identity::PeerId;
9use saorsa_core::{MultiAddr, P2PEvent, P2PNode};
10use std::time::Duration;
11use tokio::sync::broadcast::error::RecvError;
12use tokio::time::Instant;
13
14/// Send a chunk-protocol message to `target_peer` and await a matching response.
15///
16/// The event loop filters by topic (`CHUNK_PROTOCOL_ID`), source peer, decode
17/// errors (warn + skip), and `request_id` mismatch (skip).
18///
19/// * `response_handler` — inspects the decoded [`ChunkMessageBody`] and returns:
20///   - `Some(Ok(T))` to resolve successfully,
21///   - `Some(Err(E))` to resolve with an error,
22///   - `None` to keep waiting (wrong variant / not our response).
23/// * `send_error` — produces the caller's error type when `send_message` fails.
24/// * `timeout_error` — produces the caller's error type on deadline expiry.
25///
26/// # Errors
27///
28/// Returns `Err(E)` if sending fails (via `send_error`), the `response_handler`
29/// returns a protocol-level error, or the deadline expires (via `timeout_error`).
30#[allow(clippy::too_many_arguments)]
31pub async fn send_and_await_chunk_response<T, E>(
32    node: &P2PNode,
33    target_peer: &PeerId,
34    message_bytes: Vec<u8>,
35    request_id: u64,
36    timeout: Duration,
37    peer_addrs: &[MultiAddr],
38    response_handler: impl Fn(ChunkMessageBody) -> Option<Result<T, E>>,
39    send_error: impl FnOnce(String) -> E,
40    timeout_error: impl FnOnce() -> E,
41) -> Result<T, E> {
42    // Subscribe before sending so we don't miss the response
43    let mut events = node.subscribe_events();
44
45    node.send_message(target_peer, CHUNK_PROTOCOL_ID, message_bytes, peer_addrs)
46        .await
47        .map_err(|e| send_error(e.to_string()))?;
48
49    // `Instant::now() + timeout` can panic on extreme durations; fall back
50    // to the current instant (immediate timeout) if the addition overflows
51    // rather than taking down a crate that denies panics.
52    let deadline = Instant::now()
53        .checked_add(timeout)
54        .unwrap_or_else(Instant::now);
55
56    while Instant::now() < deadline {
57        let remaining = deadline.saturating_duration_since(Instant::now());
58        match tokio::time::timeout(remaining, events.recv()).await {
59            Ok(Ok(P2PEvent::Message {
60                topic,
61                source: Some(source),
62                data,
63            })) if topic == CHUNK_PROTOCOL_ID && source == *target_peer => {
64                let response = match ChunkMessage::decode(&data) {
65                    Ok(r) => r,
66                    Err(e) => {
67                        warn!("Failed to decode chunk message, skipping: {e}");
68                        continue;
69                    }
70                };
71                if response.request_id != request_id {
72                    continue;
73                }
74                if let Some(result) = response_handler(response.body) {
75                    return result;
76                }
77            }
78            Ok(Ok(_)) => {}
79            Ok(Err(RecvError::Lagged(skipped))) => {
80                debug!("Chunk protocol events lagged by {skipped} messages, continuing");
81            }
82            Ok(Err(RecvError::Closed)) | Err(_) => break,
83        }
84    }
85
86    Err(timeout_error())
87}