qiniu_upload_manager/data_source/
mod.rs1use super::PartSize;
2use auto_impl::auto_impl;
3use digest::{Digest, Output as DigestOutput};
4use dyn_clonable::clonable;
5use qiniu_apis::http::Reset;
6use std::{
7 fmt::Debug,
8 io::{copy as io_copy, sink as io_sink, Cursor, Read, Result as IoResult},
9 num::NonZeroUsize,
10};
11
12#[clonable]
18#[auto_impl(&, &mut, Box, Rc, Arc)]
19pub trait DataSource<A: Digest>: Clone + Debug + Sync + Send {
20 fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>>;
22
23 fn reset(&self) -> IoResult<()>;
25
26 #[inline]
30 fn source_key(&self) -> IoResult<Option<SourceKey<A>>> {
31 Ok(None)
32 }
33
34 fn total_size(&self) -> IoResult<Option<u64>>;
36}
37
38pub(super) trait Digestible<A: Digest>: Read + Reset {
39 fn digest(&mut self) -> IoResult<DigestOutput<A>> {
40 struct ReadWithDigest<A, R> {
41 reader: R,
42 digest: A,
43 }
44
45 impl<A: Digest, R: Read> Read for ReadWithDigest<A, R> {
46 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
47 let size = self.reader.read(buf)?;
48 self.digest.update(buf);
49 Ok(size)
50 }
51 }
52
53 let mut hasher = ReadWithDigest {
54 reader: self,
55 digest: A::new(),
56 };
57 io_copy(&mut hasher, &mut io_sink())?;
58 hasher.reader.reset()?;
59 Ok(hasher.digest.finalize())
60 }
61}
62
63impl<T: Read + Reset, A: Digest> Digestible<A> for T {}
64
65#[derive(Debug)]
69pub struct DataSourceReader {
70 inner: DataSourceReaderInner,
71 part_number: NonZeroUsize,
72}
73
74#[derive(Debug)]
75enum DataSourceReaderInner {
76 ReadSeekable(SeekableSource),
77 Readable { data: Cursor<Vec<u8>>, offset: u64 },
78}
79
80impl DataSourceReader {
81 #[inline]
83 pub fn seekable(part_number: NonZeroUsize, source: SeekableSource) -> Self {
84 Self {
85 inner: DataSourceReaderInner::ReadSeekable(source),
86 part_number,
87 }
88 }
89
90 #[inline]
92 pub fn unseekable(part_number: NonZeroUsize, data: Vec<u8>, offset: u64) -> Self {
93 Self {
94 inner: DataSourceReaderInner::Readable {
95 data: Cursor::new(data),
96 offset,
97 },
98 part_number,
99 }
100 }
101
102 pub(super) fn part_number(&self) -> NonZeroUsize {
103 self.part_number
104 }
105
106 pub(super) fn offset(&self) -> u64 {
107 match &self.inner {
108 DataSourceReaderInner::ReadSeekable(source) => source.offset(),
109 DataSourceReaderInner::Readable { offset, .. } => *offset,
110 }
111 }
112
113 pub(super) fn len(&self) -> IoResult<u64> {
114 match &self.inner {
115 DataSourceReaderInner::ReadSeekable(source) => source.len(),
116 DataSourceReaderInner::Readable { data, .. } => Ok(data.get_ref().len() as u64),
117 }
118 }
119}
120
121impl Read for DataSourceReader {
122 #[inline]
123 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
124 match &mut self.inner {
125 DataSourceReaderInner::ReadSeekable(source) => source.read(buf),
126 DataSourceReaderInner::Readable { data, .. } => data.read(buf),
127 }
128 }
129}
130
131impl Reset for DataSourceReader {
132 #[inline]
133 fn reset(&mut self) -> IoResult<()> {
134 match &mut self.inner {
135 DataSourceReaderInner::ReadSeekable(source) => source.reset(),
136 DataSourceReaderInner::Readable { data, .. } => data.reset(),
137 }
138 }
139}
140
/// Returns the number of the first part, which is `1`.
fn first_part_number() -> NonZeroUsize {
    // The checked constructor needs no `unsafe` block and no lint override;
    // the `expect` can never fire because the literal is non-zero.
    NonZeroUsize::new(1).expect("1 is non-zero")
}
147
#[cfg(feature = "async")]
mod async_reader {
    use super::*;
    use futures::{
        future::BoxFuture,
        io::{copy as async_io_copy, sink as async_sink, Cursor, SeekFrom},
        ready, AsyncRead, AsyncSeek, AsyncSeekExt,
    };
    use qiniu_apis::http::AsyncReset;
    use std::{
        pin::Pin,
        task::{Context, Poll},
    };

    /// Asynchronous data source that hands out its content slice by slice.
    ///
    /// The type parameter `A` is the digest algorithm used by the optional
    /// [`SourceKey`] returned from [`AsyncDataSource::source_key`].
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    #[clonable]
    #[auto_impl(&, &mut, Box, Rc, Arc)]
    pub trait AsyncDataSource<A: Digest>: Clone + Debug + Sync + Send {
        /// Requests a reader over a slice of the source, sized according to `size`.
        ///
        /// NOTE(review): `Ok(None)` presumably means the source has no more
        /// data to hand out; confirm against the implementors.
        fn slice(&self, size: PartSize) -> BoxFuture<IoResult<Option<AsyncDataSourceReader>>>;

        /// Resets the data source.
        ///
        /// NOTE(review): presumably slicing starts over from the beginning
        /// afterwards; confirm against the implementors.
        fn reset(&self) -> BoxFuture<IoResult<()>>;

        /// Returns the key identifying this source, or `Ok(None)` if it has none.
        fn source_key(&self) -> BoxFuture<IoResult<Option<SourceKey<A>>>>;

        /// Returns the total size of the source, or `Ok(None)` when the
        /// implementation does not report one.
        fn total_size(&self) -> BoxFuture<IoResult<Option<u64>>>;
    }

    /// Computes a digest over everything an asynchronously resettable reader yields.
    pub(in super::super) trait AsyncDigestible<A: Digest + Unpin + Send>:
        AsyncRead + AsyncReset + Unpin + Send
    {
        /// Reads `self` to the end while hashing the bytes with a fresh `A`,
        /// resets the reader, and resolves to the finalized digest.
        ///
        /// # Errors
        ///
        /// Propagates any I/O error raised while reading or while resetting.
        /// When reading fails the reader is not reset.
        fn digest(&mut self) -> BoxFuture<IoResult<DigestOutput<A>>> {
            /// Pass-through reader that feeds every byte it reads into a hasher.
            struct ReadWithDigest<A, R> {
                reader: R,
                digest: A,
            }

            impl<A: Digest + Unpin + Send, R: AsyncRead + Unpin> AsyncRead for ReadWithDigest<A, R> {
                fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
                    let size = ready!(Pin::new(&mut self.reader).poll_read(cx, buf))?;
                    // Only the first `size` bytes were filled by this poll; the
                    // rest of `buf` holds stale data and must not be hashed.
                    self.digest.update(&buf[..size]);
                    Poll::Ready(Ok(size))
                }
            }

            Box::pin(async move {
                let mut hasher = ReadWithDigest {
                    reader: self,
                    digest: A::new(),
                };
                // The sink discards the data; only the hashing side effect matters.
                async_io_copy(Pin::new(&mut hasher), &mut async_sink()).await?;
                hasher.reader.reset().await?;
                Ok(hasher.digest.finalize())
            })
        }
    }

    /// Every suitable asynchronous reader can be digested with any suitable algorithm.
    impl<T: AsyncRead + AsyncReset + Unpin + Send, A: Digest + Unpin + Send> AsyncDigestible<A> for T {}

    /// Asynchronous reader over one part of a data source.
    ///
    /// Created with [`AsyncDataSourceReader::seekable`] or
    /// [`AsyncDataSourceReader::unseekable`].
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    #[derive(Debug)]
    pub struct AsyncDataSourceReader {
        /// Where the bytes of this part come from.
        inner: AsyncDataSourceReaderInner,
        /// Non-zero number of the part this reader belongs to.
        part_number: NonZeroUsize,
    }

    /// Backing storage of an [`AsyncDataSourceReader`].
    #[derive(Debug)]
    enum AsyncDataSourceReaderInner {
        /// The part is read from a seekable source.
        ReadSeekable(AsyncSeekableSource),
        /// The part is held in memory.
        Readable { data: Cursor<Vec<u8>>, offset: u64 },
    }

    impl AsyncDataSourceReader {
        /// Creates the reader for part `part_number` backed by a seekable source.
        #[inline]
        pub fn seekable(part_number: NonZeroUsize, source: AsyncSeekableSource) -> Self {
            Self {
                inner: AsyncDataSourceReaderInner::ReadSeekable(source),
                part_number,
            }
        }

        /// Creates the reader for part `part_number` from bytes already held
        /// in memory, remembering `offset` as the offset of the part.
        #[inline]
        pub fn unseekable(part_number: NonZeroUsize, data: Vec<u8>, offset: u64) -> Self {
            Self {
                inner: AsyncDataSourceReaderInner::Readable {
                    data: Cursor::new(data),
                    offset,
                },
                part_number,
            }
        }

        /// Returns the part number given at construction.
        pub(in super::super) fn part_number(&self) -> NonZeroUsize {
            self.part_number
        }

        /// Returns the offset of this part: the seekable source's own offset,
        /// or the value stored at construction for an in-memory part.
        pub(in super::super) fn offset(&self) -> u64 {
            match &self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => source.offset(),
                AsyncDataSourceReaderInner::Readable { offset, .. } => *offset,
            }
        }

        /// Returns the length of this part in bytes.
        ///
        /// For an in-memory part this is the length of the whole buffer,
        /// regardless of how much has already been read.
        pub(in super::super) async fn len(&self) -> IoResult<u64> {
            match &self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => source.len().await,
                AsyncDataSourceReaderInner::Readable { data, .. } => Ok(data.get_ref().len() as u64),
            }
        }
    }

    /// Reading is forwarded to whichever backing storage the reader wraps.
    impl AsyncRead for AsyncDataSourceReader {
        #[inline]
        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            match &mut self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => Pin::new(source).poll_read(cx, buf),
                AsyncDataSourceReaderInner::Readable { data, .. } => Pin::new(data).poll_read(cx, buf),
            }
        }
    }

    /// Resetting delegates to the seekable source, or seeks the in-memory
    /// cursor back to its start.
    impl AsyncReset for AsyncDataSourceReader {
        #[inline]
        fn reset(&mut self) -> BoxFuture<IoResult<()>> {
            match &mut self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => source.reset(),
                AsyncDataSourceReaderInner::Readable { data, .. } => Box::pin(async move {
                    data.seek(SeekFrom::Start(0)).await?;
                    Ok(())
                }),
            }
        }
    }

    /// Shorthand bound for asynchronous readers that can also seek.
    trait ReadSeek: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin {}
    impl<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin> ReadSeek for T {}
}
298
299#[cfg(feature = "async")]
300pub use async_reader::*;
301
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use rand::{thread_rng, RngCore};
    use std::{
        fs::OpenOptions,
        io::{copy as io_copy, Read, Seek, SeekFrom},
        sync::{Arc, Mutex},
        thread::spawn as thread_spawn,
    };
    use tempfile::{Builder as TempfileBuilder, NamedTempFile};

    /// Number of random bytes written to a fresh temporary file.
    const FILE_SIZE: u64 = 1 << 26;

    /// Two readers over the two halves of a file whose second half is a copy
    /// of the first must yield identical bytes when drained on separate threads.
    #[test]
    fn test_sync_data_source_reader() -> Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let temp_file = new_temp_file()?;
        {
            // Append a copy of the first FILE_SIZE bytes to the end of the file.
            let head = OpenOptions::new().read(true).open(temp_file.path())?;
            let mut tail = OpenOptions::new().write(true).open(temp_file.path())?;
            tail.seek(SeekFrom::End(0))?;

            io_copy(&mut head.take(FILE_SIZE), &mut tail)?;
        }

        let part = |number: usize| NonZeroUsize::new(number).unwrap();

        let first_half = SeekableSource::new(temp_file, 0, FILE_SIZE);
        let second_half = first_half.clone_with_new_offset_and_length(FILE_SIZE, FILE_SIZE);

        let mut first_reader = DataSourceReader::seekable(part(1), first_half);
        let first_sink = Arc::new(Mutex::new(Cursor::new(Vec::<u8>::with_capacity(
            FILE_SIZE as usize,
        ))));
        let mut second_reader = DataSourceReader::seekable(part(2), second_half);
        let second_sink = Arc::new(Mutex::new(Cursor::new(Vec::<u8>::with_capacity(
            FILE_SIZE as usize,
        ))));

        let first_worker = {
            let sink = Arc::clone(&first_sink);
            thread_spawn(move || {
                let mut guard = sink.lock().unwrap();
                io_copy(&mut first_reader, &mut *guard).unwrap()
            })
        };
        let second_worker = {
            let sink = Arc::clone(&second_sink);
            thread_spawn(move || {
                let mut guard = sink.lock().unwrap();
                io_copy(&mut second_reader, &mut *guard).unwrap()
            })
        };

        first_worker.join().unwrap();
        second_worker.join().unwrap();

        let first_bytes = Arc::try_unwrap(first_sink).unwrap().into_inner()?.into_inner();
        let second_bytes = Arc::try_unwrap(second_sink).unwrap().into_inner()?.into_inner();
        assert_eq!(first_bytes.len(), second_bytes.len());
        // A plain `assert!` keeps a failure from printing both large buffers.
        assert!(first_bytes == second_bytes);

        Ok(())
    }

    /// Asynchronous counterpart of `test_sync_data_source_reader`, draining
    /// both readers concurrently with `join`.
    #[cfg(feature = "async")]
    #[async_std::test]
    async fn test_async_data_source_reader() -> Result<()> {
        use async_std::fs::OpenOptions;
        use futures::{
            future::join,
            io::{copy as io_copy, AsyncReadExt, AsyncSeekExt, Cursor},
            lock::Mutex,
        };

        env_logger::builder().is_test(true).try_init().ok();

        let temp_path = new_temp_file()?.into_temp_path();
        let temp_file = OpenOptions::new().read(true).write(true).open(&*temp_path).await?;
        {
            // Append a copy of the first FILE_SIZE bytes to the end of the file.
            let head = OpenOptions::new().read(true).open(&*temp_path).await?;
            let mut tail = OpenOptions::new().write(true).open(&*temp_path).await?;
            tail.seek(SeekFrom::End(0)).await?;

            io_copy(&mut head.take(FILE_SIZE), &mut tail).await?;
        }

        let part = |number: usize| NonZeroUsize::new(number).unwrap();

        let first_half = AsyncSeekableSource::new(temp_file, 0, FILE_SIZE);
        let second_half = first_half.clone_with_new_offset_and_length(FILE_SIZE, FILE_SIZE);

        let mut first_reader = AsyncDataSourceReader::seekable(part(1), first_half);
        let first_sink = Arc::new(Mutex::new(Cursor::new(Vec::<u8>::with_capacity(
            FILE_SIZE as usize,
        ))));
        let mut second_reader = AsyncDataSourceReader::seekable(part(2), second_half);
        let second_sink = Arc::new(Mutex::new(Cursor::new(Vec::<u8>::with_capacity(
            FILE_SIZE as usize,
        ))));

        let first_task = {
            let sink = Arc::clone(&first_sink);
            async move {
                let mut guard = sink.lock().await;
                io_copy(&mut first_reader, &mut *guard).await.unwrap()
            }
        };
        let second_task = {
            let sink = Arc::clone(&second_sink);
            async move {
                let mut guard = sink.lock().await;
                io_copy(&mut second_reader, &mut *guard).await.unwrap()
            }
        };
        join(first_task, second_task).await;

        let first_bytes = Arc::try_unwrap(first_sink).unwrap().into_inner().into_inner();
        let second_bytes = Arc::try_unwrap(second_sink).unwrap().into_inner().into_inner();
        assert_eq!(first_bytes.len(), second_bytes.len());
        // A plain `assert!` keeps a failure from printing both large buffers.
        assert!(first_bytes == second_bytes);

        Ok(())
    }

    /// Creates a temporary file holding FILE_SIZE random bytes, rewound to
    /// its start.
    fn new_temp_file() -> Result<NamedTempFile> {
        let mut temp_file = TempfileBuilder::new().tempfile()?;
        let mut generator = thread_rng();
        let random: &mut dyn RngCore = &mut generator;
        io_copy(&mut random.take(FILE_SIZE), &mut temp_file)?;
        temp_file.rewind()?;
        Ok(temp_file)
    }
}
427
428mod source_key;
429pub use source_key::SourceKey;
430
431mod file;
432pub use file::FileDataSource;
433
434mod seekable;
435pub use seekable::SeekableSource;
436
437mod unseekable;
438pub use unseekable::UnseekableDataSource;
439
440#[cfg(feature = "async")]
441pub use {file::AsyncFileDataSource, seekable::AsyncSeekableSource};
442
443#[cfg(feature = "async")]
444pub use unseekable::AsyncUnseekableDataSource;