Skip to main content

qubit_http/request/
http_request_streaming_body.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Deferred streaming request body wrapper.
11
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use super::http_request_body_byte_stream::HttpRequestBodyByteStream;
17
18type HttpRequestBodyStreamFactoryFuture =
19    Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>;
20
21type HttpRequestBodyStreamFactoryFn =
22    dyn Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static;
23
24/// Deferred streaming upload body source.
25///
26/// Each send attempt obtains a fresh byte stream from the stored async factory,
27/// so retries can rebuild the outbound body stream deterministically.
28#[derive(Clone)]
29pub struct HttpRequestStreamingBody {
30    /// Async factory producing one stream for one request attempt.
31    factory: Arc<HttpRequestBodyStreamFactoryFn>,
32}
33
34impl std::fmt::Debug for HttpRequestStreamingBody {
35    /// Formats this type without exposing closure internals.
36    ///
37    /// # Parameters
38    /// - `f`: Formatter destination.
39    ///
40    /// # Returns
41    /// Formatting result.
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("HttpRequestStreamingBody")
44            .finish_non_exhaustive()
45    }
46}
47
48impl HttpRequestStreamingBody {
49    /// Creates a deferred streaming body from an async stream factory.
50    ///
51    /// # Parameters
52    /// - `factory`: Async factory that returns a fresh byte stream for one send
53    ///   attempt.
54    ///
55    /// # Returns
56    /// New deferred streaming body wrapper.
57    pub fn new<F>(factory: F) -> Self
58    where
59        F: Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static,
60    {
61        Self {
62            factory: Arc::new(factory),
63        }
64    }
65
66    /// Builds reqwest body from one factory-produced byte stream.
67    ///
68    /// # Returns
69    /// New [`reqwest::Body`] stream instance for one send attempt.
70    pub(crate) async fn to_reqwest_body(&self) -> reqwest::Body {
71        let stream = (self.factory)().await;
72        reqwest::Body::wrap_stream(stream)
73    }
74}