qubit_http/request/http_request_streaming_body.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! Deferred streaming request body wrapper.
10
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use super::http_request_body_byte_stream::HttpRequestBodyByteStream;
16
17type HttpRequestBodyStreamFactoryFuture =
18 Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>;
19
20type HttpRequestBodyStreamFactoryFn =
21 dyn Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static;
22
23/// Deferred streaming upload body source.
24///
25/// Each send attempt obtains a fresh byte stream from the stored async factory,
26/// so retries can rebuild the outbound body stream deterministically.
27#[derive(Clone)]
28pub struct HttpRequestStreamingBody {
29 /// Async factory producing one stream for one request attempt.
30 factory: Arc<HttpRequestBodyStreamFactoryFn>,
31}
32
33impl std::fmt::Debug for HttpRequestStreamingBody {
34 /// Formats this type without exposing closure internals.
35 ///
36 /// # Parameters
37 /// - `f`: Formatter destination.
38 ///
39 /// # Returns
40 /// Formatting result.
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("HttpRequestStreamingBody")
43 .finish_non_exhaustive()
44 }
45}
46
47impl HttpRequestStreamingBody {
48 /// Creates a deferred streaming body from an async stream factory.
49 ///
50 /// # Parameters
51 /// - `factory`: Async factory that returns a fresh byte stream for one send
52 /// attempt.
53 ///
54 /// # Returns
55 /// New deferred streaming body wrapper.
56 pub fn new<F>(factory: F) -> Self
57 where
58 F: Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static,
59 {
60 Self {
61 factory: Arc::new(factory),
62 }
63 }
64
65 /// Builds reqwest body from one factory-produced byte stream.
66 ///
67 /// # Returns
68 /// New [`reqwest::Body`] stream instance for one send attempt.
69 pub(crate) async fn to_reqwest_body(&self) -> reqwest::Body {
70 let stream = (self.factory)().await;
71 reqwest::Body::wrap_stream(stream)
72 }
73}