qiniu_upload_manager/data_source/
unseekable.rs1use super::{first_part_number, DataSource, DataSourceReader, PartSize, SourceKey};
2use sha1::{digest::Digest, Sha1};
3use std::{
4 fmt::{self, Debug},
5 io::{Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult},
6 num::NonZeroUsize,
7 sync::{Arc, Mutex},
8};
9
10pub struct UnseekableDataSource<R: Read + Debug + Send + Sync + 'static + ?Sized, A: Digest = Sha1>(
14 Arc<Mutex<UnseekableDataSourceInner<R, A>>>,
15);
16
17impl<R: Read + Debug + Send + Sync + 'static, A: Digest> Debug for UnseekableDataSource<R, A> {
18 #[inline]
19 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20 f.debug_tuple("UnseekableDataSource").field(&self.0).finish()
21 }
22}
23
24impl<R: Read + Debug + Send + Sync + 'static, A: Digest> Clone for UnseekableDataSource<R, A> {
25 #[inline]
26 fn clone(&self) -> Self {
27 Self(self.0.clone())
28 }
29}
30
31struct UnseekableDataSourceInner<R: Read + Debug + Send + Sync + 'static + ?Sized, A: Digest> {
32 current_offset: u64,
33 current_part_number: NonZeroUsize,
34 source_key: Option<SourceKey<A>>,
35 reader: R,
36}
37
38impl<R: Read + Debug + Send + Sync + 'static, A: Digest> UnseekableDataSource<R, A> {
39 #[inline]
41 pub fn new(reader: R) -> Self {
42 Self(Arc::new(Mutex::new(UnseekableDataSourceInner {
43 reader,
44 current_offset: 0,
45 current_part_number: first_part_number(),
46 source_key: None,
47 })))
48 }
49}
50
51impl<R: Read + Debug + Send + Sync + 'static, A: Digest> DataSource<A> for UnseekableDataSource<R, A> {
52 fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>> {
53 let mut buf = Vec::new();
54 let guard = &mut self.0.lock().unwrap();
55 let have_read = (&mut guard.reader).take(size.as_u64()).read_to_end(&mut buf)?;
56 if have_read > 0 {
57 let source_reader = DataSourceReader::unseekable(guard.current_part_number, buf, guard.current_offset);
58 guard.current_offset += have_read as u64;
59 guard.current_part_number =
60 NonZeroUsize::new(guard.current_part_number.get() + 1).expect("Page number is too big");
61 Ok(Some(source_reader))
62 } else {
63 Ok(None)
64 }
65 }
66
67 #[inline]
68 fn reset(&self) -> IoResult<()> {
69 Err(unsupported_reset_error())
70 }
71
72 #[inline]
73 fn source_key(&self) -> IoResult<Option<SourceKey<A>>> {
74 Ok(self.0.lock().unwrap().source_key.to_owned())
75 }
76
77 #[inline]
78 fn total_size(&self) -> IoResult<Option<u64>> {
79 Ok(None)
80 }
81}
82
83impl<R: Read + Debug + Send + Sync + 'static, A: Digest> Debug for UnseekableDataSourceInner<R, A> {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("UnseekableDataSourceInner")
86 .field("reader", &self.reader)
87 .field("current_offset", &self.current_offset)
88 .field("current_part_number", &self.current_part_number)
89 .field("source_key", &self.source_key)
90 .finish()
91 }
92}
93
#[cfg(feature = "async")]
mod async_unseekable {
    use super::{
        super::{AsyncDataSource, AsyncDataSourceReader},
        *,
    };
    use futures::{
        future::{self, BoxFuture},
        lock::Mutex,
        AsyncRead, AsyncReadExt,
    };

    /// Asynchronous data source that wraps a reader which can only be
    /// consumed front to back.
    ///
    /// The wrapped state lives behind an `Arc` and an async `Mutex`, so every
    /// clone of this handle refers to the same reader, offset and part counter.
    pub struct AsyncUnseekableDataSource<R, A = Sha1>(Arc<Mutex<AsyncUnseekableDataSourceInner<R, A>>>)
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static + ?Sized,
        A: Digest;

    impl<R, A> Debug for AsyncUnseekableDataSource<R, A>
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static,
        A: Digest,
    {
        /// Formats the handle as a one-field tuple wrapping the shared state.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let mut tuple = f.debug_tuple("AsyncUnseekableDataSource");
            tuple.field(&self.0);
            tuple.finish()
        }
    }

    impl<R, A> Clone for AsyncUnseekableDataSource<R, A>
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static,
        A: Digest,
    {
        /// Returns another handle to the same shared state; only the `Arc`
        /// is cloned.
        #[inline]
        fn clone(&self) -> Self {
            Self(Arc::clone(&self.0))
        }
    }

    /// State shared by all clones of an [`AsyncUnseekableDataSource`].
    struct AsyncUnseekableDataSourceInner<R, A>
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static + ?Sized,
        A: Digest,
    {
        /// Total number of bytes already handed out by `slice`.
        current_offset: u64,
        /// Part number that the next non-empty slice will be given.
        current_part_number: NonZeroUsize,
        /// Value returned by `source_key`; the constructor initialises it to `None`.
        source_key: Option<SourceKey<A>>,
        /// Underlying reader. Kept as the last field because `R` may be unsized.
        reader: R,
    }

    impl<R, A> AsyncUnseekableDataSource<R, A>
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static,
        A: Digest,
    {
        /// Wraps `reader` in a new asynchronous data source.
        ///
        /// The source starts at offset `0`, at the part number returned by
        /// `first_part_number()`, and without a source key.
        pub fn new(reader: R) -> Self {
            let inner = AsyncUnseekableDataSourceInner {
                current_offset: 0,
                current_part_number: first_part_number(),
                source_key: None,
                reader,
            };
            Self(Arc::new(Mutex::new(inner)))
        }
    }

    impl<R, A> AsyncDataSource<A> for AsyncUnseekableDataSource<R, A>
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static,
        A: Digest,
    {
        /// Reads the next part, limited to `size.as_u64()` bytes, from the reader.
        ///
        /// The future resolves to `Ok(None)` when the reader yields no more
        /// bytes. Otherwise the bytes are buffered in memory and returned
        /// together with the current part number and offset, after which both
        /// counters are advanced. I/O errors from the reader are propagated.
        ///
        /// # Panics
        ///
        /// Panics if the part number cannot be incremented.
        fn slice(&self, size: PartSize) -> BoxFuture<IoResult<Option<AsyncDataSourceReader>>> {
            Box::pin(async move {
                let mut part = Vec::new();
                // The lock is held across the read so concurrent callers
                // receive consecutive, non-overlapping parts.
                let mut inner = self.0.lock().await;
                let read_len = (&mut inner.reader).take(size.as_u64()).read_to_end(&mut part).await?;
                if read_len == 0 {
                    Ok(None)
                } else {
                    let part_number = inner.current_part_number;
                    let part_reader = AsyncDataSourceReader::unseekable(part_number, part, inner.current_offset);
                    inner.current_offset += read_len as u64;
                    inner.current_part_number =
                        NonZeroUsize::new(part_number.get() + 1).expect("Page number is too big");
                    Ok(Some(part_reader))
                }
            })
        }

        /// Always fails: an unseekable reader cannot be rewound.
        #[inline]
        fn reset(&self) -> BoxFuture<IoResult<()>> {
            Box::pin(async move {
                let error = unsupported_reset_error();
                Err(error)
            })
        }

        /// Resolves to a copy of the stored source key, if any.
        #[inline]
        fn source_key(&self) -> BoxFuture<IoResult<Option<SourceKey<A>>>> {
            Box::pin(async move {
                let inner = self.0.lock().await;
                Ok(inner.source_key.clone())
            })
        }

        /// Always resolves to `Ok(None)`: the total size of the stream is not known.
        #[inline]
        fn total_size(&self) -> BoxFuture<IoResult<Option<u64>>> {
            Box::pin(future::ready(Ok(None)))
        }
    }

    impl<R, A> Debug for AsyncUnseekableDataSourceInner<R, A>
    where
        R: AsyncRead + Debug + Unpin + Send + Sync + 'static,
        A: Digest,
    {
        /// Formats the reader, the offset, the part number and the source
        /// key, in that order.
        #[inline]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let mut out = f.debug_struct("AsyncUnseekableDataSourceInner");
            out.field("reader", &self.reader);
            out.field("current_offset", &self.current_offset);
            out.field("current_part_number", &self.current_part_number);
            out.field("source_key", &self.source_key);
            out.finish()
        }
    }
}
194
195#[cfg(feature = "async")]
196pub use async_unseekable::*;
197
/// Builds the error returned by `reset` on unseekable data sources.
///
/// The error kind is `Unsupported`, because the underlying reader can only
/// be consumed once and therefore cannot be rewound.
fn unsupported_reset_error() -> IoError {
    let kind = IoErrorKind::Unsupported;
    let message = "Cannot reset unseekable source";
    IoError::new(kind, message)
}