qubit_http/request/http_request_streaming_body.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Deferred streaming request body wrapper.
11
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use super::http_request_body_byte_stream::HttpRequestBodyByteStream;
17
18type HttpRequestBodyStreamFactoryFuture =
19 Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>;
20
21type HttpRequestBodyStreamFactoryFn =
22 dyn Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static;
23
24/// Deferred streaming upload body source.
25///
26/// Each send attempt obtains a fresh byte stream from the stored async factory,
27/// so retries can rebuild the outbound body stream deterministically.
28#[derive(Clone)]
29pub struct HttpRequestStreamingBody {
30 /// Async factory producing one stream for one request attempt.
31 factory: Arc<HttpRequestBodyStreamFactoryFn>,
32}
33
34impl std::fmt::Debug for HttpRequestStreamingBody {
35 /// Formats this type without exposing closure internals.
36 ///
37 /// # Parameters
38 /// - `f`: Formatter destination.
39 ///
40 /// # Returns
41 /// Formatting result.
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("HttpRequestStreamingBody")
44 .finish_non_exhaustive()
45 }
46}
47
48impl HttpRequestStreamingBody {
49 /// Creates a deferred streaming body from an async stream factory.
50 ///
51 /// # Parameters
52 /// - `factory`: Async factory that returns a fresh byte stream for one send
53 /// attempt.
54 ///
55 /// # Returns
56 /// New deferred streaming body wrapper.
57 pub fn new<F>(factory: F) -> Self
58 where
59 F: Fn() -> HttpRequestBodyStreamFactoryFuture + Send + Sync + 'static,
60 {
61 Self {
62 factory: Arc::new(factory),
63 }
64 }
65
66 /// Builds reqwest body from one factory-produced byte stream.
67 ///
68 /// # Returns
69 /// New [`reqwest::Body`] stream instance for one send attempt.
70 pub(crate) async fn to_reqwest_body(&self) -> reqwest::Body {
71 let stream = (self.factory)().await;
72 reqwest::Body::wrap_stream(stream)
73 }
74}