1#[cfg(google_cloud_unstable_tracing)]
16use super::tracing::TracingResponse;
17use crate::Result;
18use crate::model::{Object, ReadObjectRequest};
19use crate::model_ext::WriteObjectRequest;
20use crate::read_object::ReadObjectResponse;
21use crate::storage::client::StorageInner;
22#[cfg(google_cloud_unstable_tracing)]
23use crate::storage::info::INSTRUMENTATION;
24use crate::storage::perform_upload::PerformUpload;
25use crate::storage::read_object::Reader;
26use crate::storage::request_options::RequestOptions;
27use crate::storage::streaming_source::{Seek, StreamingSource};
28use crate::{
29 model_ext::OpenObjectRequest, object_descriptor::ObjectDescriptor,
30 storage::bidi::connector::Connector, storage::bidi::transport::ObjectDescriptorTransport,
31};
32#[cfg(google_cloud_unstable_tracing)]
33use gaxi::observability::ResultExt;
34use std::sync::Arc;
35#[cfg(google_cloud_unstable_tracing)]
36use tracing::Instrument;
37
38#[derive(Clone, Debug)]
54pub struct Storage {
55 inner: Arc<StorageInner>,
56 tracing: bool,
57}
58
59impl Storage {
60 #[cfg(test)]
61 pub(crate) fn new_test(inner: Arc<StorageInner>) -> Arc<Self> {
62 Self::new(inner, false)
63 }
64
65 pub(crate) fn new(inner: Arc<StorageInner>, tracing: bool) -> Arc<Self> {
66 Arc::new(Self { inner, tracing })
67 }
68
69 async fn read_object_plain(
70 &self,
71 request: ReadObjectRequest,
72 options: RequestOptions,
73 ) -> Result<ReadObjectResponse> {
74 let reader = Reader {
75 inner: self.inner.clone(),
76 request,
77 options,
78 };
79 reader.response().await
80 }
81
82 #[tracing::instrument(name = "read_object", level = tracing::Level::DEBUG, ret, err(Debug))]
83 async fn read_object_tracing(
84 &self,
85 request: ReadObjectRequest,
86 options: RequestOptions,
87 ) -> Result<ReadObjectResponse> {
88 #[cfg(google_cloud_unstable_tracing)]
89 {
90 let span =
91 gaxi::client_request_span!("client::Storage", "read_object", &INSTRUMENTATION);
92 let response = self
93 .read_object_plain(request, options)
94 .instrument(span.clone())
95 .await
96 .record_in_span(&span)?;
97 let inner = TracingResponse::new(response.into_parts(), span);
98 let response = ReadObjectResponse::new(Box::new(inner));
99 Ok(response)
100 }
101 #[cfg(not(google_cloud_unstable_tracing))]
102 self.read_object_plain(request, options).await
103 }
104
105 async fn write_object_buffered_plain<P>(
106 &self,
107 payload: P,
108 request: WriteObjectRequest,
109 options: RequestOptions,
110 ) -> Result<Object>
111 where
112 P: StreamingSource + Send + Sync + 'static,
113 {
114 PerformUpload::new(
115 payload,
116 self.inner.clone(),
117 request.spec,
118 request.params,
119 options,
120 )
121 .send()
122 .await
123 }
124
125 #[tracing::instrument(name = "write_object_buffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
126 async fn write_object_buffered_tracing<P>(
127 &self,
128 payload: P,
129 request: WriteObjectRequest,
130 options: RequestOptions,
131 ) -> Result<Object>
132 where
133 P: StreamingSource + Send + Sync + 'static,
134 {
135 #[cfg(google_cloud_unstable_tracing)]
136 {
137 let span =
138 gaxi::client_request_span!("client::Storage", "write_object", &INSTRUMENTATION);
139 self.write_object_buffered_plain(payload, request, options)
140 .instrument(span.clone())
141 .await
142 .record_in_span(&span)
143 }
144 #[cfg(not(google_cloud_unstable_tracing))]
145 self.write_object_buffered_plain(payload, request, options)
146 .await
147 }
148
149 async fn write_object_unbuffered_plain<P>(
150 &self,
151 payload: P,
152 request: WriteObjectRequest,
153 options: RequestOptions,
154 ) -> Result<Object>
155 where
156 P: StreamingSource + Seek + Send + Sync + 'static,
157 {
158 PerformUpload::new(
159 payload,
160 self.inner.clone(),
161 request.spec,
162 request.params,
163 options,
164 )
165 .send_unbuffered()
166 .await
167 }
168
169 #[tracing::instrument(name = "write_object_unbuffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
170 async fn write_object_unbuffered_tracing<P>(
171 &self,
172 payload: P,
173 request: WriteObjectRequest,
174 options: RequestOptions,
175 ) -> Result<Object>
176 where
177 P: StreamingSource + Seek + Send + Sync + 'static,
178 {
179 #[cfg(google_cloud_unstable_tracing)]
180 {
181 let span =
182 gaxi::client_request_span!("client::Storage", "write_object", &INSTRUMENTATION);
183 self.write_object_unbuffered_plain(payload, request, options)
184 .instrument(span.clone())
185 .await
186 .record_in_span(&span)
187 }
188 #[cfg(not(google_cloud_unstable_tracing))]
189 self.write_object_unbuffered_plain(payload, request, options)
190 .await
191 }
192
193 async fn open_object_plain(
194 &self,
195 request: OpenObjectRequest,
196 options: RequestOptions,
197 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
198 let (spec, ranges) = request.into_parts();
199 let connector = Connector::new(spec, options, self.inner.grpc.clone());
200 let (transport, readers) = ObjectDescriptorTransport::new(connector, ranges).await?;
201 Ok((ObjectDescriptor::new(transport), readers))
202 }
203
204 #[tracing::instrument(name = "open_object", level = tracing::Level::DEBUG, ret, err(Debug))]
205 async fn open_object_tracing(
206 &self,
207 request: OpenObjectRequest,
208 options: RequestOptions,
209 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
210 #[cfg(google_cloud_unstable_tracing)]
211 {
212 let span =
213 gaxi::client_request_span!("client::Storage", "open_object", &INSTRUMENTATION);
214 self.open_object_plain(request, options)
216 .instrument(span.clone())
217 .await
218 .record_in_span(&span)
219 }
220 #[cfg(not(google_cloud_unstable_tracing))]
221 self.open_object_plain(request, options).await
222 }
223}
224
225impl super::stub::Storage for Storage {
226 async fn read_object(
228 &self,
229 req: ReadObjectRequest,
230 options: RequestOptions,
231 ) -> Result<ReadObjectResponse> {
232 if self.tracing {
233 return self.read_object_tracing(req, options).await;
234 }
235 self.read_object_plain(req, options).await
236 }
237
238 async fn write_object_buffered<P>(
240 &self,
241 payload: P,
242 req: WriteObjectRequest,
243 options: RequestOptions,
244 ) -> Result<Object>
245 where
246 P: StreamingSource + Send + Sync + 'static,
247 {
248 if self.tracing {
249 return self
250 .write_object_buffered_tracing(payload, req, options)
251 .await;
252 }
253 self.write_object_buffered_plain(payload, req, options)
254 .await
255 }
256
257 async fn write_object_unbuffered<P>(
259 &self,
260 payload: P,
261 req: WriteObjectRequest,
262 options: RequestOptions,
263 ) -> Result<Object>
264 where
265 P: StreamingSource + Seek + Send + Sync + 'static,
266 {
267 if self.tracing {
268 return self
269 .write_object_unbuffered_tracing(payload, req, options)
270 .await;
271 }
272 self.write_object_unbuffered_plain(payload, req, options)
273 .await
274 }
275
276 async fn open_object(
277 &self,
278 request: OpenObjectRequest,
279 options: RequestOptions,
280 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
281 if self.tracing {
282 return self.open_object_tracing(request, options).await;
283 }
284 self.open_object_plain(request, options).await
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 #[cfg(google_cloud_unstable_tracing)]
291 use gaxi::observability::attributes::{
292 GCP_CLIENT_LANGUAGE_RUST, OTEL_KIND_INTERNAL, RPC_SYSTEM_HTTP, keys::*,
293 };
294 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
295 #[cfg(google_cloud_unstable_tracing)]
296 use google_cloud_test_utils::test_layer::AttributeValue;
297 use google_cloud_test_utils::test_layer::{CapturedSpan, TestLayer};
298 use httptest::{Expectation, Server, matchers::*, responders::status_code};
299 use std::collections::BTreeMap;
300
301 #[tokio::test]
302 async fn read_object() -> anyhow::Result<()> {
303 let guard = TestLayer::initialize();
304
305 let server = Server::run();
306 server.expect(
307 Expectation::matching(all_of![
308 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
309 request::query(url_decoded(contains(("alt", "media")))),
310 ])
311 .respond_with(status_code(404)),
312 );
313
314 let client = crate::client::Storage::builder()
315 .with_endpoint(format!("http://{}", server.addr()))
316 .with_credentials(Anonymous::new().build())
317 .with_tracing()
318 .build()
319 .await?;
320 let response = client
321 .read_object("projects/_/buckets/test-bucket", "test-object")
322 .send()
323 .await;
324 assert!(
325 matches!(response, Err(ref e) if e.is_transport()),
326 "{response:?}"
327 );
328
329 let captured = TestLayer::capture(&guard);
330 check_debug_log(&captured, "read_object");
331
332 #[cfg(google_cloud_unstable_tracing)]
333 client_request_span(&captured, "read_object", "404");
334
335 Ok(())
336 }
337
338 #[cfg(google_cloud_unstable_tracing)]
339 #[tokio::test]
340 async fn read_object_success() -> anyhow::Result<()> {
341 let guard = TestLayer::initialize();
342
343 let body = (0..100_000)
344 .map(|i| format!("{i:08} {:1000}", ""))
345 .collect::<Vec<_>>()
346 .join("\n");
347 let server = Server::run();
348 server.expect(
349 Expectation::matching(all_of![
350 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
351 request::query(url_decoded(contains(("alt", "media")))),
352 ])
353 .respond_with(
354 status_code(200)
355 .body(body.clone())
356 .append_header("x-goog-generation", 123456),
357 ),
358 );
359
360 let client = crate::client::Storage::builder()
361 .with_endpoint(format!("http://{}", server.addr()))
362 .with_credentials(Anonymous::new().build())
363 .with_tracing()
364 .build()
365 .await?;
366 let mut got = Vec::new();
367 let mut response = client
368 .read_object("projects/_/buckets/test-bucket", "test-object")
369 .send()
370 .await?;
371 let object = response.object();
372 assert_eq!(object.generation, 123456, "{object:?}");
373 while let Some(b) = response.next().await.transpose()? {
374 got.push(b);
375 }
376
377 let captured = TestLayer::capture(&guard);
378 let span = captured
379 .iter()
380 .find(|s| s.name == "client_request")
381 .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
382 assert_eq!(span.events, got.len() + 1, "{span:?}");
384
385 Ok(())
386 }
387
388 #[tokio::test]
389 async fn write_object_buffered() -> anyhow::Result<()> {
390 let guard = TestLayer::initialize();
391
392 let server = Server::run();
393 server.expect(
394 Expectation::matching(all_of![
395 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
396 request::query(url_decoded(contains(("uploadType", "multipart")))),
397 ])
398 .respond_with(status_code(404)),
399 );
400
401 let client = crate::client::Storage::builder()
402 .with_endpoint(format!("http://{}", server.addr()))
403 .with_credentials(Anonymous::new().build())
404 .with_tracing()
405 .build()
406 .await?;
407 let response = client
408 .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
409 .send_buffered()
410 .await;
411 assert!(
412 matches!(response, Err(ref e) if e.is_transport()),
413 "{response:?}"
414 );
415
416 let captured = TestLayer::capture(&guard);
417 check_debug_log(&captured, "write_object_buffered");
418
419 #[cfg(google_cloud_unstable_tracing)]
420 client_request_span(&captured, "write_object", "404");
421
422 Ok(())
423 }
424
425 #[tokio::test]
426 async fn write_object_unbuffered() -> anyhow::Result<()> {
427 let guard = TestLayer::initialize();
428
429 let server = Server::run();
430 server.expect(
431 Expectation::matching(all_of![
432 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
433 request::query(url_decoded(contains(("uploadType", "multipart")))),
434 ])
435 .respond_with(status_code(404)),
436 );
437
438 let client = crate::client::Storage::builder()
439 .with_endpoint(format!("http://{}", server.addr()))
440 .with_credentials(Anonymous::new().build())
441 .with_tracing()
442 .build()
443 .await?;
444 let response = client
445 .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
446 .send_unbuffered()
447 .await;
448 assert!(
449 matches!(response, Err(ref e) if e.is_transport()),
450 "{response:?}"
451 );
452
453 let captured = TestLayer::capture(&guard);
454 check_debug_log(&captured, "write_object_unbuffered");
455
456 #[cfg(google_cloud_unstable_tracing)]
457 client_request_span(&captured, "write_object", "404");
458
459 Ok(())
460 }
461
462 #[tokio::test]
463 async fn open_object() -> anyhow::Result<()> {
464 use gaxi::grpc::tonic::Status as TonicStatus;
465 use google_cloud_gax::error::rpc::Code;
466 use storage_grpc_mock::{MockStorage, start};
467
468 let guard = TestLayer::initialize();
469
470 let mut mock = MockStorage::new();
471 mock.expect_bidi_read_object()
472 .return_once(|_| Err(TonicStatus::not_found("not here")));
473 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
474
475 let client = crate::client::Storage::builder()
476 .with_credentials(Anonymous::new().build())
477 .with_endpoint(endpoint.clone())
478 .with_tracing()
479 .build()
480 .await?;
481 let response = client
482 .open_object("projects/_/buckets/test-bucket", "test-object")
483 .send()
484 .await;
485 assert!(
486 matches!(response, Err(ref e) if e.status().is_some_and(|s| s.code == Code::NotFound)),
487 "{response:?}"
488 );
489
490 let captured = TestLayer::capture(&guard);
491 check_debug_log(&captured, "open_object");
492
493 #[cfg(google_cloud_unstable_tracing)]
494 client_request_span(&captured, "open_object", "NOT_FOUND");
495 Ok(())
496 }
497
498 #[track_caller]
499 fn check_debug_log(captured: &Vec<CapturedSpan>, method: &'static str) {
500 let span = captured
501 .iter()
502 .find(|s| s.name == method)
503 .unwrap_or_else(|| panic!("missing `{method}` span in capture: {captured:#?}"));
504
505 let got = BTreeMap::from_iter(span.attributes.clone());
506 let want = ["self", "options", "request"];
507 let missing = want
508 .iter()
509 .filter(|k| !got.contains_key(**k))
510 .collect::<Vec<_>>();
511 assert!(
512 missing.is_empty(),
513 "missing = {missing:?}\ngot = {:?}\nwant = {want:?}\nfull = {got:#?}",
514 got.keys().collect::<Vec<_>>(),
515 );
516 }
517
518 #[cfg(google_cloud_unstable_tracing)]
519 #[track_caller]
520 fn client_request_span(
521 captured: &Vec<CapturedSpan>,
522 method: &'static str,
523 error_type: &'static str,
524 ) {
525 const EXPECTED_ATTRIBUTES: [(&str, &str); 8] = [
526 (OTEL_KIND, OTEL_KIND_INTERNAL),
527 (RPC_SYSTEM, RPC_SYSTEM_HTTP),
528 (RPC_SERVICE, "storage"),
529 (OTEL_STATUS_CODE, "ERROR"),
530 (GCP_CLIENT_SERVICE, "storage"),
531 (GCP_CLIENT_REPO, "googleapis/google-cloud-rust"),
532 (GCP_CLIENT_ARTIFACT, "google-cloud-storage"),
533 (GCP_CLIENT_LANGUAGE, GCP_CLIENT_LANGUAGE_RUST),
534 ];
535 let span = captured
536 .iter()
537 .find(|s| s.name == "client_request")
538 .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
539 let got = BTreeMap::from_iter(span.attributes.clone());
540 let want = BTreeMap::<String, AttributeValue>::from_iter(
543 EXPECTED_ATTRIBUTES
544 .iter()
545 .map(|(k, v)| (k.to_string(), AttributeValue::from(*v)))
546 .chain(
547 [
548 ("gax.client.span", true.into()),
549 (
550 OTEL_NAME,
551 format!("google_cloud_storage::client::Storage::{method}").into(),
552 ),
553 (RPC_METHOD, method.into()),
554 (ERROR_TYPE, error_type.into()),
555 ]
556 .map(|(k, v)| (k.to_string(), v)),
557 ),
558 );
559 let mismatch = want
560 .iter()
561 .filter(|(k, v)| !got.get(k.as_str()).is_some_and(|g| g == *v))
562 .collect::<Vec<_>>();
563 assert!(
564 mismatch.is_empty(),
565 "mismatch = {mismatch:?}\ngot = {got:?}\nwant = {want:?}"
566 );
567 }
568}