#![allow(clippy::disallowed_types)]
use bytes::Bytes;
use super::handles::{BodyReader, BodyWriter};
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
pub(crate) async fn pump_quic_recv_to_body(
mut recv: iroh::endpoint::RecvStream,
writer: BodyWriter,
) {
while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
if writer.send_chunk(chunk.bytes).await.is_err() {
break;
}
}
}
pub(crate) async fn pump_body_to_quic_send(
reader: BodyReader,
mut send: iroh::endpoint::SendStream,
) {
loop {
match reader.next_chunk().await {
None => break,
Some(data) => {
if send.write_all(&data).await.is_err() {
break;
}
}
}
}
let _ = send.finish();
}
#[allow(dead_code)]
pub(crate) async fn pump_hyper_body_to_channel<B>(body: B, writer: BodyWriter)
where
B: http_body::Body<Data = Bytes>,
B::Error: std::fmt::Debug,
{
let timeout = writer.drain_timeout;
pump_hyper_body_to_channel_limited(body, writer, None, timeout, None).await;
}
pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
body: B,
writer: BodyWriter,
max_bytes: Option<usize>,
frame_timeout: std::time::Duration,
mut overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
) where
B: http_body::Body<Data = Bytes>,
B::Error: std::fmt::Debug,
{
use http_body_util::BodyExt;
let mut body = Box::pin(body);
let mut total = 0usize;
let mut overflowed = false;
loop {
let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
Err(_elapsed) => {
tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
break;
}
Ok(None) => break,
Ok(Some(r)) => r,
};
match frame_result {
Err(e) => {
tracing::warn!("iroh-http: body frame error: {e:?}");
break;
}
Ok(frame) => {
if overflowed {
continue;
}
if frame.is_data() {
let data = frame.into_data().expect("is_data checked above");
total = total.saturating_add(data.len());
if let Some(limit) = max_bytes {
if total > limit {
tracing::warn!("iroh-http: request body exceeded {limit} bytes");
if let Some(tx) = overflow_tx.take() {
let _ = tx.send(());
}
overflowed = true;
continue; }
}
if writer.send_chunk(data).await.is_err() {
return; }
}
}
}
}
drop(writer);
}