// qiniu_upload_manager/data_source/file.rs
use super::{seekable::SeekableDataSource, DataSource, DataSourceReader, PartSize, SourceKey, UnseekableDataSource};
use digest::Digest;
use once_cell::sync::OnceCell;
use os_str_bytes::OsStrBytes;
use sha1::Sha1;
use std::{
    fmt::{self, Debug},
    fs::File,
    io::Result as IoResult,
    path::PathBuf,
    sync::Arc,
};

#[cfg(feature = "async")]
use {
    super::{first_part_number, AsyncDataSourceReader, AsyncSeekableSource, AsyncUnseekableDataSource},
    async_once_cell::OnceCell as AsyncOnceCell,
    async_std::{fs::File as AsyncFile, path::PathBuf as AsyncPathBuf},
    futures::{future::BoxFuture, lock::Mutex as AsyncMutex, AsyncSeekExt},
    std::num::NonZeroUsize,
};
22
/// Backing source chosen lazily for a [`FileDataSource`]: a seekable source
/// when the file supports seeking, or a sequential streaming fallback.
enum Source<A: Digest> {
    /// The file supports seeking; parts can be sliced at arbitrary offsets.
    Seekable(SeekableDataSource),
    /// The file cannot seek; data is consumed sequentially.
    Unseekable(UnseekableDataSource<File, A>),
}
27
28impl<A: Digest> Debug for Source<A> {
29 #[inline]
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 Self::Seekable(source) => f.debug_struct("Seekable").field("source", source).finish(),
33 Self::Unseekable(source) => f.debug_tuple("Unseekable").field(source).finish(),
34 }
35 }
36}
37
/// Data source backed by a file on the local file system.
///
/// The file is opened, probed for seek support and its path canonicalized
/// lazily on first use; the results are cached in `Arc`-shared cells, so
/// clones of this handle are cheap and share the same state.
pub struct FileDataSource<A: Digest = Sha1> {
    // User-supplied path; opened lazily on first use.
    path: PathBuf,
    // Canonicalized form of `path`, computed once and shared between clones.
    canonicalized_path: Arc<OnceCell<PathBuf>>,
    // Lazily-initialized backing source, shared between clones.
    source: Arc<OnceCell<Source<A>>>,
}
46
47impl<A: Digest> FileDataSource<A> {
48 pub fn new(path: impl Into<PathBuf>) -> Self {
50 Self {
51 path: path.into(),
52 canonicalized_path: Default::default(),
53 source: Default::default(),
54 }
55 }
56
57 fn get_seekable_source(&self) -> IoResult<&Source<A>> {
58 self.source.get_or_try_init(|| {
59 let file = File::open(&self.path)?;
60 let file_size = file.metadata()?.len();
61 SeekableDataSource::new(file, file_size)
62 .map(Source::Seekable)
63 .or_else(|_| {
64 File::open(&self.path)
65 .map(UnseekableDataSource::new)
66 .map(Source::Unseekable)
67 })
68 })
69 }
70
71 fn get_path(&self) -> IoResult<&PathBuf> {
72 self.canonicalized_path.get_or_try_init(|| self.path.canonicalize())
73 }
74}
75
76impl<D: Digest + Send> DataSource<D> for FileDataSource<D> {
77 fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>> {
78 match self.get_seekable_source()? {
79 Source::Seekable(source) => DataSource::<D>::slice(source, size),
80 Source::Unseekable(source) => source.slice(size),
81 }
82 }
83
84 fn reset(&self) -> IoResult<()> {
85 match self.get_seekable_source()? {
86 Source::Seekable(source) => DataSource::<D>::reset(source),
87 Source::Unseekable(source) => source.reset(),
88 }
89 }
90
91 fn source_key(&self) -> IoResult<Option<SourceKey<D>>> {
92 match self.get_seekable_source()? {
93 Source::Seekable { .. } => {
94 let mut hasher = D::new();
95 hasher.update(b"file://");
96 hasher.update(&self.get_path()?.to_raw_bytes());
97 Ok(Some(hasher.finalize().into()))
98 }
99 Source::Unseekable(source) => source.source_key(),
100 }
101 }
102
103 fn total_size(&self) -> IoResult<Option<u64>> {
104 match self.get_seekable_source()? {
105 Source::Seekable(source) => DataSource::<D>::total_size(source),
106 Source::Unseekable(source) => source.total_size(),
107 }
108 }
109}
110
111impl<A: Digest> Debug for FileDataSource<A> {
112 #[inline]
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 f.debug_struct("FileDataSource")
115 .field("path", &self.path)
116 .field("canonicalized_path", &self.canonicalized_path)
117 .field("source", &self.source)
118 .finish()
119 }
120}
121
122impl<A: Digest> Clone for FileDataSource<A> {
123 #[inline]
124 fn clone(&self) -> Self {
125 Self {
126 path: self.path.clone(),
127 canonicalized_path: self.canonicalized_path.clone(),
128 source: self.source.clone(),
129 }
130 }
131}
132
#[cfg(feature = "async")]
mod async_reader {
    use super::{super::AsyncDataSource, *};

    /// Async data source backed by a file on the local file system.
    ///
    /// Async counterpart of [`FileDataSource`]: opening the file, probing
    /// seek support and canonicalizing the path happen lazily, with results
    /// cached in `Arc`-shared cells so clones are cheap.
    pub struct AsyncFileDataSource<A: Digest> {
        // User-supplied path; opened lazily on first use.
        path: PathBuf,
        // Canonicalized form of `path`, computed once and shared between clones.
        canonicalized_path: Arc<AsyncOnceCell<AsyncPathBuf>>,
        // Lazily-initialized backing source, shared between clones.
        source: Arc<AsyncOnceCell<AsyncSource<A>>>,
    }

    impl<A: Digest> AsyncFileDataSource<A> {
        /// Creates an async data source for the file at `path`; nothing is opened yet.
        pub fn new(path: impl Into<PathBuf>) -> Self {
            Self {
                path: path.into(),
                canonicalized_path: Arc::new(AsyncOnceCell::new()),
                source: Arc::new(AsyncOnceCell::new()),
            }
        }

        /// Opens the file on first call and caches the chosen source variant.
        ///
        /// NOTE(review): a successful `stream_position()` is taken as proof
        /// that the file is seekable — TODO confirm this holds for special
        /// files (pipes, character devices) on all platforms.
        async fn get_async_seekable_source(&self) -> IoResult<&AsyncSource<A>> {
            self.source
                .get_or_try_init(async {
                    let mut file = AsyncFile::open(&self.path).await?;
                    if let Ok(offset) = file.stream_position().await {
                        Ok(AsyncSource::Seekable {
                            // Remember where the file started so `reset` can return here.
                            original_offset: offset,
                            file_size: file.metadata().await?.len(),
                            // Placeholder offset/length; each slice clones this
                            // with its real offset and length.
                            source: AsyncSeekableSource::new(file, 0, 0),
                            current: AsyncMutex::new(SourceOffset {
                                offset,
                                part_number: first_part_number(),
                            }),
                        })
                    } else {
                        Ok(AsyncSource::Unseekable(AsyncUnseekableDataSource::new(file)))
                    }
                })
                .await
        }

        /// Returns the canonicalized path, computing and caching it on first use.
        async fn get_async_path(&self) -> IoResult<&AsyncPathBuf> {
            self.canonicalized_path
                .get_or_try_init(async { AsyncPathBuf::from(&self.path).canonicalize().await })
                .await
        }
    }

    impl<A: Digest + Send> AsyncDataSource<A> for AsyncFileDataSource<A> {
        /// Returns a reader over the next part of at most `size` bytes,
        /// or `None` when the whole file has been sliced.
        fn slice(&self, size: PartSize) -> BoxFuture<IoResult<Option<AsyncDataSourceReader>>> {
            Box::pin(async move {
                match self.get_async_seekable_source().await? {
                    AsyncSource::Seekable {
                        source,
                        current,
                        file_size,
                        ..
                    } => {
                        // Lock the running (offset, part-number) cursor while slicing.
                        let mut cur = current.lock().await;
                        if cur.offset < *file_size {
                            let size = size.as_u64();
                            let source_reader = AsyncDataSourceReader::seekable(
                                cur.part_number,
                                // The final part's window may extend past EOF;
                                // presumably the reader just yields fewer bytes — verify.
                                source.clone_with_new_offset_and_length(cur.offset, size),
                            );
                            cur.offset += size;
                            cur.part_number =
                                NonZeroUsize::new(cur.part_number.get() + 1).expect("Page number is too big");
                            Ok(Some(source_reader))
                        } else {
                            Ok(None)
                        }
                    }
                    AsyncSource::Unseekable(source) => source.slice(size).await,
                }
            })
        }

        /// Rewinds the cursor to the offset observed when the file was opened.
        fn reset(&self) -> BoxFuture<IoResult<()>> {
            Box::pin(async move {
                match self.get_async_seekable_source().await? {
                    AsyncSource::Seekable {
                        current,
                        original_offset,
                        ..
                    } => {
                        let mut cur = current.lock().await;
                        cur.offset = *original_offset;
                        cur.part_number = first_part_number();
                        Ok(())
                    }
                    AsyncSource::Unseekable(source) => source.reset().await,
                }
            })
        }

        /// Returns a stable key identifying this source: for seekable files,
        /// a digest of `"file://"` plus the canonicalized path bytes.
        fn source_key(&self) -> BoxFuture<IoResult<Option<SourceKey<A>>>> {
            Box::pin(async move {
                match self.get_async_seekable_source().await? {
                    AsyncSource::Seekable { .. } => {
                        let mut hasher = A::new();
                        hasher.update(b"file://");
                        hasher.update(self.get_async_path().await?.as_os_str().to_raw_bytes());
                        Ok(Some(hasher.finalize().into()))
                    }
                    AsyncSource::Unseekable(source) => source.source_key().await,
                }
            })
        }

        /// Returns the file size captured when the source was initialized.
        fn total_size(&self) -> BoxFuture<IoResult<Option<u64>>> {
            Box::pin(async move {
                match self.get_async_seekable_source().await? {
                    AsyncSource::Seekable { file_size, .. } => Ok(Some(*file_size)),
                    AsyncSource::Unseekable(source) => source.total_size().await,
                }
            })
        }
    }

    impl<A: Digest> Debug for AsyncFileDataSource<A> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("AsyncFileDataSource")
                .field("path", &self.path)
                .field("canonicalized_path", &self.canonicalized_path)
                .field("source", &self.source)
                .finish()
        }
    }

    impl<A: Digest> Clone for AsyncFileDataSource<A> {
        // Cheap clone: all lazily-initialized state is shared via `Arc`.
        #[inline]
        fn clone(&self) -> Self {
            Self {
                path: self.path.clone(),
                canonicalized_path: self.canonicalized_path.clone(),
                source: self.source.clone(),
            }
        }
    }

    /// Cursor tracking the next part to hand out: its byte offset and 1-based
    /// part number, guarded by a mutex inside [`AsyncSource::Seekable`].
    #[derive(Debug)]
    struct SourceOffset {
        offset: u64,
        part_number: NonZeroUsize,
    }

    /// Backing source chosen lazily: seekable slicing state, or a streaming fallback.
    enum AsyncSource<A: Digest> {
        Seekable {
            // Template source; each part clones it with its own offset/length.
            source: AsyncSeekableSource,
            // Mutable slicing cursor, shared across concurrent `slice` calls.
            current: AsyncMutex<SourceOffset>,
            // Size captured at open time; slicing stops at this boundary.
            file_size: u64,
            // Stream position at open time; `reset` rewinds here.
            original_offset: u64,
        },
        Unseekable(AsyncUnseekableDataSource<AsyncFile, A>),
    }

    impl<A: Digest> Debug for AsyncSource<A> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Seekable {
                    source,
                    current,
                    file_size,
                    original_offset,
                } => f
                    .debug_struct("Seekable")
                    .field("source", source)
                    .field("current", current)
                    .field("file_size", file_size)
                    .field("original_offset", original_offset)
                    .finish(),
                Self::Unseekable(file) => f.debug_tuple("Unseekable").field(file).finish(),
            }
        }
    }
}
313
#[cfg(feature = "async")]
pub use async_reader::*;
316
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use std::{
        io::{Read, Write},
        thread::{spawn as thread_spawn, JoinHandle as ThreadJoinHandle},
    };
    use tempfile::Builder as TempfileBuilder;

    #[test]
    fn test_sync_seekable_file_data_source() -> Result<()> {
        // Write 255 distinct 1 MiB chunks so each part's content identifies its index.
        let temp_file_path = {
            let mut temp_file = TempfileBuilder::new().tempfile()?;
            for i in 0..255u8 {
                let buf = vec![i; 1 << 20];
                temp_file.write_all(&buf)?;
            }
            temp_file.into_temp_path()
        };
        let data_source = FileDataSource::<Sha1>::new(&temp_file_path);
        let mut source_readers = Vec::with_capacity(255);
        for _ in 0..255u8 {
            source_readers.push(data_source.slice(PartSize::new(1 << 20).unwrap())?.unwrap());
        }
        // The source must be exhausted after exactly 255 parts.
        assert!(data_source.slice(PartSize::new(1 << 20).unwrap())?.is_none());

        // Read every part concurrently to verify the readers are independent.
        let mut threads: Vec<ThreadJoinHandle<IoResult<()>>> = Vec::with_capacity(255);
        for (i, mut source_reader) in source_readers.into_iter().enumerate() {
            threads.push(thread_spawn(move || {
                let mut buf = Vec::new();
                let have_read = source_reader.read_to_end(&mut buf)?;
                assert_eq!(have_read, 1 << 20);
                assert_eq!(buf, vec![i as u8; 1 << 20]);
                Ok(())
            }));
        }
        for thread in threads {
            thread.join().unwrap()?;
        }

        Ok(())
    }

    #[test]
    // BUGFIX: was `#[cfg(target_os = "unix")]`, which matches no target
    // ("unix" is a target *family*, not an OS), so this test was never
    // compiled — which also hid the type errors fixed below.
    #[cfg(unix)]
    fn test_sync_unseekable_file_data_source() -> Result<()> {
        use defer_lite::defer;
        use ipipe::Pipe;
        use std::fs::remove_file;

        let mut pipe = Pipe::create()?;
        let pipe_path = pipe.path().to_owned();
        defer! {
            remove_file(&pipe_path).ok();
        }
        // Feed the pipe from a separate thread so reads below don't deadlock.
        let producer_thread: ThreadJoinHandle<IoResult<()>> = {
            thread_spawn(move || {
                for i in 0..255u8 {
                    let buf = vec![i; 1 << 20];
                    pipe.write_all(&buf)?;
                }
                pipe.close()?;
                Ok(())
            })
        };
        // A named pipe is unseekable, exercising the streaming fallback path.
        // BUGFIX: the digest type parameter is not inferable here; pin it to Sha1
        // as the seekable test does.
        let data_source = FileDataSource::<Sha1>::new(&pipe_path);
        let mut source_readers = Vec::with_capacity(255);
        for _ in 0..255u8 {
            // BUGFIX: `slice` takes a `PartSize`, not a raw integer.
            source_readers.push(data_source.slice(PartSize::new(1 << 20).unwrap())?.unwrap());
        }
        assert!(data_source.slice(PartSize::new(1 << 20).unwrap())?.is_none());

        let mut threads: Vec<ThreadJoinHandle<IoResult<()>>> = Vec::with_capacity(256);
        for (i, mut source_reader) in source_readers.into_iter().enumerate() {
            threads.push(thread_spawn(move || {
                let mut buf = Vec::new();
                let have_read = source_reader.read_to_end(&mut buf)?;
                assert_eq!(have_read, 1 << 20);
                assert_eq!(buf, vec![i as u8; 1 << 20]);
                Ok(())
            }));
        }
        threads.push(producer_thread);
        for thread in threads {
            thread.join().unwrap()?;
        }

        Ok(())
    }
}
407}