google_cloud_storage/storage/
transport.rs1use crate::Result;
16use crate::model::{Object, ReadObjectRequest};
17use crate::model_ext::WriteObjectRequest;
18use crate::read_object::ReadObjectResponse;
19use crate::storage::client::StorageInner;
20use crate::storage::perform_upload::PerformUpload;
21use crate::storage::read_object::Reader;
22use crate::storage::request_options::RequestOptions;
23use crate::storage::streaming_source::{Seek, StreamingSource};
24use crate::{
25 model_ext::OpenObjectRequest, object_descriptor::ObjectDescriptor,
26 storage::bidi::connector::Connector, storage::bidi::transport::ObjectDescriptorTransport,
27};
28use std::sync::Arc;
29
30#[derive(Clone, Debug)]
46pub struct Storage {
47 inner: Arc<StorageInner>,
48}
49
50impl Storage {
51 pub(crate) fn new(inner: Arc<StorageInner>) -> Arc<Self> {
52 Arc::new(Self { inner })
53 }
54}
55
56impl super::stub::Storage for Storage {
57 async fn read_object(
59 &self,
60 req: ReadObjectRequest,
61 options: RequestOptions,
62 ) -> Result<ReadObjectResponse> {
63 let reader = Reader {
64 inner: self.inner.clone(),
65 request: req,
66 options,
67 };
68 reader.response().await
69 }
70
71 async fn write_object_buffered<P>(
73 &self,
74 payload: P,
75 req: WriteObjectRequest,
76 options: RequestOptions,
77 ) -> Result<Object>
78 where
79 P: StreamingSource + Send + Sync + 'static,
80 {
81 PerformUpload::new(payload, self.inner.clone(), req.spec, req.params, options)
82 .send()
83 .await
84 }
85
86 async fn write_object_unbuffered<P>(
88 &self,
89 payload: P,
90 req: WriteObjectRequest,
91 options: RequestOptions,
92 ) -> Result<Object>
93 where
94 P: StreamingSource + Seek + Send + Sync + 'static,
95 {
96 PerformUpload::new(payload, self.inner.clone(), req.spec, req.params, options)
97 .send_unbuffered()
98 .await
99 }
100
101 async fn open_object(
102 &self,
103 request: OpenObjectRequest,
104 options: RequestOptions,
105 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
106 let (spec, ranges) = request.into_parts();
107 let connector = Connector::new(spec, options, self.inner.grpc.clone());
108 let (transport, readers) = ObjectDescriptorTransport::new(connector, ranges).await?;
109
110 Ok((ObjectDescriptor::new(transport), readers))
111 }
112}