google_cloud_storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the return interface for [Storage::read_object][crate::client::Storage::read_object]
16
17use crate::Result;
18use crate::model_ext::ObjectHighlights;
19use crate::streaming_source::{Payload, StreamingSource};
20#[cfg(feature = "unstable-stream")]
21use futures::Stream;
22
23/// The result of a `ReadObject` request.
24///
25/// Objects can be large, and must be returned as a stream of bytes. This struct
26/// also provides an accessor to retrieve the object's metadata.
27#[derive(Debug)]
28pub struct ReadObjectResponse {
29    inner: Box<dyn dynamic::ReadObjectResponse + Send>,
30}
31
32impl ReadObjectResponse {
33    pub(crate) fn new<T>(inner: Box<T>) -> Self
34    where
35        T: dynamic::ReadObjectResponse + Send + 'static,
36    {
37        Self { inner }
38    }
39
40    /// Create a ReadObjectResponse, given a data source.
41    ///
42    /// Use this method to mock the return type of
43    /// [Storage::read_object][crate::client::Storage::read_object].
44    ///
45    /// # Example
46    /// ```
47    /// # use google_cloud_storage::model_ext::ObjectHighlights;
48    /// # use google_cloud_storage::read_object::ReadObjectResponse;
49    /// let object = ObjectHighlights::default();
50    /// let response = ReadObjectResponse::from_source(object, "payload");
51    /// ```
52    pub fn from_source<T, S>(object: ObjectHighlights, source: T) -> Self
53    where
54        T: Into<Payload<S>> + Send + Sync + 'static,
55        S: StreamingSource + Send + Sync + 'static,
56    {
57        Self {
58            inner: Box::new(FakeReadObjectResponse::<S> {
59                object,
60                source: source.into(),
61            }),
62        }
63    }
64
65    /// Get the highlights of the object metadata included in the
66    /// response.
67    ///
68    /// To get full metadata about this object, use [crate::client::StorageControl::get_object].
69    ///
70    /// # Example
71    /// ```
72    /// # use google_cloud_storage::client::Storage;
73    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
74    /// let object = client
75    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
76    ///     .send()
77    ///     .await?
78    ///     .object();
79    /// println!("object generation={}", object.generation);
80    /// println!("object metageneration={}", object.metageneration);
81    /// println!("object size={}", object.size);
82    /// println!("object content encoding={}", object.content_encoding);
83    /// # Ok(()) }
84    /// ```
85    pub fn object(&self) -> ObjectHighlights {
86        self.inner.object()
87    }
88
89    /// Stream the next bytes of the object.
90    ///
91    /// When the response has been exhausted, this will return None.
92    ///
93    /// # Example
94    /// ```
95    /// # use google_cloud_storage::client::Storage;
96    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
97    /// let mut resp = client
98    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
99    ///     .send()
100    ///     .await?;
101    /// while let Some(next) = resp.next().await {
102    ///     println!("next={:?}", next?);
103    /// }
104    /// # Ok(()) }
105    /// ```
106    pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
107        self.inner.next().await
108    }
109
110    #[cfg(feature = "unstable-stream")]
111    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
112    /// Convert the response to a [Stream].
113    pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
114        use futures::stream::unfold;
115        Box::pin(unfold(Some(self), move |state| async move {
116            if let Some(mut this) = state {
117                if let Some(chunk) = this.next().await {
118                    return Some((chunk, Some(this)));
119                }
120            };
121            None
122        }))
123    }
124}
125
126pub(crate) mod dynamic {
127    use crate::Result;
128    use crate::model_ext::ObjectHighlights;
129
130    /// A trait representing the interface to read an object
131    #[async_trait::async_trait]
132    pub trait ReadObjectResponse: std::fmt::Debug {
133        fn object(&self) -> ObjectHighlights;
134        async fn next(&mut self) -> Option<Result<bytes::Bytes>>;
135    }
136}
137
138struct FakeReadObjectResponse<T>
139where
140    T: StreamingSource + Send + Sync + 'static,
141{
142    object: ObjectHighlights,
143    source: Payload<T>,
144}
145
146impl<T> std::fmt::Debug for FakeReadObjectResponse<T>
147where
148    T: StreamingSource + Send + Sync + 'static,
149{
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        f.debug_struct("FakeReadObjectResponse")
152            .field("object", &self.object)
153            // skip source, as it is not `Debug`
154            .finish()
155    }
156}
157
158#[async_trait::async_trait]
159impl<T> dynamic::ReadObjectResponse for FakeReadObjectResponse<T>
160where
161    T: StreamingSource + Send + Sync + 'static,
162{
163    fn object(&self) -> ObjectHighlights {
164        self.object.clone()
165    }
166
167    async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
168        self.source
169            .next()
170            .await
171            .map(|r| r.map_err(gax::error::Error::io))
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    // A response built with `from_source` must report the metadata it was
    // given and stream back the payload it was given.
    #[tokio::test]
    async fn from_source() -> anyhow::Result<()> {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let expected = ObjectHighlights {
            etag: "custom-etag".to_string(),
            ..Default::default()
        };

        let mut response = ReadObjectResponse::from_source(expected.clone(), LAZY);
        assert_eq!(&expected, &response.object());

        // Drain the response, failing the test on the first error.
        let mut collected = Vec::new();
        while let Some(chunk) = response.next().await {
            collected.extend_from_slice(&chunk?);
        }
        let collected = bytes::Bytes::from_owner(collected);
        assert_eq!(collected, LAZY);
        Ok(())
    }
}
197}