google_cloud_storage/read_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the return interface for [Storage::read_object][crate::client::Storage::read_object]
16
17use crate::Result;
18use crate::model_ext::ObjectHighlights;
19use crate::streaming_source::{Payload, StreamingSource};
20#[cfg(feature = "unstable-stream")]
21use futures::Stream;
22
23/// The result of a `ReadObject` request.
24///
25/// Objects can be large, and must be returned as a stream of bytes. This struct
26/// also provides an accessor to retrieve the object's metadata.
27#[derive(Debug)]
28pub struct ReadObjectResponse {
29 inner: Box<dyn dynamic::ReadObjectResponse + Send>,
30}
31
32impl ReadObjectResponse {
33 pub(crate) fn new<T>(inner: Box<T>) -> Self
34 where
35 T: dynamic::ReadObjectResponse + Send + 'static,
36 {
37 Self { inner }
38 }
39
40 /// Create a ReadObjectResponse, given a data source.
41 ///
42 /// Use this method to mock the return type of
43 /// [Storage::read_object][crate::client::Storage::read_object].
44 ///
45 /// # Example
46 /// ```
47 /// # use google_cloud_storage::model_ext::ObjectHighlights;
48 /// # use google_cloud_storage::read_object::ReadObjectResponse;
49 /// let object = ObjectHighlights::default();
50 /// let response = ReadObjectResponse::from_source(object, "payload");
51 /// ```
52 pub fn from_source<T, S>(object: ObjectHighlights, source: T) -> Self
53 where
54 T: Into<Payload<S>> + Send + Sync + 'static,
55 S: StreamingSource + Send + Sync + 'static,
56 {
57 Self {
58 inner: Box::new(FakeReadObjectResponse::<S> {
59 object,
60 source: source.into(),
61 }),
62 }
63 }
64
65 /// Get the highlights of the object metadata included in the
66 /// response.
67 ///
68 /// To get full metadata about this object, use [crate::client::StorageControl::get_object].
69 ///
70 /// # Example
71 /// ```
72 /// # use google_cloud_storage::client::Storage;
73 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
74 /// let object = client
75 /// .read_object("projects/_/buckets/my-bucket", "my-object")
76 /// .send()
77 /// .await?
78 /// .object();
79 /// println!("object generation={}", object.generation);
80 /// println!("object metageneration={}", object.metageneration);
81 /// println!("object size={}", object.size);
82 /// println!("object content encoding={}", object.content_encoding);
83 /// # Ok(()) }
84 /// ```
85 pub fn object(&self) -> ObjectHighlights {
86 self.inner.object()
87 }
88
89 /// Stream the next bytes of the object.
90 ///
91 /// When the response has been exhausted, this will return None.
92 ///
93 /// # Example
94 /// ```
95 /// # use google_cloud_storage::client::Storage;
96 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
97 /// let mut resp = client
98 /// .read_object("projects/_/buckets/my-bucket", "my-object")
99 /// .send()
100 /// .await?;
101 /// while let Some(next) = resp.next().await {
102 /// println!("next={:?}", next?);
103 /// }
104 /// # Ok(()) }
105 /// ```
106 pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
107 self.inner.next().await
108 }
109
110 #[cfg(feature = "unstable-stream")]
111 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
112 /// Convert the response to a [Stream].
113 pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
114 use futures::stream::unfold;
115 Box::pin(unfold(Some(self), move |state| async move {
116 if let Some(mut this) = state {
117 if let Some(chunk) = this.next().await {
118 return Some((chunk, Some(this)));
119 }
120 };
121 None
122 }))
123 }
124}
125
126pub(crate) mod dynamic {
127 use crate::Result;
128 use crate::model_ext::ObjectHighlights;
129
130 /// A trait representing the interface to read an object
131 #[async_trait::async_trait]
132 pub trait ReadObjectResponse: std::fmt::Debug {
133 fn object(&self) -> ObjectHighlights;
134 async fn next(&mut self) -> Option<Result<bytes::Bytes>>;
135 }
136}
137
138struct FakeReadObjectResponse<T>
139where
140 T: StreamingSource + Send + Sync + 'static,
141{
142 object: ObjectHighlights,
143 source: Payload<T>,
144}
145
146impl<T> std::fmt::Debug for FakeReadObjectResponse<T>
147where
148 T: StreamingSource + Send + Sync + 'static,
149{
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("FakeReadObjectResponse")
152 .field("object", &self.object)
153 // skip source, as it is not `Debug`
154 .finish()
155 }
156}
157
158#[async_trait::async_trait]
159impl<T> dynamic::ReadObjectResponse for FakeReadObjectResponse<T>
160where
161 T: StreamingSource + Send + Sync + 'static,
162{
163 fn object(&self) -> ObjectHighlights {
164 self.object.clone()
165 }
166
167 async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
168 self.source
169 .next()
170 .await
171 .map(|r| r.map_err(gax::error::Error::io))
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178
179 #[tokio::test]
180 async fn from_source() -> anyhow::Result<()> {
181 const LAZY: &str = "the quick brown fox jumps over the lazy dog";
182 let object = ObjectHighlights {
183 etag: "custom-etag".to_string(),
184 ..Default::default()
185 };
186
187 let mut response = ReadObjectResponse::from_source(object.clone(), LAZY);
188 assert_eq!(&object, &response.object());
189 let mut contents = Vec::new();
190 while let Some(chunk) = response.next().await.transpose()? {
191 contents.extend_from_slice(&chunk);
192 }
193 let contents = bytes::Bytes::from_owner(contents);
194 assert_eq!(contents, LAZY);
195 Ok(())
196 }
197}