qiniu_upload_manager/data_source/
file.rs

1use super::{seekable::SeekableDataSource, DataSource, DataSourceReader, PartSize, SourceKey, UnseekableDataSource};
2use digest::Digest;
3use once_cell::sync::OnceCell;
4use os_str_bytes::OsStrBytes;
5use sha1::Sha1;
6use std::{
7    fmt::{self, Debug},
8    fs::File,
9    io::Result as IoResult,
10    path::PathBuf,
11    sync::Arc,
12};
13
14#[cfg(feature = "async")]
15use {
16    super::{first_part_number, AsyncDataSourceReader, AsyncSeekableSource, AsyncUnseekableDataSource},
17    async_once_cell::OnceCell as AsyncOnceCell,
18    async_std::{fs::File as AsyncFile, path::PathBuf as AsyncPathBuf},
19    futures::{future::BoxFuture, lock::Mutex as AsyncMutex, AsyncSeekExt},
20    std::num::NonZeroUsize,
21};
22
/// Backing reader of a `FileDataSource`, selected when the file is first opened.
enum Source<A: Digest> {
    /// The opened file was accepted by `SeekableDataSource::new`.
    Seekable(SeekableDataSource),
    /// Fallback used when `SeekableDataSource::new` returned an error.
    Unseekable(UnseekableDataSource<File, A>),
}
27
28impl<A: Digest> Debug for Source<A> {
29    #[inline]
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        match self {
32            Self::Seekable(source) => f.debug_struct("Seekable").field("source", source).finish(),
33            Self::Unseekable(source) => f.debug_tuple("Unseekable").field(source).finish(),
34        }
35    }
36}
37
/// File data source
///
/// Implements the data source interface on top of a single file.
pub struct FileDataSource<A: Digest = Sha1> {
    // Path exactly as it was passed to `new`.
    path: PathBuf,
    // Lazily computed `path.canonicalize()`; the cell is shared between clones.
    canonicalized_path: Arc<OnceCell<PathBuf>>,
    // Lazily opened backing source; the cell is shared between clones.
    source: Arc<OnceCell<Source<A>>>,
}
46
47impl<A: Digest> FileDataSource<A> {
48    /// 创建文件数据源
49    pub fn new(path: impl Into<PathBuf>) -> Self {
50        Self {
51            path: path.into(),
52            canonicalized_path: Default::default(),
53            source: Default::default(),
54        }
55    }
56
57    fn get_seekable_source(&self) -> IoResult<&Source<A>> {
58        self.source.get_or_try_init(|| {
59            let file = File::open(&self.path)?;
60            let file_size = file.metadata()?.len();
61            SeekableDataSource::new(file, file_size)
62                .map(Source::Seekable)
63                .or_else(|_| {
64                    File::open(&self.path)
65                        .map(UnseekableDataSource::new)
66                        .map(Source::Unseekable)
67                })
68        })
69    }
70
71    fn get_path(&self) -> IoResult<&PathBuf> {
72        self.canonicalized_path.get_or_try_init(|| self.path.canonicalize())
73    }
74}
75
76impl<D: Digest + Send> DataSource<D> for FileDataSource<D> {
77    fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>> {
78        match self.get_seekable_source()? {
79            Source::Seekable(source) => DataSource::<D>::slice(source, size),
80            Source::Unseekable(source) => source.slice(size),
81        }
82    }
83
84    fn reset(&self) -> IoResult<()> {
85        match self.get_seekable_source()? {
86            Source::Seekable(source) => DataSource::<D>::reset(source),
87            Source::Unseekable(source) => source.reset(),
88        }
89    }
90
91    fn source_key(&self) -> IoResult<Option<SourceKey<D>>> {
92        match self.get_seekable_source()? {
93            Source::Seekable { .. } => {
94                let mut hasher = D::new();
95                hasher.update(b"file://");
96                hasher.update(&self.get_path()?.to_raw_bytes());
97                Ok(Some(hasher.finalize().into()))
98            }
99            Source::Unseekable(source) => source.source_key(),
100        }
101    }
102
103    fn total_size(&self) -> IoResult<Option<u64>> {
104        match self.get_seekable_source()? {
105            Source::Seekable(source) => DataSource::<D>::total_size(source),
106            Source::Unseekable(source) => source.total_size(),
107        }
108    }
109}
110
111impl<A: Digest> Debug for FileDataSource<A> {
112    #[inline]
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("FileDataSource")
115            .field("path", &self.path)
116            .field("canonicalized_path", &self.canonicalized_path)
117            .field("source", &self.source)
118            .finish()
119    }
120}
121
122impl<A: Digest> Clone for FileDataSource<A> {
123    #[inline]
124    fn clone(&self) -> Self {
125        Self {
126            path: self.path.clone(),
127            canonicalized_path: self.canonicalized_path.clone(),
128            source: self.source.clone(),
129        }
130    }
131}
132
133#[cfg(feature = "async")]
134mod async_reader {
135    use super::{super::AsyncDataSource, *};
136
137    /// 异步文件数据源
138    ///
139    /// 基于一个文件实现了数据源接口
140    pub struct AsyncFileDataSource<A: Digest> {
141        path: PathBuf,
142        canonicalized_path: Arc<AsyncOnceCell<AsyncPathBuf>>,
143        source: Arc<AsyncOnceCell<AsyncSource<A>>>,
144    }
145
146    impl<A: Digest> AsyncFileDataSource<A> {
147        /// 创建文件数据源
148        pub fn new(path: impl Into<PathBuf>) -> Self {
149            Self {
150                path: path.into(),
151                canonicalized_path: Arc::new(AsyncOnceCell::new()),
152                source: Arc::new(AsyncOnceCell::new()),
153            }
154        }
155
156        async fn get_async_seekable_source(&self) -> IoResult<&AsyncSource<A>> {
157            self.source
158                .get_or_try_init(async {
159                    let mut file = AsyncFile::open(&self.path).await?;
160                    if let Ok(offset) = file.stream_position().await {
161                        Ok(AsyncSource::Seekable {
162                            original_offset: offset,
163                            file_size: file.metadata().await?.len(),
164                            source: AsyncSeekableSource::new(file, 0, 0),
165                            current: AsyncMutex::new(SourceOffset {
166                                offset,
167                                part_number: first_part_number(),
168                            }),
169                        })
170                    } else {
171                        Ok(AsyncSource::Unseekable(AsyncUnseekableDataSource::new(file)))
172                    }
173                })
174                .await
175        }
176
177        async fn get_async_path(&self) -> IoResult<&AsyncPathBuf> {
178            self.canonicalized_path
179                .get_or_try_init(async { AsyncPathBuf::from(&self.path).canonicalize().await })
180                .await
181        }
182    }
183
184    impl<A: Digest + Send> AsyncDataSource<A> for AsyncFileDataSource<A> {
185        fn slice(&self, size: PartSize) -> BoxFuture<IoResult<Option<AsyncDataSourceReader>>> {
186            Box::pin(async move {
187                match self.get_async_seekable_source().await? {
188                    AsyncSource::Seekable {
189                        source,
190                        current,
191                        file_size,
192                        ..
193                    } => {
194                        let mut cur = current.lock().await;
195                        if cur.offset < *file_size {
196                            let size = size.as_u64();
197                            let source_reader = AsyncDataSourceReader::seekable(
198                                cur.part_number,
199                                source.clone_with_new_offset_and_length(cur.offset, size),
200                            );
201                            cur.offset += size;
202                            cur.part_number =
203                                NonZeroUsize::new(cur.part_number.get() + 1).expect("Page number is too big");
204                            Ok(Some(source_reader))
205                        } else {
206                            Ok(None)
207                        }
208                    }
209                    AsyncSource::Unseekable(source) => source.slice(size).await,
210                }
211            })
212        }
213
214        fn reset(&self) -> BoxFuture<IoResult<()>> {
215            Box::pin(async move {
216                match self.get_async_seekable_source().await? {
217                    AsyncSource::Seekable {
218                        current,
219                        original_offset,
220                        ..
221                    } => {
222                        let mut cur = current.lock().await;
223                        cur.offset = *original_offset;
224                        cur.part_number = first_part_number();
225                        Ok(())
226                    }
227                    AsyncSource::Unseekable(source) => source.reset().await,
228                }
229            })
230        }
231
232        fn source_key(&self) -> BoxFuture<IoResult<Option<SourceKey<A>>>> {
233            Box::pin(async move {
234                match self.get_async_seekable_source().await? {
235                    AsyncSource::Seekable { .. } => {
236                        let mut hasher = A::new();
237                        hasher.update(b"file://");
238                        hasher.update(self.get_async_path().await?.as_os_str().to_raw_bytes());
239                        Ok(Some(hasher.finalize().into()))
240                    }
241                    AsyncSource::Unseekable(source) => source.source_key().await,
242                }
243            })
244        }
245
246        fn total_size(&self) -> BoxFuture<IoResult<Option<u64>>> {
247            Box::pin(async move {
248                match self.get_async_seekable_source().await? {
249                    AsyncSource::Seekable { file_size, .. } => Ok(Some(*file_size)),
250                    AsyncSource::Unseekable(source) => source.total_size().await,
251                }
252            })
253        }
254    }
255
256    impl<A: Digest> Debug for AsyncFileDataSource<A> {
257        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258            f.debug_struct("AsyncFileDataSource")
259                .field("path", &self.path)
260                .field("canonicalized_path", &self.canonicalized_path)
261                .field("source", &self.source)
262                .finish()
263        }
264    }
265
266    impl<A: Digest> Clone for AsyncFileDataSource<A> {
267        #[inline]
268        fn clone(&self) -> Self {
269            Self {
270                path: self.path.clone(),
271                canonicalized_path: self.canonicalized_path.clone(),
272                source: self.source.clone(),
273            }
274        }
275    }
276
    /// Cursor of a seekable source: where the next part starts and which part
    /// number it will carry.
    #[derive(Debug)]
    struct SourceOffset {
        // Offset at which the next slice begins; compared against the file size.
        offset: u64,
        // Part number assigned to the next slice.
        part_number: NonZeroUsize,
    }
282
    /// Backing reader of an `AsyncFileDataSource`, selected when the file is
    /// first opened.
    enum AsyncSource<A: Digest> {
        /// The stream position of the opened file could be queried.
        Seekable {
            // Source created from the opened file; each slice clones it with
            // its own offset and length.
            source: AsyncSeekableSource,
            // Mutex-protected cursor that is advanced by every slice.
            current: AsyncMutex<SourceOffset>,
            // File length taken from the metadata when the file was opened.
            file_size: u64,
            // Stream position observed when the file was opened; the cursor
            // returns to it on reset.
            original_offset: u64,
        },
        /// Querying the stream position failed, so the file is read sequentially.
        Unseekable(AsyncUnseekableDataSource<AsyncFile, A>),
    }
292
293    impl<A: Digest> Debug for AsyncSource<A> {
294        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295            match self {
296                Self::Seekable {
297                    source,
298                    current,
299                    file_size,
300                    original_offset,
301                } => f
302                    .debug_struct("Seekable")
303                    .field("source", source)
304                    .field("current", current)
305                    .field("file_size", file_size)
306                    .field("original_offset", original_offset)
307                    .finish(),
308                Self::Unseekable(file) => f.debug_tuple("Unseekable").field(file).finish(),
309            }
310        }
311    }
312}
313
314#[cfg(feature = "async")]
315pub use async_reader::*;
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320    use anyhow::Result;
321    use std::{
322        io::{Read, Write},
323        thread::{spawn as thread_spawn, JoinHandle as ThreadJoinHandle},
324    };
325    use tempfile::Builder as TempfileBuilder;
326
327    #[test]
328    fn test_sync_seekable_file_data_source() -> Result<()> {
329        let temp_file_path = {
330            let mut temp_file = TempfileBuilder::new().tempfile()?;
331            for i in 0..255u8 {
332                let buf = vec![i; 1 << 20];
333                temp_file.write_all(&buf)?;
334            }
335            temp_file.into_temp_path()
336        };
337        let data_source = FileDataSource::<Sha1>::new(&temp_file_path);
338        let mut source_readers = Vec::with_capacity(256);
339        for _ in 0..255u8 {
340            source_readers.push(data_source.slice(PartSize::new(1 << 20).unwrap())?.unwrap());
341        }
342        assert!(data_source.slice(PartSize::new(1 << 20).unwrap())?.is_none());
343
344        let mut threads: Vec<ThreadJoinHandle<IoResult<()>>> = Vec::with_capacity(256);
345        for (i, mut source_reader) in source_readers.into_iter().enumerate() {
346            threads.push(thread_spawn(move || {
347                let mut buf = Vec::new();
348                let have_read = source_reader.read_to_end(&mut buf)?;
349                assert_eq!(have_read, 1 << 20);
350                assert_eq!(buf, vec![i as u8; 1 << 20]);
351                Ok(())
352            }));
353        }
354        for thread in threads {
355            thread.join().unwrap()?;
356        }
357
358        Ok(())
359    }
360
361    #[test]
362    #[cfg(target_os = "unix")]
363    fn test_sync_unseekable_file_data_source() -> Result<()> {
364        use defer_lite::defer;
365        use ipipe::Pipe;
366        use std::fs::remove_file;
367
368        let mut pipe = Pipe::create()?;
369        let pipe_path = pipe.path().to_owned();
370        defer! {
371            remove_file(&pipe_path).ok();
372        }
373        let producer_thread: ThreadJoinHandle<IoResult<()>> = {
374            thread_spawn(move || {
375                for i in 0..255u8 {
376                    let buf = vec![i; 1 << 20];
377                    pipe.write_all(&buf)?;
378                }
379                pipe.close()?;
380                Ok(())
381            })
382        };
383        let data_source = FileDataSource::new(&pipe_path);
384        let mut source_readers = Vec::with_capacity(256);
385        for _ in 0..255u8 {
386            source_readers.push(data_source.slice(1 << 20)?.unwrap());
387        }
388        assert!(data_source.slice(1 << 20)?.is_none());
389
390        let mut threads: Vec<ThreadJoinHandle<IoResult<()>>> = Vec::with_capacity(257);
391        for (i, mut source_reader) in source_readers.into_iter().enumerate() {
392            threads.push(thread_spawn(move || {
393                let mut buf = Vec::new();
394                let have_read = source_reader.read_to_end(&mut buf)?;
395                assert_eq!(have_read, 1 << 20);
396                assert_eq!(buf, vec![i as u8; 1 << 20]);
397                Ok(())
398            }));
399        }
400        threads.push(producer_thread);
401        for thread in threads {
402            thread.join().unwrap()?;
403        }
404
405        Ok(())
406    }
407}