qiniu_upload_manager/data_source/
unseekable.rs

1use super::{first_part_number, DataSource, DataSourceReader, PartSize, SourceKey};
2use sha1::{digest::Digest, Sha1};
3use std::{
4    fmt::{self, Debug},
5    io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
6    num::NonZeroUsize,
7    sync::{Arc, Mutex},
8};
9
10/// 不可寻址的数据源
11///
12/// 基于一个不可寻址的阅读器实现了数据源接口
13pub struct UnseekableDataSource<R: Read + Debug + Send + Sync + 'static + ?Sized, A: Digest = Sha1>(
14    Arc<Mutex<UnseekableDataSourceInner<R, A>>>,
15);
16
17impl<R: Read + Debug + Send + Sync + 'static, A: Digest> Debug for UnseekableDataSource<R, A> {
18    #[inline]
19    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20        f.debug_tuple("UnseekableDataSource").field(&self.0).finish()
21    }
22}
23
24impl<R: Read + Debug + Send + Sync + 'static, A: Digest> Clone for UnseekableDataSource<R, A> {
25    #[inline]
26    fn clone(&self) -> Self {
27        Self(self.0.clone())
28    }
29}
30
31struct UnseekableDataSourceInner<R: Read + Debug + Send + Sync + 'static + ?Sized, A: Digest> {
32    current_offset: u64,
33    current_part_number: NonZeroUsize,
34    source_key: Option<SourceKey<A>>,
35    reader: R,
36}
37
38impl<R: Read + Debug + Send + Sync + 'static, A: Digest> UnseekableDataSource<R, A> {
39    /// 创建不可寻址的数据源
40    #[inline]
41    pub fn new(reader: R) -> Self {
42        Self(Arc::new(Mutex::new(UnseekableDataSourceInner {
43            reader,
44            current_offset: 0,
45            current_part_number: first_part_number(),
46            source_key: None,
47        })))
48    }
49}
50
51impl<R: Read + Debug + Send + Sync + 'static, A: Digest> DataSource<A> for UnseekableDataSource<R, A> {
52    fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>> {
53        let mut buf = Vec::new();
54        let guard = &mut self.0.lock().unwrap();
55        let have_read = (&mut guard.reader).take(size.as_u64()).read_to_end(&mut buf)?;
56        if have_read > 0 {
57            let source_reader = DataSourceReader::unseekable(guard.current_part_number, buf, guard.current_offset);
58            guard.current_offset += have_read as u64;
59            guard.current_part_number =
60                NonZeroUsize::new(guard.current_part_number.get() + 1).expect("Page number is too big");
61            Ok(Some(source_reader))
62        } else {
63            Ok(None)
64        }
65    }
66
67    #[inline]
68    fn reset(&self) -> IoResult<()> {
69        Err(unsupported_reset_error())
70    }
71
72    #[inline]
73    fn source_key(&self) -> IoResult<Option<SourceKey<A>>> {
74        Ok(self.0.lock().unwrap().source_key.to_owned())
75    }
76
77    #[inline]
78    fn total_size(&self) -> IoResult<Option<u64>> {
79        Ok(None)
80    }
81}
82
83impl<R: Read + Debug + Send + Sync + 'static, A: Digest> Debug for UnseekableDataSourceInner<R, A> {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("UnseekableDataSourceInner")
86            .field("reader", &self.reader)
87            .field("current_offset", &self.current_offset)
88            .field("current_part_number", &self.current_part_number)
89            .field("source_key", &self.source_key)
90            .finish()
91    }
92}
93
94#[cfg(feature = "async")]
95mod async_unseekable {
96    use super::{
97        super::{AsyncDataSource, AsyncDataSourceReader},
98        *,
99    };
100    use futures::{
101        future::{self, BoxFuture},
102        lock::Mutex,
103        AsyncRead, AsyncReadExt,
104    };
105
106    /// 不可寻址的异步数据源
107    ///
108    /// 基于一个不可寻址的异步阅读器实现了异步数据源接口
109    pub struct AsyncUnseekableDataSource<R: AsyncRead + Debug + Unpin + Send + Sync + 'static + ?Sized, A: Digest = Sha1>(
110        Arc<Mutex<AsyncUnseekableDataSourceInner<R, A>>>,
111    );
112
113    impl<R: AsyncRead + Debug + Unpin + Send + Sync + 'static, A: Digest> Debug for AsyncUnseekableDataSource<R, A> {
114        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115            f.debug_tuple("AsyncUnseekableDataSource").field(&self.0).finish()
116        }
117    }
118
119    impl<R: AsyncRead + Debug + Unpin + Send + Sync + 'static, A: Digest> Clone for AsyncUnseekableDataSource<R, A> {
120        #[inline]
121        fn clone(&self) -> Self {
122            Self(self.0.clone())
123        }
124    }
125
126    struct AsyncUnseekableDataSourceInner<R: AsyncRead + Debug + Unpin + Send + Sync + 'static + ?Sized, A: Digest> {
127        current_offset: u64,
128        current_part_number: NonZeroUsize,
129        source_key: Option<SourceKey<A>>,
130        reader: R,
131    }
132
133    impl<R: AsyncRead + Debug + Unpin + Send + Sync + 'static, A: Digest> AsyncUnseekableDataSource<R, A> {
134        /// 创建不可寻址的异步数据源
135        pub fn new(reader: R) -> Self {
136            Self(Arc::new(Mutex::new(AsyncUnseekableDataSourceInner {
137                reader,
138                current_offset: 0,
139                current_part_number: first_part_number(),
140                source_key: None,
141            })))
142        }
143    }
144
145    impl<R: AsyncRead + Debug + Unpin + Send + Sync + 'static, A: Digest> AsyncDataSource<A>
146        for AsyncUnseekableDataSource<R, A>
147    {
148        fn slice(&self, size: PartSize) -> BoxFuture<IoResult<Option<AsyncDataSourceReader>>> {
149            Box::pin(async move {
150                let mut buf = Vec::new();
151                let guard = &mut self.0.lock().await;
152                let have_read = (&mut guard.reader).take(size.as_u64()).read_to_end(&mut buf).await?;
153                if have_read > 0 {
154                    let source_reader =
155                        AsyncDataSourceReader::unseekable(guard.current_part_number, buf, guard.current_offset);
156                    guard.current_offset += have_read as u64;
157                    guard.current_part_number =
158                        NonZeroUsize::new(guard.current_part_number.get() + 1).expect("Page number is too big");
159                    Ok(Some(source_reader))
160                } else {
161                    Ok(None)
162                }
163            })
164        }
165
166        #[inline]
167        fn reset(&self) -> BoxFuture<IoResult<()>> {
168            Box::pin(async move { Err(unsupported_reset_error()) })
169        }
170
171        #[inline]
172        fn source_key(&self) -> BoxFuture<IoResult<Option<SourceKey<A>>>> {
173            Box::pin(async move { Ok(self.0.lock().await.source_key.to_owned()) })
174        }
175
176        #[inline]
177        fn total_size(&self) -> BoxFuture<IoResult<Option<u64>>> {
178            Box::pin(future::ok(None))
179        }
180    }
181
182    impl<R: AsyncRead + Debug + Unpin + Send + Sync + 'static, A: Digest> Debug for AsyncUnseekableDataSourceInner<R, A> {
183        #[inline]
184        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185            f.debug_struct("AsyncUnseekableDataSourceInner")
186                .field("reader", &self.reader)
187                .field("current_offset", &self.current_offset)
188                .field("current_part_number", &self.current_part_number)
189                .field("source_key", &self.source_key)
190                .finish()
191        }
192    }
193}
194
195#[cfg(feature = "async")]
196pub use async_unseekable::*;
197
198fn unsupported_reset_error() -> IoError {
199    IoError::new(IoErrorKind::Unsupported, "Cannot reset unseekable source")
200}