google_cloud_storage/read_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the return interface for [Storage::read_object][crate::client::Storage::read_object]
16
17use crate::Result;
18use crate::model_ext::ObjectHighlights;
19use crate::streaming_source::{Payload, StreamingSource};
20#[cfg(feature = "unstable-stream")]
21use futures::Stream;
22
23/// The result of a `ReadObject` request.
24///
25/// Objects can be large, and must be returned as a stream of bytes. This struct
26/// also provides an accessor to retrieve the object's metadata.
27#[derive(Debug)]
28pub struct ReadObjectResponse {
29 inner: Box<dyn dynamic::ReadObjectResponse + Send + 'static>,
30}
31
32impl ReadObjectResponse {
33 pub(crate) fn new<T>(inner: Box<T>) -> Self
34 where
35 T: dynamic::ReadObjectResponse + Send + 'static,
36 {
37 Self { inner }
38 }
39
40 pub(crate) fn into_parts(self) -> Box<dyn dynamic::ReadObjectResponse + Send + 'static> {
41 self.inner
42 }
43
44 /// Create a ReadObjectResponse, given a data source.
45 ///
46 /// Use this method to mock the return type of
47 /// [Storage::read_object][crate::client::Storage::read_object].
48 ///
49 /// # Example
50 /// ```
51 /// # use google_cloud_storage::model_ext::ObjectHighlights;
52 /// # use google_cloud_storage::read_object::ReadObjectResponse;
53 /// let object = ObjectHighlights::default();
54 /// let response = ReadObjectResponse::from_source(object, "payload");
55 /// ```
56 pub fn from_source<T, S>(object: ObjectHighlights, source: T) -> Self
57 where
58 T: Into<Payload<S>> + Send + Sync + 'static,
59 S: StreamingSource + Send + Sync + 'static,
60 {
61 Self {
62 inner: Box::new(FakeReadObjectResponse::<S> {
63 object,
64 source: source.into(),
65 }),
66 }
67 }
68
69 /// Get the highlights of the object metadata included in the
70 /// response.
71 ///
72 /// To get full metadata about this object, use [crate::client::StorageControl::get_object].
73 ///
74 /// # Example
75 /// ```
76 /// # use google_cloud_storage::client::Storage;
77 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
78 /// let object = client
79 /// .read_object("projects/_/buckets/my-bucket", "my-object")
80 /// .send()
81 /// .await?
82 /// .object();
83 /// println!("object generation={}", object.generation);
84 /// println!("object metageneration={}", object.metageneration);
85 /// println!("object size={}", object.size);
86 /// println!("object content encoding={}", object.content_encoding);
87 /// # Ok(()) }
88 /// ```
89 pub fn object(&self) -> ObjectHighlights {
90 self.inner.object()
91 }
92
93 /// Stream the next bytes of the object.
94 ///
95 /// When the response has been exhausted, this will return None.
96 ///
97 /// # Example
98 /// ```
99 /// # use google_cloud_storage::client::Storage;
100 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
101 /// let mut resp = client
102 /// .read_object("projects/_/buckets/my-bucket", "my-object")
103 /// .send()
104 /// .await?;
105 /// while let Some(next) = resp.next().await {
106 /// println!("next={:?}", next?);
107 /// }
108 /// # Ok(()) }
109 /// ```
110 pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
111 self.inner.next().await
112 }
113
114 #[cfg(feature = "unstable-stream")]
115 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
116 /// Convert the response to a [Stream].
117 pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
118 use futures::stream::unfold;
119 Box::pin(unfold(self, |mut stream| async move {
120 stream.next().await.map(|item| (item, stream))
121 }))
122 }
123}
124
125pub(crate) mod dynamic {
126 use crate::Result;
127 use crate::model_ext::ObjectHighlights;
128
129 /// A trait representing the interface to read an object
130 #[async_trait::async_trait]
131 pub trait ReadObjectResponse: std::fmt::Debug {
132 fn object(&self) -> ObjectHighlights;
133 async fn next(&mut self) -> Option<Result<bytes::Bytes>>;
134 }
135}
136
137struct FakeReadObjectResponse<T>
138where
139 T: StreamingSource + Send + Sync + 'static,
140{
141 object: ObjectHighlights,
142 source: Payload<T>,
143}
144
145impl<T> std::fmt::Debug for FakeReadObjectResponse<T>
146where
147 T: StreamingSource + Send + Sync + 'static,
148{
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("FakeReadObjectResponse")
151 .field("object", &self.object)
152 // skip source, as it is not `Debug`
153 .finish()
154 }
155}
156
157#[async_trait::async_trait]
158impl<T> dynamic::ReadObjectResponse for FakeReadObjectResponse<T>
159where
160 T: StreamingSource + Send + Sync + 'static,
161{
162 fn object(&self) -> ObjectHighlights {
163 self.object.clone()
164 }
165
166 async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
167 self.source
168 .next()
169 .await
170 .map(|r| r.map_err(crate::Error::io))
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 #[tokio::test]
179 async fn from_source() -> anyhow::Result<()> {
180 const LAZY: &str = "the quick brown fox jumps over the lazy dog";
181 let object = ObjectHighlights {
182 etag: "custom-etag".to_string(),
183 ..Default::default()
184 };
185
186 let mut response = ReadObjectResponse::from_source(object.clone(), LAZY);
187 assert_eq!(&object, &response.object());
188 let mut contents = Vec::new();
189 while let Some(chunk) = response.next().await.transpose()? {
190 contents.extend_from_slice(&chunk);
191 }
192 let contents = bytes::Bytes::from_owner(contents);
193 assert_eq!(contents, LAZY);
194 Ok(())
195 }
196}