Skip to main content

google_cloud_storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the return interface for [Storage::read_object][crate::client::Storage::read_object]
16
17use crate::Result;
18use crate::model_ext::ObjectHighlights;
19use crate::streaming_source::{Payload, StreamingSource};
20#[cfg(feature = "unstable-stream")]
21use futures::Stream;
22
23/// The result of a `ReadObject` request.
24///
25/// Objects can be large, and must be returned as a stream of bytes. This struct
26/// also provides an accessor to retrieve the object's metadata.
27#[derive(Debug)]
28pub struct ReadObjectResponse {
29    inner: Box<dyn dynamic::ReadObjectResponse + Send + 'static>,
30}
31
32impl ReadObjectResponse {
33    pub(crate) fn new<T>(inner: Box<T>) -> Self
34    where
35        T: dynamic::ReadObjectResponse + Send + 'static,
36    {
37        Self { inner }
38    }
39
40    #[cfg(google_cloud_unstable_tracing)]
41    pub(crate) fn into_parts(self) -> Box<dyn dynamic::ReadObjectResponse + Send + 'static> {
42        self.inner
43    }
44
45    /// Create a ReadObjectResponse, given a data source.
46    ///
47    /// Use this method to mock the return type of
48    /// [Storage::read_object][crate::client::Storage::read_object].
49    ///
50    /// # Example
51    /// ```
52    /// # use google_cloud_storage::model_ext::ObjectHighlights;
53    /// # use google_cloud_storage::read_object::ReadObjectResponse;
54    /// let object = ObjectHighlights::default();
55    /// let response = ReadObjectResponse::from_source(object, "payload");
56    /// ```
57    pub fn from_source<T, S>(object: ObjectHighlights, source: T) -> Self
58    where
59        T: Into<Payload<S>> + Send + Sync + 'static,
60        S: StreamingSource + Send + Sync + 'static,
61    {
62        Self {
63            inner: Box::new(FakeReadObjectResponse::<S> {
64                object,
65                source: source.into(),
66            }),
67        }
68    }
69
70    /// Get the highlights of the object metadata included in the
71    /// response.
72    ///
73    /// To get full metadata about this object, use [crate::client::StorageControl::get_object].
74    ///
75    /// # Example
76    /// ```
77    /// # use google_cloud_storage::client::Storage;
78    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
79    /// let object = client
80    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
81    ///     .send()
82    ///     .await?
83    ///     .object();
84    /// println!("object generation={}", object.generation);
85    /// println!("object metageneration={}", object.metageneration);
86    /// println!("object size={}", object.size);
87    /// println!("object content encoding={}", object.content_encoding);
88    /// # Ok(()) }
89    /// ```
90    pub fn object(&self) -> ObjectHighlights {
91        self.inner.object()
92    }
93
94    /// Stream the next bytes of the object.
95    ///
96    /// When the response has been exhausted, this will return None.
97    ///
98    /// # Example
99    /// ```
100    /// # use google_cloud_storage::client::Storage;
101    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
102    /// let mut resp = client
103    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
104    ///     .send()
105    ///     .await?;
106    /// while let Some(next) = resp.next().await {
107    ///     println!("next={:?}", next?);
108    /// }
109    /// # Ok(()) }
110    /// ```
111    pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
112        self.inner.next().await
113    }
114
115    #[cfg(feature = "unstable-stream")]
116    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
117    /// Convert the response to a [Stream].
118    pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
119        use futures::stream::unfold;
120        Box::pin(unfold(Some(self), move |state| async move {
121            if let Some(mut this) = state {
122                if let Some(chunk) = this.next().await {
123                    return Some((chunk, Some(this)));
124                }
125            };
126            None
127        }))
128    }
129}
130
131pub(crate) mod dynamic {
132    use crate::Result;
133    use crate::model_ext::ObjectHighlights;
134
135    /// A trait representing the interface to read an object
136    #[async_trait::async_trait]
137    pub trait ReadObjectResponse: std::fmt::Debug {
138        fn object(&self) -> ObjectHighlights;
139        async fn next(&mut self) -> Option<Result<bytes::Bytes>>;
140    }
141}
142
143struct FakeReadObjectResponse<T>
144where
145    T: StreamingSource + Send + Sync + 'static,
146{
147    object: ObjectHighlights,
148    source: Payload<T>,
149}
150
151impl<T> std::fmt::Debug for FakeReadObjectResponse<T>
152where
153    T: StreamingSource + Send + Sync + 'static,
154{
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct("FakeReadObjectResponse")
157            .field("object", &self.object)
158            // skip source, as it is not `Debug`
159            .finish()
160    }
161}
162
163#[async_trait::async_trait]
164impl<T> dynamic::ReadObjectResponse for FakeReadObjectResponse<T>
165where
166    T: StreamingSource + Send + Sync + 'static,
167{
168    fn object(&self) -> ObjectHighlights {
169        self.object.clone()
170    }
171
172    async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
173        self.source
174            .next()
175            .await
176            .map(|r| r.map_err(crate::Error::io))
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// A response built with `from_source` reports the supplied metadata and
    /// streams back the supplied payload.
    #[tokio::test]
    async fn from_source() -> anyhow::Result<()> {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let want = ObjectHighlights {
            etag: "custom-etag".to_string(),
            ..Default::default()
        };

        let mut response = ReadObjectResponse::from_source(want.clone(), LAZY);
        assert_eq!(&want, &response.object());

        // Drain the response, failing the test on the first error.
        let mut got = Vec::new();
        while let Some(chunk) = response.next().await {
            got.extend_from_slice(&chunk?);
        }
        assert_eq!(bytes::Bytes::from_owner(got), LAZY);
        Ok(())
    }
}