qiniu_upload_manager/data_source/
seekable.rs1use super::{first_part_number, DataSource, DataSourceReader, PartSize, SourceKey};
2use digest::Digest;
3use qiniu_apis::http::Reset;
4use std::{
5 fmt::Debug,
6 io::{Read, Result as IoResult, Seek, SeekFrom},
7 num::NonZeroUsize,
8 sync::{Arc, Mutex},
9};
10
/// Cursor shared by all clones of a data source: where the next part begins
/// and which part number it will be assigned.
#[derive(Debug)]
struct SourceOffset {
    /// Position in the underlying source at which the next part starts.
    offset: u64,
    /// Part number given to the next part that is handed out.
    part_number: NonZeroUsize,
}
16
17#[derive(Debug, Clone)]
18pub(crate) struct SeekableDataSource {
19 source: SeekableSource,
20 current: Arc<Mutex<SourceOffset>>,
21 size: u64,
22 original_offset: u64,
23}
24
25impl SeekableDataSource {
26 pub(crate) fn new(mut source: impl Read + Seek + Debug + Send + Sync + 'static, size: u64) -> IoResult<Self> {
27 let original_offset = source.stream_position()?;
28 Ok(Self {
29 size,
30 original_offset,
31 current: Arc::new(Mutex::new(SourceOffset {
32 offset: original_offset,
33 part_number: first_part_number(),
34 })),
35 source: SeekableSource::new(source, 0, 0),
36 })
37 }
38}
39
40impl<D: Digest> DataSource<D> for SeekableDataSource {
41 fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>> {
42 let mut cur = self.current.lock().unwrap();
43 if cur.offset < self.size {
44 let size = size.as_u64();
45 let source_reader = DataSourceReader::seekable(
46 cur.part_number,
47 self.source.clone_with_new_offset_and_length(cur.offset, size),
48 );
49 cur.offset += size;
50 cur.part_number = NonZeroUsize::new(cur.part_number.get() + 1).expect("Page number is too big");
51 Ok(Some(source_reader))
52 } else {
53 Ok(None)
54 }
55 }
56
57 #[inline]
58 fn reset(&self) -> IoResult<()> {
59 let mut cur = self.current.lock().unwrap();
60 cur.offset = self.original_offset;
61 cur.part_number = first_part_number();
62 Ok(())
63 }
64
65 #[inline]
66 fn total_size(&self) -> IoResult<Option<u64>> {
67 Ok(Some(self.size))
68 }
69
70 #[inline]
71 fn source_key(&self) -> IoResult<Option<SourceKey<D>>> {
72 Ok(None)
73 }
74}
75
76#[derive(Debug, Clone)]
80pub struct SeekableSource {
81 source: Arc<Mutex<SeekableSourceInner<dyn ReadSeek>>>,
82 source_offset: u64,
83 offset: u64,
84 len: u64,
85}
86
87impl SeekableSource {
88 #[inline]
92 pub fn new(source: impl Read + Seek + Debug + Send + Sync + 'static, offset: u64, len: u64) -> Self {
93 Self {
94 source: Arc::new(Mutex::new(SeekableSourceInner::new(source))),
95 source_offset: 0,
96 offset,
97 len,
98 }
99 }
100
101 pub(super) fn clone_with_new_offset_and_length(&self, offset: u64, len: u64) -> Self {
102 let mut cloned = self.to_owned();
103 cloned.source_offset = 0;
104 cloned.offset = offset;
105 cloned.len = len;
106 cloned
107 }
108
109 pub(super) fn offset(&self) -> u64 {
110 self.offset
111 }
112
113 pub(super) fn len(&self) -> IoResult<u64> {
114 let mut locked = self.source.lock().unwrap();
115 let new_pos = locked.source.seek(SeekFrom::End(0))?;
116 if Some(new_pos) != locked.pos {
117 locked.pos = Some(new_pos);
118 }
119 Ok(self.len.min(new_pos - self.offset))
120 }
121}
122
123impl Read for SeekableSource {
124 fn read(&mut self, mut buf: &mut [u8]) -> IoResult<usize> {
125 let mut locked = self.source.lock().unwrap();
126 let max_read = self.len - self.source_offset;
127 if max_read == 0 {
128 return Ok(0);
129 } else if max_read < buf.len() as u64 {
130 let max_read: usize = max_read.try_into().unwrap_or(usize::MAX);
131 buf = &mut buf[..max_read];
132 }
133 let seek_pos = self.offset + self.source_offset;
134 if Some(seek_pos) != locked.pos {
135 locked.pos = Some(locked.source.seek(SeekFrom::Start(seek_pos))?);
136 }
137 let have_read = locked.source.read(buf)?;
138 self.source_offset += have_read as u64;
139 if let Some(ref mut pos) = locked.pos {
140 *pos += have_read as u64;
141 }
142 Ok(have_read)
143 }
144}
145
146impl Reset for SeekableSource {
147 #[inline]
148 fn reset(&mut self) -> IoResult<()> {
149 self.source_offset = 0;
150 Ok(())
151 }
152}
153
/// Umbrella trait combining the capabilities required of an underlying
/// reader, so that it can be stored as a single trait object.
trait ReadSeek: Read + Seek + Send + Sync + Debug {}

/// Every type providing the required capabilities is a `ReadSeek`.
impl<T> ReadSeek for T where T: Read + Seek + Send + Sync + Debug {}
156
/// Underlying reader plus the stream position it is believed to be at.
#[derive(Debug)]
struct SeekableSourceInner<T: ?Sized>
where
    T: Read + Seek + Send + Sync + Debug,
{
    /// Cached stream position; `None` while the position is unknown.
    pos: Option<u64>,
    /// The wrapped reader. Kept last so that `T` may be unsized.
    source: T,
}

impl<T> SeekableSourceInner<T>
where
    T: Read + Seek + Send + Sync + Debug,
{
    /// Wraps `source` with no cached position.
    fn new(source: T) -> Self {
        Self { pos: None, source }
    }
}
168
#[cfg(feature = "async")]
mod async_reader {
    use super::*;
    use futures::{
        future::{BoxFuture, FutureExt},
        lock::Mutex,
        ready, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Future,
    };
    use qiniu_apis::http::AsyncReset;
    use std::{
        fmt,
        pin::Pin,
        sync::atomic::{AtomicU64, Ordering::SeqCst},
        task::{Context, Poll},
    };

    /// Smallest read-ahead buffer requested from the underlying reader (4 MiB),
    /// before capping by the bytes left in the window.
    const MIN_BUFFER_REQUEST_SIZE: usize = 1 << 22;

    /// A window onto a shared asynchronous seekable reader.
    ///
    /// The window exposes the byte range `offset .. offset + len` of the
    /// underlying reader. Data is fetched by a boxed background future into an
    /// internal buffer and then copied out to callers of `poll_read`.
    #[derive(Debug)]
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    pub struct AsyncSeekableSource {
        /// Shared underlying reader together with its cached stream position.
        source: Arc<Mutex<AsyncSeekableSourceInner<dyn ReadSeek>>>,
        /// Number of bytes already fetched from this window; shared with the
        /// in-flight read task.
        source_offset: Arc<AtomicU64>,
        /// Position in the underlying reader at which this window starts.
        offset: u64,
        /// Maximum number of bytes this window may yield.
        len: u64,
        /// Current state of the read state machine.
        step: AsyncSeekableSourceReadStep,
    }

    /// Underlying asynchronous reader plus the stream position it is believed
    /// to be at.
    #[derive(Debug)]
    struct AsyncSeekableSourceInner<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin + ?Sized> {
        /// Cached stream position; `None` while the position is unknown.
        pos: Option<u64>,
        /// The wrapped reader.
        source: T,
    }

    impl<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin> AsyncSeekableSourceInner<T> {
        /// Wraps `source` with no cached position.
        fn new(source: T) -> Self {
            Self { source, pos: None }
        }
    }

    /// States of the read state machine.
    enum AsyncSeekableSourceReadStep {
        /// Bytes `consumed..` of `buffer` are ready to be copied to the caller.
        Buffered {
            buffer: Vec<u8>,
            consumed: usize,
        },
        /// A read of the underlying reader is in flight.
        Waiting {
            task: Pin<Box<dyn Future<Output = IoResult<Vec<u8>>> + Send + Sync + 'static>>,
        },
        /// The window has been read to its end.
        Done,
    }

    impl Default for AsyncSeekableSourceReadStep {
        /// Starts with an empty buffer, so the first read schedules a fetch.
        #[inline]
        fn default() -> Self {
            Self::Buffered {
                buffer: Default::default(),
                consumed: Default::default(),
            }
        }
    }

    impl Debug for AsyncSeekableSourceReadStep {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Buffered { buffer, consumed } => f
                    .debug_struct("Buffered")
                    .field("buffer", buffer)
                    .field("consumed", consumed)
                    .finish(),
                // The boxed future has no useful `Debug` representation.
                Self::Waiting { .. } => f.debug_struct("Waiting").finish(),
                Self::Done => write!(f, "Done"),
            }
        }
    }

    impl AsyncSeekableSource {
        /// Creates a window of `len` bytes starting at position `offset` of
        /// `source`.
        #[inline]
        pub fn new(
            source: impl AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin + 'static,
            offset: u64,
            len: u64,
        ) -> Self {
            Self {
                step: Default::default(),
                source: Arc::new(Mutex::new(AsyncSeekableSourceInner::new(source))),
                source_offset: Arc::new(AtomicU64::new(0)),
                offset,
                len,
            }
        }

        /// Returns a new window over the same underlying reader, starting at
        /// `offset`, spanning `len` bytes, with fresh read state.
        pub(in super::super) fn clone_with_new_offset_and_length(&self, offset: u64, len: u64) -> Self {
            Self {
                step: Default::default(),
                source: self.source.to_owned(),
                source_offset: Arc::new(AtomicU64::new(0)),
                offset,
                len,
            }
        }

        /// Position in the underlying reader at which this window starts.
        pub(in super::super) fn offset(&self) -> u64 {
            self.offset
        }

        /// Number of bytes this window can actually yield: the configured
        /// length capped by what the underlying reader holds after `offset`.
        ///
        /// Returns `0` when the window starts at or beyond the end of the
        /// reader.
        ///
        /// # Errors
        ///
        /// Returns the I/O error raised while seeking to the end of the reader.
        pub(in super::super) async fn len(&self) -> IoResult<u64> {
            let mut locked = self.source.lock().await;
            // Forget the cached position first so that a failed seek cannot
            // leave a stale value behind for later reads.
            locked.pos = None;
            let end = locked.source.seek(SeekFrom::End(0)).await?;
            locked.pos = Some(end);
            // Saturate: a window that starts past the end holds zero bytes
            // rather than underflowing.
            Ok(self.len.min(end.saturating_sub(self.offset)))
        }

        /// Drives the in-flight read task; once it completes, moves on to the
        /// buffered (or done) state and serves the caller from there.
        ///
        /// On failure the state machine returns to its initial state, so a
        /// later poll starts a fresh read instead of polling a finished future.
        fn poll_from_task(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            match &mut self.step {
                AsyncSeekableSourceReadStep::Waiting { task } => match ready!(task.poll_unpin(cx)) {
                    Ok(buffer) => {
                        self.step = if buffer.is_empty() {
                            AsyncSeekableSourceReadStep::Done
                        } else {
                            AsyncSeekableSourceReadStep::Buffered { buffer, consumed: 0 }
                        };
                        self.poll_read(cx, buf)
                    }
                    Err(err) => {
                        // The task is finished and must never be polled again.
                        self.step = Default::default();
                        Poll::Ready(Err(err))
                    }
                },
                _ => unreachable!(),
            }
        }

        /// Serves the caller from the internal buffer; when the buffer is
        /// exhausted, schedules a new read of the underlying reader.
        fn poll_from_buffer(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            // An empty destination can never make progress. Returning here
            // keeps buffered bytes intact instead of discarding them and
            // fetching more data.
            if buf.is_empty() {
                return Poll::Ready(Ok(0));
            }

            match &mut self.step {
                AsyncSeekableSourceReadStep::Buffered { buffer, consumed } => {
                    let rested = buf.len().min(buffer.len() - *consumed);
                    if rested > 0 {
                        buf[..rested].copy_from_slice(&buffer[*consumed..(*consumed + rested)]);
                        *consumed += rested;
                        return Poll::Ready(Ok(rested));
                    }
                }
                _ => unreachable!(),
            }

            // Buffer exhausted: fetch the next chunk in a boxed task.
            let buffer_request_size = buf.len().max(MIN_BUFFER_REQUEST_SIZE);
            let source = self.source.to_owned();
            let source_offset = self.source_offset.to_owned();
            let len = self.len;
            let offset = self.offset;
            self.step = AsyncSeekableSourceReadStep::Waiting {
                task: Box::pin(async move {
                    let mut locked = source.lock().await;
                    let source_offset_value = source_offset.load(SeqCst);
                    let max_read = len.saturating_sub(source_offset_value);
                    if max_read == 0 {
                        // An empty buffer signals the end of the window.
                        Ok(Vec::new())
                    } else {
                        let max_read: usize = max_read.try_into().unwrap_or(usize::MAX);
                        let mut buffer = vec![0u8; buffer_request_size.min(max_read)];
                        let seek_pos = offset + source_offset_value;
                        if Some(seek_pos) != locked.pos {
                            // Drop the cached position before seeking: if the
                            // seek fails the real position is unknown.
                            locked.pos = None;
                            locked.pos = Some(locked.source.seek(SeekFrom::Start(seek_pos)).await?);
                        }
                        let have_read = locked.source.read(&mut buffer).await?;
                        buffer.truncate(have_read);
                        let have_read = have_read as u64;
                        source_offset.fetch_add(have_read, SeqCst);
                        if let Some(ref mut pos) = locked.pos {
                            *pos += have_read;
                        }
                        Ok(buffer)
                    }
                }),
            };
            self.poll_read(cx, buf)
        }

        /// Reports end of data once the window has been fully read.
        fn poll_done(self: Pin<&mut Self>) -> Poll<IoResult<usize>> {
            match &self.step {
                AsyncSeekableSourceReadStep::Done => Poll::Ready(Ok(0)),
                _ => unreachable!(),
            }
        }
    }

    impl AsyncRead for AsyncSeekableSource {
        /// Dispatches to the handler for the current state.
        #[inline]
        fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            match self.step {
                AsyncSeekableSourceReadStep::Waiting { .. } => self.poll_from_task(cx, buf),
                AsyncSeekableSourceReadStep::Buffered { .. } => self.poll_from_buffer(cx, buf),
                AsyncSeekableSourceReadStep::Done => self.poll_done(),
            }
        }
    }

    impl AsyncReset for AsyncSeekableSource {
        /// Restarts reading from the beginning of this window, dropping any
        /// buffered data or in-flight read.
        #[inline]
        fn reset(&mut self) -> BoxFuture<IoResult<()>> {
            Box::pin(async move {
                self.step = Default::default();
                self.source_offset.store(0, SeqCst);
                Ok(())
            })
        }
    }

    /// Umbrella trait combining the capabilities required of an underlying
    /// asynchronous reader, so that it can be stored as a single trait object.
    trait ReadSeek: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin {}
    impl<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin> ReadSeek for T {}
}
378
379#[cfg(feature = "async")]
380pub use async_reader::*;