Skip to main content

qubit_http/request/
http_request_streaming_body.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! Deferred streaming request body wrapper.
10
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use super::http_request_body_byte_stream::HttpRequestBodyByteStream;
16
17type HttpRequestBodyStreamFactoryFuture =
18    Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>;
19
20type HttpRequestBodyStreamFactoryFn =
21    dyn Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static;
22
23/// Deferred streaming upload body source.
24///
25/// Each send attempt obtains a fresh byte stream from the stored async factory,
26/// so retries can rebuild the outbound body stream deterministically.
27#[derive(Clone)]
28pub struct HttpRequestStreamingBody {
29    /// Async factory producing one stream for one request attempt.
30    factory: Arc<HttpRequestBodyStreamFactoryFn>,
31}
32
33impl std::fmt::Debug for HttpRequestStreamingBody {
34    /// Formats this type without exposing closure internals.
35    ///
36    /// # Parameters
37    /// - `f`: Formatter destination.
38    ///
39    /// # Returns
40    /// Formatting result.
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("HttpRequestStreamingBody")
43            .finish_non_exhaustive()
44    }
45}
46
47impl HttpRequestStreamingBody {
48    /// Creates a deferred streaming body from an async stream factory.
49    ///
50    /// # Parameters
51    /// - `factory`: Async factory that returns a fresh byte stream for one send
52    ///   attempt.
53    ///
54    /// # Returns
55    /// New deferred streaming body wrapper.
56    pub fn new<F>(factory: F) -> Self
57    where
58        F: Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static,
59    {
60        Self {
61            factory: Arc::new(factory),
62        }
63    }
64
65    /// Builds reqwest body from one factory-produced byte stream.
66    ///
67    /// # Returns
68    /// New [`reqwest::Body`] stream instance for one send attempt.
69    pub(crate) async fn to_reqwest_body(&self) -> reqwest::Body {
70        let stream = (self.factory)().await;
71        reqwest::Body::wrap_stream(stream)
72    }
73}