Skip to main content

typeway_client/
streaming.rs

1//! Streaming response support for the [`crate::Client`].
2//!
3//! The [`call_streaming`](crate::Client::call_streaming) method sends a
4//! request but returns the raw [`reqwest::Response`] instead of
5//! deserializing the body. This is useful for SSE streams, file downloads,
6//! or any large response where buffering the entire body is undesirable.
7
8use crate::call::CallEndpoint;
9use crate::client::Client;
10use crate::error::ClientError;
11
12impl Client {
13    /// Call an endpoint and return the raw response for streaming.
14    ///
15    /// Unlike [`call`](Client::call), this does not deserialize the response
16    /// body. The caller can stream the body using
17    /// `reqwest::Response::bytes_stream()` or read it
18    /// manually.
19    ///
20    /// The request is built identically to `call` (path substitution, method,
21    /// body serialization, interceptors), but the response is returned as-is
22    /// after a status check. A non-2xx status code produces
23    /// [`ClientError::Status`].
24    ///
25    /// Retries are **not** applied — streaming responses are not idempotent
26    /// in general and partial reads cannot be rewound.
27    ///
28    /// # Example
29    ///
30    /// ```ignore
31    /// use futures::StreamExt;
32    ///
33    /// let resp = client.call_streaming::<GetEndpoint<EventsPath, ()>>(()).await?;
34    /// let mut stream = resp.bytes_stream();
35    /// while let Some(chunk) = stream.next().await {
36    ///     let bytes = chunk?;
37    ///     // process chunk...
38    /// }
39    /// ```
40    pub async fn call_streaming<E: CallEndpoint>(
41        &self,
42        args: E::Args,
43    ) -> Result<reqwest::Response, ClientError> {
44        let path = E::build_path(&args);
45        let url = self.base_url.join(&path)?;
46        let method = E::method();
47
48        let mut request = self.inner.request(method, url);
49
50        if let Some(body_result) = E::request_body(&args) {
51            let body = body_result?;
52            request = request
53                .header(http::header::CONTENT_TYPE, "application/json")
54                .body(body);
55        }
56
57        // Apply request interceptors.
58        for interceptor in &self.config.request_interceptors {
59            request = interceptor(request);
60        }
61
62        let response: reqwest::Response = match request.send().await {
63            Ok(resp) => resp,
64            Err(e) if e.is_timeout() => return Err(ClientError::Timeout),
65            Err(e) => return Err(ClientError::Request(e)),
66        };
67
68        // Apply response interceptors.
69        for interceptor in &self.config.response_interceptors {
70            interceptor(&response);
71        }
72
73        let status = response.status();
74
75        if !status.is_success() {
76            let body = response.text().await.unwrap_or_default();
77            return Err(ClientError::Status { status, body });
78        }
79
80        Ok(response)
81    }
82}