qiniu_upload_manager/data_source/
mod.rs

1use super::PartSize;
2use auto_impl::auto_impl;
3use digest::{Digest, Output as DigestOutput};
4use dyn_clonable::clonable;
5use qiniu_apis::http::Reset;
6use std::{
7    fmt::Debug,
8    io::{copy as io_copy, sink as io_sink, Cursor, Read, Result as IoResult},
9    num::NonZeroUsize,
10};
11
12/// 数据源接口
13///
14/// 提供上传所用的数据源
15///
16/// 该 Trait 的异步版本为 [`AsyncDataSource`]。
17#[clonable]
18#[auto_impl(&, &mut, Box, Rc, Arc)]
19pub trait DataSource<A: Digest>: Clone + Debug + Sync + Send {
20    /// 数据源切片
21    fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>>;
22
23    /// 重置数据源
24    fn reset(&self) -> IoResult<()>;
25
26    /// 获取数据源 KEY
27    ///
28    /// 用于区分不同的数据源
29    #[inline]
30    fn source_key(&self) -> IoResult<Option<SourceKey<A>>> {
31        Ok(None)
32    }
33
34    /// 获取数据源大小
35    fn total_size(&self) -> IoResult<Option<u64>>;
36}
37
38pub(super) trait Digestible<A: Digest>: Read + Reset {
39    fn digest(&mut self) -> IoResult<DigestOutput<A>> {
40        struct ReadWithDigest<A, R> {
41            reader: R,
42            digest: A,
43        }
44
45        impl<A: Digest, R: Read> Read for ReadWithDigest<A, R> {
46            fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
47                let size = self.reader.read(buf)?;
48                self.digest.update(buf);
49                Ok(size)
50            }
51        }
52
53        let mut hasher = ReadWithDigest {
54            reader: self,
55            digest: A::new(),
56        };
57        io_copy(&mut hasher, &mut io_sink())?;
58        hasher.reader.reset()?;
59        Ok(hasher.digest.finalize())
60    }
61}
62
63impl<T: Read + Reset, A: Digest> Digestible<A> for T {}
64
65/// 数据源阅读器
66///
67/// 提供阻塞读取接口
68#[derive(Debug)]
69pub struct DataSourceReader {
70    inner: DataSourceReaderInner,
71    part_number: NonZeroUsize,
72}
73
74#[derive(Debug)]
75enum DataSourceReaderInner {
76    ReadSeekable(SeekableSource),
77    Readable { data: Cursor<Vec<u8>>, offset: u64 },
78}
79
80impl DataSourceReader {
81    /// 创建可寻址的数据源阅读器
82    #[inline]
83    pub fn seekable(part_number: NonZeroUsize, source: SeekableSource) -> Self {
84        Self {
85            inner: DataSourceReaderInner::ReadSeekable(source),
86            part_number,
87        }
88    }
89
90    /// 创建不可寻址的数据源阅读器
91    #[inline]
92    pub fn unseekable(part_number: NonZeroUsize, data: Vec<u8>, offset: u64) -> Self {
93        Self {
94            inner: DataSourceReaderInner::Readable {
95                data: Cursor::new(data),
96                offset,
97            },
98            part_number,
99        }
100    }
101
102    pub(super) fn part_number(&self) -> NonZeroUsize {
103        self.part_number
104    }
105
106    pub(super) fn offset(&self) -> u64 {
107        match &self.inner {
108            DataSourceReaderInner::ReadSeekable(source) => source.offset(),
109            DataSourceReaderInner::Readable { offset, .. } => *offset,
110        }
111    }
112
113    pub(super) fn len(&self) -> IoResult<u64> {
114        match &self.inner {
115            DataSourceReaderInner::ReadSeekable(source) => source.len(),
116            DataSourceReaderInner::Readable { data, .. } => Ok(data.get_ref().len() as u64),
117        }
118    }
119}
120
121impl Read for DataSourceReader {
122    #[inline]
123    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
124        match &mut self.inner {
125            DataSourceReaderInner::ReadSeekable(source) => source.read(buf),
126            DataSourceReaderInner::Readable { data, .. } => data.read(buf),
127        }
128    }
129}
130
131impl Reset for DataSourceReader {
132    #[inline]
133    fn reset(&mut self) -> IoResult<()> {
134        match &mut self.inner {
135            DataSourceReaderInner::ReadSeekable(source) => source.reset(),
136            DataSourceReaderInner::Readable { data, .. } => data.reset(),
137        }
138    }
139}
140
/// Returns the number of the first part, which is `1`.
///
/// Built with the checked constructor, so no `unsafe` is required. The check
/// on the constant `1` can never fail.
fn first_part_number() -> NonZeroUsize {
    NonZeroUsize::new(1).expect("1 is non-zero")
}
147
#[cfg(feature = "async")]
mod async_reader {
    use super::*;
    use futures::{
        future::BoxFuture,
        io::{copy as async_io_copy, sink as async_sink, Cursor, SeekFrom},
        ready, AsyncRead, AsyncSeekExt,
    };
    use qiniu_apis::http::AsyncReset;
    use std::{
        pin::Pin,
        task::{Context, Poll},
    };

    /// Asynchronous data source interface.
    ///
    /// Provides the data that is to be uploaded; this is the asynchronous
    /// counterpart of [`DataSource`].
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    #[clonable]
    #[auto_impl(&, &mut, Box, Rc, Arc)]
    pub trait AsyncDataSource<A: Digest>: Clone + Debug + Sync + Send {
        /// Asynchronously returns a reader over the next slice of the data
        /// source, sized by `size`.
        fn slice(&self, size: PartSize) -> BoxFuture<IoResult<Option<AsyncDataSourceReader>>>;

        /// Asynchronously resets the data source.
        fn reset(&self) -> BoxFuture<IoResult<()>>;

        /// Asynchronously returns the key of this data source.
        ///
        /// The key is used to tell different data sources apart.
        fn source_key(&self) -> BoxFuture<IoResult<Option<SourceKey<A>>>>;

        /// Asynchronously returns the size of the data source.
        fn total_size(&self) -> BoxFuture<IoResult<Option<u64>>>;
    }

    /// Computes a digest over everything an asynchronous reader yields.
    ///
    /// Blanket-implemented below for every `AsyncRead + AsyncReset + Unpin + Send` type.
    pub(in super::super) trait AsyncDigestible<A: Digest + Unpin + Send>:
        AsyncRead + AsyncReset + Unpin + Send
    {
        /// Reads `self` until EOF, feeding every byte read into a fresh `A`
        /// hasher, then awaits `AsyncReset::reset` on it and returns the
        /// finalized digest.
        ///
        /// # Errors
        ///
        /// Propagates any I/O error raised while reading or while resetting.
        /// If reading fails, `reset` is not attempted.
        fn digest(&mut self) -> BoxFuture<IoResult<DigestOutput<A>>> {
            /// Reader adapter that hashes exactly the bytes passing through it.
            struct ReadWithDigest<A, R> {
                reader: R,
                digest: A,
            }

            impl<A: Digest + Unpin + Send, R: AsyncRead + Unpin> AsyncRead for ReadWithDigest<A, R> {
                fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
                    let size = ready!(Pin::new(&mut self.reader).poll_read(cx, buf))?;
                    // Only `buf[..size]` was filled by this read. Hashing the
                    // whole buffer would mix stale bytes into the digest, and at
                    // EOF (`size == 0`) it would hash an entire buffer of leftovers.
                    self.digest.update(&buf[..size]);
                    Poll::Ready(Ok(size))
                }
            }

            Box::pin(async move {
                let mut hasher = ReadWithDigest {
                    reader: self,
                    digest: A::new(),
                };
                // The bytes themselves are discarded; only the running digest matters.
                async_io_copy(Pin::new(&mut hasher), &mut async_sink()).await?;
                hasher.reader.reset().await?;
                Ok(hasher.digest.finalize())
            })
        }
    }

    impl<T: AsyncRead + AsyncReset + Unpin + Send, A: Digest + Unpin + Send> AsyncDigestible<A> for T {}

    /// Asynchronous data source reader.
    ///
    /// Offers an asynchronous read interface over one part of a data source.
    /// The part is backed either by a seekable source or by an owned in-memory buffer.
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    #[derive(Debug)]
    pub struct AsyncDataSourceReader {
        inner: AsyncDataSourceReaderInner,
        part_number: NonZeroUsize,
    }

    /// Backing storage of an [`AsyncDataSourceReader`].
    #[derive(Debug)]
    enum AsyncDataSourceReaderInner {
        /// Reads are delegated to an asynchronous seekable source.
        ReadSeekable(AsyncSeekableSource),
        /// Reads are served from an owned buffer; `offset` is reported verbatim
        /// by `AsyncDataSourceReader::offset`.
        Readable { data: Cursor<Vec<u8>>, offset: u64 },
    }

    impl AsyncDataSourceReader {
        /// Creates an asynchronous reader for part `part_number` that is backed
        /// by a seekable source.
        #[inline]
        pub fn seekable(part_number: NonZeroUsize, source: AsyncSeekableSource) -> Self {
            Self {
                inner: AsyncDataSourceReaderInner::ReadSeekable(source),
                part_number,
            }
        }

        /// Creates an asynchronous reader for part `part_number` that is backed
        /// by the in-memory bytes `data`, tagged with the given `offset`.
        #[inline]
        pub fn unseekable(part_number: NonZeroUsize, data: Vec<u8>, offset: u64) -> Self {
            Self {
                inner: AsyncDataSourceReaderInner::Readable {
                    data: Cursor::new(data),
                    offset,
                },
                part_number,
            }
        }

        /// Part number this reader was created with.
        pub(in super::super) fn part_number(&self) -> NonZeroUsize {
            self.part_number
        }

        /// Offset of this part: asked of the seekable source, or the value
        /// stored by [`AsyncDataSourceReader::unseekable`].
        pub(in super::super) fn offset(&self) -> u64 {
            match &self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => source.offset(),
                AsyncDataSourceReaderInner::Readable { offset, .. } => *offset,
            }
        }

        /// Length of this part. For a buffered part this is the full buffer
        /// length, independent of how much has already been read.
        pub(in super::super) async fn len(&self) -> IoResult<u64> {
            match &self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => source.len().await,
                AsyncDataSourceReaderInner::Readable { data, .. } => Ok(data.get_ref().len() as u64),
            }
        }
    }

    impl AsyncRead for AsyncDataSourceReader {
        #[inline]
        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            match &mut self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => Pin::new(source).poll_read(cx, buf),
                AsyncDataSourceReaderInner::Readable { data, .. } => Pin::new(data).poll_read(cx, buf),
            }
        }
    }

    impl AsyncReset for AsyncDataSourceReader {
        #[inline]
        fn reset(&mut self) -> BoxFuture<IoResult<()>> {
            match &mut self.inner {
                AsyncDataSourceReaderInner::ReadSeekable(source) => source.reset(),
                // Rewinding the in-memory cursor makes the buffered bytes readable again.
                AsyncDataSourceReaderInner::Readable { data, .. } => Box::pin(async move {
                    data.seek(SeekFrom::Start(0)).await?;
                    Ok(())
                }),
            }
        }
    }
}
298
299#[cfg(feature = "async")]
300pub use async_reader::*;
301
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use rand::{thread_rng, RngCore};
    use std::{
        fs::OpenOptions,
        io::{copy as io_copy, Read, Seek, SeekFrom},
        thread::spawn as thread_spawn,
    };
    use tempfile::{Builder as TempfileBuilder, NamedTempFile};

    /// Number of random bytes in the fixture; after duplication the file holds
    /// two identical halves of this size.
    const FILE_SIZE: u64 = 1 << 26;

    /// Two seekable readers over the two identical halves of one file, drained
    /// concurrently on separate threads, must produce identical bytes.
    #[test]
    fn test_sync_data_source_reader() -> Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let temp_file = new_temp_file()?;
        {
            // Append the first FILE_SIZE bytes to the end of the file so that
            // [0, FILE_SIZE) and [FILE_SIZE, 2 * FILE_SIZE) hold the same data.
            let source = OpenOptions::new().read(true).open(temp_file.path())?;
            let mut target = OpenOptions::new().write(true).open(temp_file.path())?;
            target.seek(SeekFrom::End(0))?;
            io_copy(&mut source.take(FILE_SIZE), &mut target)?;
        }

        let first_half = SeekableSource::new(temp_file, 0, FILE_SIZE);
        let second_half = first_half.clone_with_new_offset_and_length(FILE_SIZE, FILE_SIZE);
        let mut first_reader = DataSourceReader::seekable(NonZeroUsize::new(1).unwrap(), first_half);
        let mut second_reader = DataSourceReader::seekable(NonZeroUsize::new(2).unwrap(), second_half);

        let first_worker = thread_spawn(move || {
            let mut collected = Vec::<u8>::with_capacity(FILE_SIZE as usize);
            io_copy(&mut first_reader, &mut collected).unwrap();
            collected
        });
        let second_worker = thread_spawn(move || {
            let mut collected = Vec::<u8>::with_capacity(FILE_SIZE as usize);
            io_copy(&mut second_reader, &mut collected).unwrap();
            collected
        });

        let first_bytes = first_worker.join().unwrap();
        let second_bytes = second_worker.join().unwrap();
        assert_eq!(first_bytes.len(), second_bytes.len());
        assert!(first_bytes == second_bytes);

        Ok(())
    }

    /// Asynchronous variant: both halves are drained concurrently via `join`.
    #[cfg(feature = "async")]
    #[async_std::test]
    async fn test_async_data_source_reader() -> Result<()> {
        use async_std::fs::OpenOptions;
        use futures::{
            future::join,
            io::{copy as io_copy, AsyncReadExt, AsyncSeekExt, Cursor},
        };

        env_logger::builder().is_test(true).try_init().ok();

        let temp_path = new_temp_file()?.into_temp_path();
        let temp_file = OpenOptions::new().read(true).write(true).open(&*temp_path).await?;
        {
            // Same duplication as the blocking test, done asynchronously.
            let source = OpenOptions::new().read(true).open(&*temp_path).await?;
            let mut target = OpenOptions::new().write(true).open(&*temp_path).await?;
            target.seek(SeekFrom::End(0)).await?;
            io_copy(&mut source.take(FILE_SIZE), &mut target).await?;
        }

        let first_half = AsyncSeekableSource::new(temp_file, 0, FILE_SIZE);
        let second_half = first_half.clone_with_new_offset_and_length(FILE_SIZE, FILE_SIZE);
        let mut first_reader = AsyncDataSourceReader::seekable(NonZeroUsize::new(1).unwrap(), first_half);
        let mut second_reader = AsyncDataSourceReader::seekable(NonZeroUsize::new(2).unwrap(), second_half);

        let drain_first = async move {
            let mut collected = Cursor::new(Vec::<u8>::with_capacity(FILE_SIZE as usize));
            io_copy(&mut first_reader, &mut collected).await.unwrap();
            collected.into_inner()
        };
        let drain_second = async move {
            let mut collected = Cursor::new(Vec::<u8>::with_capacity(FILE_SIZE as usize));
            io_copy(&mut second_reader, &mut collected).await.unwrap();
            collected.into_inner()
        };
        let (first_bytes, second_bytes) = join(drain_first, drain_second).await;

        assert_eq!(first_bytes.len(), second_bytes.len());
        assert!(first_bytes == second_bytes);

        Ok(())
    }

    /// Creates a temporary file holding `FILE_SIZE` random bytes, rewound to its start.
    fn new_temp_file() -> Result<NamedTempFile> {
        let mut file = TempfileBuilder::new().tempfile()?;
        let rng: &mut dyn RngCore = &mut thread_rng();
        io_copy(&mut rng.take(FILE_SIZE), &mut file)?;
        file.rewind()?;
        Ok(file)
    }
}
427
428mod source_key;
429pub use source_key::SourceKey;
430
431mod file;
432pub use file::FileDataSource;
433
434mod seekable;
435pub use seekable::SeekableSource;
436
437mod unseekable;
438pub use unseekable::UnseekableDataSource;
439
440#[cfg(feature = "async")]
441pub use {file::AsyncFileDataSource, seekable::AsyncSeekableSource};
442
443#[cfg(feature = "async")]
444pub use unseekable::AsyncUnseekableDataSource;