// google_cloud_storage/storage/upload_source.rs
/// The payload of an object insert.
///
/// Wraps a data source `T`. The wrapper can be streamed when `T: StreamingSource`
/// and repositioned when `T: Seek`, by forwarding to the wrapped source.
#[derive(Debug)]
pub struct InsertPayload<T> {
    payload: T,
}
41
42impl<T> StreamingSource for InsertPayload<T>
43where
44 T: StreamingSource,
45{
46 type Error = T::Error;
47
48 fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send {
49 self.payload.next()
50 }
51
52 fn size_hint(&self) -> (u64, Option<u64>) {
53 self.payload.size_hint()
54 }
55}
56
57impl<T> Seek for InsertPayload<T>
58where
59 T: Seek,
60{
61 type Error = T::Error;
62
63 fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
64 self.payload.seek(offset)
65 }
66}
67
68impl From<bytes::Bytes> for InsertPayload<BytesSource> {
69 fn from(value: bytes::Bytes) -> Self {
70 let payload = BytesSource::new(value);
71 Self { payload }
72 }
73}
74
75impl From<&'static str> for InsertPayload<BytesSource> {
76 fn from(value: &'static str) -> Self {
77 let b = bytes::Bytes::from_static(value.as_bytes());
78 InsertPayload::from(b)
79 }
80}
81
82impl From<&'static [u8]> for InsertPayload<BytesSource> {
83 fn from(value: &'static [u8]) -> Self {
84 let b = bytes::Bytes::from_static(value);
85 InsertPayload::from(b)
86 }
87}
88
89impl<S> From<S> for InsertPayload<S>
90where
91 S: StreamingSource + Seek,
92{
93 fn from(value: S) -> Self {
94 Self { payload: value }
95 }
96}
97
98pub trait StreamingSource {
100 type Error: std::error::Error + Send + Sync + 'static;
102
103 fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes, Self::Error>>> + Send;
105
106 fn size_hint(&self) -> (u64, Option<u64>) {
114 (0_u64, None)
115 }
116}
117
/// A source that can be repositioned to an absolute byte offset.
pub trait Seek {
    /// The error produced when repositioning fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Repositions the source so subsequent reads start `offset` bytes from the
    /// beginning of the data.
    ///
    /// Behavior for offsets past the end is implementation-defined;
    /// [`BytesSource`] clamps them to the end of its buffer.
    fn seek(&mut self, offset: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
138
/// Maximum number of bytes requested from an `AsyncRead` source per chunk (256 KiB).
const READ_SIZE: usize = 1 << 18;
140
141impl<S> StreamingSource for S
142where
143 S: tokio::io::AsyncRead + Unpin + Send,
144{
145 type Error = std::io::Error;
146
147 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
148 let mut buffer = vec![0_u8; READ_SIZE];
149 match tokio::io::AsyncReadExt::read(self, &mut buffer).await {
150 Err(e) => Some(Err(e)),
151 Ok(0) => None,
152 Ok(n) => {
153 buffer.resize(n, 0_u8);
154 Some(Ok(bytes::Bytes::from_owner(buffer)))
155 }
156 }
157 }
158}
159
160impl<S> Seek for S
161where
162 S: tokio::io::AsyncSeek + Unpin + Send,
163{
164 type Error = std::io::Error;
165
166 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
167 let _ = tokio::io::AsyncSeekExt::seek(self, std::io::SeekFrom::Start(offset)).await?;
168 Ok(())
169 }
170}
171
172pub struct BytesSource {
174 contents: bytes::Bytes,
175 current: Option<bytes::Bytes>,
176}
177
178impl BytesSource {
179 pub(crate) fn new(contents: bytes::Bytes) -> Self {
180 let current = Some(contents.clone());
181 Self { contents, current }
182 }
183}
184
185impl StreamingSource for BytesSource {
186 type Error = crate::Error;
187
188 async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
189 self.current.take().map(Result::Ok)
190 }
191
192 fn size_hint(&self) -> (u64, Option<u64>) {
193 let s = self.contents.len() as u64;
194 (s, Some(s))
195 }
196}
197
198impl Seek for BytesSource {
199 type Error = crate::Error;
200
201 async fn seek(&mut self, offset: u64) -> Result<(), Self::Error> {
202 let pos = std::cmp::min(offset as usize, self.contents.len());
203 self.current = Some(self.contents.slice(pos..));
204 Ok(())
205 }
206}
207
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::{collections::VecDeque, io::Write};
    use tempfile::NamedTempFile;

    type Result = anyhow::Result<()>;

    const CONTENTS: &[u8] = b"how vexingly quick daft zebras jump";

    /// `CONTENTS` split into chunks at word boundaries.
    fn zebra_chunks() -> Vec<bytes::Bytes> {
        ["how ", "vexingly ", "quick ", "daft ", "zebras ", "jump"]
            .map(|v| bytes::Bytes::from_static(v.as_bytes()))
            .to_vec()
    }

    /// Drains `source`, concatenating every chunk it yields.
    async fn collect<S>(mut source: S) -> anyhow::Result<Vec<u8>>
    where
        S: StreamingSource,
    {
        let mut vec = Vec::new();
        while let Some(bytes) = source.next().await.transpose()? {
            vec.extend_from_slice(&bytes);
        }
        Ok(vec)
    }

    #[tokio::test]
    async fn empty_bytes() -> Result {
        let buffer = InsertPayload::from(bytes::Bytes::default());
        let range = buffer.size_hint();
        assert_eq!(range, (0, Some(0)));
        let got = collect(buffer).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_bytes() -> Result {
        let buffer = InsertPayload::from(bytes::Bytes::from_static(CONTENTS));
        let range = buffer.size_hint();
        assert_eq!(range, (CONTENTS.len() as u64, Some(CONTENTS.len() as u64)));
        let got = collect(buffer).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_u8() -> Result {
        let buffer = InsertPayload::from(CONTENTS);
        let range = buffer.size_hint();
        assert_eq!(range, (CONTENTS.len() as u64, Some(CONTENTS.len() as u64)));
        let got = collect(buffer).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn simple_str() -> Result {
        const LAZY: &str = "the quick brown fox jumps over the lazy dog";
        let buffer = InsertPayload::from(LAZY);
        let range = buffer.size_hint();
        assert_eq!(range, (LAZY.len() as u64, Some(LAZY.len() as u64)));
        let got = collect(buffer).await?;
        assert_eq!(&got, LAZY.as_bytes(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes() -> Result {
        let mut buffer = InsertPayload::from(bytes::Bytes::from_static(CONTENTS));
        buffer.seek(8).await?;
        let got = collect(buffer).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes_past_end() -> Result {
        for offset in [CONTENTS.len() as u64, CONTENTS.len() as u64 + 1, u64::MAX] {
            let mut buffer = InsertPayload::from(bytes::Bytes::from_static(CONTENTS));
            buffer.seek(offset).await?;
            let got = collect(buffer).await?;
            assert!(got.is_empty(), "offset={offset} {got:?}");
        }
        Ok(())
    }

    #[tokio::test]
    async fn seek_bytes_rewinds() -> Result {
        let mut buffer = InsertPayload::from(bytes::Bytes::from_static(CONTENTS));
        let first = buffer.next().await.transpose()?;
        assert_eq!(first.as_deref(), Some(CONTENTS));
        buffer.seek(0).await?;
        let got = collect(buffer).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn empty_stream() -> Result {
        let source = VecStream::new(vec![]);
        let payload = InsertPayload::from(source);
        let range = payload.size_hint();
        assert_eq!(range, (0, Some(0)));
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn simple_stream() -> Result {
        let payload = InsertPayload::from(VecStream::new(zebra_chunks()));
        let range = payload.size_hint();
        assert_eq!(range, (CONTENTS.len() as u64, Some(CONTENTS.len() as u64)));
        let got = collect(payload).await?;
        assert_eq!(got[..], CONTENTS[..]);

        Ok(())
    }

    #[tokio::test]
    async fn seek_stream() -> Result {
        // Offsets at the start, on a chunk boundary, inside a chunk, and at the end.
        for offset in [0_usize, 4, 6, CONTENTS.len()] {
            let mut payload = InsertPayload::from(VecStream::new(zebra_chunks()));
            payload.seek(offset as u64).await?;
            let got = collect(payload).await?;
            assert_eq!(got[..], CONTENTS[offset..], "offset={offset} {got:?}");
        }

        let mut payload = InsertPayload::from(VecStream::new(zebra_chunks()));
        payload.seek(CONTENTS.len() as u64 + 100).await?;
        let got = collect(payload).await?;
        assert!(got.is_empty(), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn empty_file() -> Result {
        let file = NamedTempFile::new()?;
        let read = file.reopen()?;
        let got = collect(tokio::fs::File::from(read)).await?;
        assert!(got.is_empty(), "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file() -> Result {
        let mut file = NamedTempFile::new()?;
        // `write` may legally write fewer bytes than requested; `write_all` cannot.
        file.write_all(CONTENTS)?;
        file.flush()?;
        let read = file.reopen()?;
        let got = collect(tokio::fs::File::from(read)).await?;
        assert_eq!(got[..], CONTENTS[..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn small_file_seek() -> Result {
        let mut file = NamedTempFile::new()?;
        file.write_all(CONTENTS)?;
        file.flush()?;
        let mut read = tokio::fs::File::from(file.reopen()?);
        read.seek(8).await?;
        let got = collect(read).await?;
        assert_eq!(got[..], CONTENTS[8..], "{got:?}");
        Ok(())
    }

    #[tokio::test]
    async fn larger_file() -> Result {
        let mut file = NamedTempFile::new()?;
        // Heap buffers keep the READ_SIZE blocks off the test thread's stack.
        for fill in 0_u8..4 {
            file.write_all(&vec![fill; READ_SIZE])?;
        }
        file.flush()?;
        assert_eq!(READ_SIZE % 2, 0);
        let mut read = tokio::fs::File::from(file.reopen()?);
        read.seek((READ_SIZE + READ_SIZE / 2) as u64).await?;
        let got = collect(read).await?;
        let mut want = vec![1_u8; READ_SIZE / 2];
        want.resize(want.len() + READ_SIZE, 2_u8);
        want.resize(want.len() + READ_SIZE, 3_u8);
        assert_eq!(got[..], want[..], "{got:?}");
        Ok(())
    }

    /// A test source that yields a fixed list of chunks, one per `next()` call.
    pub struct VecStream {
        contents: Vec<bytes::Bytes>,
        current: VecDeque<std::io::Result<bytes::Bytes>>,
    }

    impl VecStream {
        pub fn new(contents: Vec<bytes::Bytes>) -> Self {
            let current: VecDeque<std::io::Result<_>> =
                contents.iter().map(|x| Ok(x.clone())).collect();
            Self { contents, current }
        }
    }

    impl StreamingSource for VecStream {
        type Error = std::io::Error;

        async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
            self.current.pop_front()
        }

        fn size_hint(&self) -> (u64, Option<u64>) {
            let s = self.contents.iter().fold(0_u64, |a, i| a + i.len() as u64);
            (s, Some(s))
        }
    }

    impl Seek for VecStream {
        type Error = std::io::Error;

        async fn seek(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
            let mut current: VecDeque<std::io::Result<_>> =
                self.contents.iter().map(|x| Ok(x.clone())).collect();
            // Saturate rather than truncate on 32-bit targets.
            let mut offset = usize::try_from(offset).unwrap_or(usize::MAX);
            while offset > 0 {
                match current.pop_front() {
                    None => break,
                    Some(Err(e)) => {
                        current.push_front(Err(e));
                        break;
                    }
                    Some(Ok(mut b)) if b.len() > offset => {
                        // The seek lands inside this chunk: keep only its tail.
                        current.push_front(Ok(b.split_off(offset)));
                        break;
                    }
                    Some(Ok(b)) if b.len() == offset => break,
                    Some(Ok(b)) => offset -= b.len(),
                }
            }
            self.current = current;
            Ok(())
        }
    }
}