use crate::ant_protocol::{ChunkMessage, ChunkMessageBody, CHUNK_PROTOCOL_ID};
use crate::logging::{debug, warn};
use saorsa_core::identity::PeerId;
use saorsa_core::{MultiAddr, P2PEvent, P2PNode};
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::Instant;
#[allow(clippy::too_many_arguments)]
pub async fn send_and_await_chunk_response<T, E>(
node: &P2PNode,
target_peer: &PeerId,
message_bytes: Vec<u8>,
request_id: u64,
timeout: Duration,
peer_addrs: &[MultiAddr],
response_handler: impl Fn(ChunkMessageBody) -> Option<Result<T, E>>,
send_error: impl FnOnce(String) -> E,
timeout_error: impl FnOnce() -> E,
) -> Result<T, E> {
let mut events = node.subscribe_events();
node.send_message(target_peer, CHUNK_PROTOCOL_ID, message_bytes, peer_addrs)
.await
.map_err(|e| send_error(e.to_string()))?;
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
match tokio::time::timeout(remaining, events.recv()).await {
Ok(Ok(P2PEvent::Message {
topic,
source: Some(source),
data,
})) if topic == CHUNK_PROTOCOL_ID && source == *target_peer => {
let response = match ChunkMessage::decode(&data) {
Ok(r) => r,
Err(e) => {
warn!("Failed to decode chunk message, skipping: {e}");
continue;
}
};
if response.request_id != request_id {
continue;
}
if let Some(result) = response_handler(response.body) {
return result;
}
}
Ok(Ok(_)) => {}
Ok(Err(RecvError::Lagged(skipped))) => {
debug!("Chunk protocol events lagged by {skipped} messages, continuing");
}
Ok(Err(RecvError::Closed)) | Err(_) => break,
}
}
Err(timeout_error())
}