qiniu_upload_manager/data_source/seekable.rs

1use super::{first_part_number, DataSource, DataSourceReader, PartSize, SourceKey};
2use digest::Digest;
3use qiniu_apis::http::Reset;
4use std::{
5    fmt::Debug,
6    io::{Read, Result as IoResult, Seek, SeekFrom},
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9};
10
11#[derive(Debug)]
12struct SourceOffset {
13    offset: u64,
14    part_number: NonZeroUsize,
15}
16
17#[derive(Debug, Clone)]
18pub(crate) struct SeekableDataSource {
19    source: SeekableSource,
20    current: Arc<Mutex<SourceOffset>>,
21    size: u64,
22    original_offset: u64,
23}
24
25impl SeekableDataSource {
26    pub(crate) fn new(mut source: impl Read + Seek + Debug + Send + Sync + 'static, size: u64) -> IoResult<Self> {
27        let original_offset = source.stream_position()?;
28        Ok(Self {
29            size,
30            original_offset,
31            current: Arc::new(Mutex::new(SourceOffset {
32                offset: original_offset,
33                part_number: first_part_number(),
34            })),
35            source: SeekableSource::new(source, 0, 0),
36        })
37    }
38}
39
40impl<D: Digest> DataSource<D> for SeekableDataSource {
41    fn slice(&self, size: PartSize) -> IoResult<Option<DataSourceReader>> {
42        let mut cur = self.current.lock().unwrap();
43        if cur.offset < self.size {
44            let size = size.as_u64();
45            let source_reader = DataSourceReader::seekable(
46                cur.part_number,
47                self.source.clone_with_new_offset_and_length(cur.offset, size),
48            );
49            cur.offset += size;
50            cur.part_number = NonZeroUsize::new(cur.part_number.get() + 1).expect("Page number is too big");
51            Ok(Some(source_reader))
52        } else {
53            Ok(None)
54        }
55    }
56
57    #[inline]
58    fn reset(&self) -> IoResult<()> {
59        let mut cur = self.current.lock().unwrap();
60        cur.offset = self.original_offset;
61        cur.part_number = first_part_number();
62        Ok(())
63    }
64
65    #[inline]
66    fn total_size(&self) -> IoResult<Option<u64>> {
67        Ok(Some(self.size))
68    }
69
70    #[inline]
71    fn source_key(&self) -> IoResult<Option<SourceKey<D>>> {
72        Ok(None)
73    }
74}
75
76/// 可寻址的数据源
77///
78/// 用于表示一个分片,需要传入可寻址的数据源,以及分片的起始位置和长度
79#[derive(Debug, Clone)]
80pub struct SeekableSource {
81    source: Arc<Mutex<SeekableSourceInner<dyn ReadSeek>>>,
82    source_offset: u64,
83    offset: u64,
84    len: u64,
85}
86
87impl SeekableSource {
88    /// 创建可寻址的数据源
89    ///
90    /// 需要传入可寻址的数据源,以及分片的起始位置和长度
91    #[inline]
92    pub fn new(source: impl Read + Seek + Debug + Send + Sync + 'static, offset: u64, len: u64) -> Self {
93        Self {
94            source: Arc::new(Mutex::new(SeekableSourceInner::new(source))),
95            source_offset: 0,
96            offset,
97            len,
98        }
99    }
100
101    pub(super) fn clone_with_new_offset_and_length(&self, offset: u64, len: u64) -> Self {
102        let mut cloned = self.to_owned();
103        cloned.source_offset = 0;
104        cloned.offset = offset;
105        cloned.len = len;
106        cloned
107    }
108
109    pub(super) fn offset(&self) -> u64 {
110        self.offset
111    }
112
113    pub(super) fn len(&self) -> IoResult<u64> {
114        let mut locked = self.source.lock().unwrap();
115        let new_pos = locked.source.seek(SeekFrom::End(0))?;
116        if Some(new_pos) != locked.pos {
117            locked.pos = Some(new_pos);
118        }
119        Ok(self.len.min(new_pos - self.offset))
120    }
121}
122
123impl Read for SeekableSource {
124    fn read(&mut self, mut buf: &mut [u8]) -> IoResult<usize> {
125        let mut locked = self.source.lock().unwrap();
126        let max_read = self.len - self.source_offset;
127        if max_read == 0 {
128            return Ok(0);
129        } else if max_read < buf.len() as u64 {
130            let max_read: usize = max_read.try_into().unwrap_or(usize::MAX);
131            buf = &mut buf[..max_read];
132        }
133        let seek_pos = self.offset + self.source_offset;
134        if Some(seek_pos) != locked.pos {
135            locked.pos = Some(locked.source.seek(SeekFrom::Start(seek_pos))?);
136        }
137        let have_read = locked.source.read(buf)?;
138        self.source_offset += have_read as u64;
139        if let Some(ref mut pos) = locked.pos {
140            *pos += have_read as u64;
141        }
142        Ok(have_read)
143    }
144}
145
146impl Reset for SeekableSource {
147    #[inline]
148    fn reset(&mut self) -> IoResult<()> {
149        self.source_offset = 0;
150        Ok(())
151    }
152}
153
/// Object-safe union of the traits a shared synchronous stream must implement,
/// so that any such stream can be stored behind a single `dyn ReadSeek`.
trait ReadSeek: Read + Seek + Send + Sync + Debug {}

impl<T> ReadSeek for T where T: Read + Seek + Send + Sync + Debug {}

/// State shared by every part cut from the same synchronous stream.
///
/// `pos` caches the position the stream is believed to be at (`None` when unknown),
/// which lets a read that continues where the previous one stopped skip its seek.
/// `source` must remain the last field so the struct can be unsized to
/// `SeekableSourceInner<dyn ReadSeek>`.
#[derive(Debug)]
struct SeekableSourceInner<T: Read + Seek + Send + Sync + Debug + ?Sized> {
    pos: Option<u64>,
    source: T,
}

impl<T> SeekableSourceInner<T>
where
    T: Read + Seek + Send + Sync + Debug,
{
    /// Wraps `source` with no cached position, so the first read always seeks.
    fn new(source: T) -> Self {
        SeekableSourceInner { pos: None, source }
    }
}
168
#[cfg(feature = "async")]
mod async_reader {
    use super::*;
    use futures::{
        future::{BoxFuture, FutureExt},
        lock::Mutex,
        ready, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Future,
    };
    use qiniu_apis::http::AsyncReset;
    use std::{
        fmt,
        pin::Pin,
        sync::atomic::{AtomicU64, Ordering::SeqCst},
        task::{Context, Poll},
    };

    /// Minimum number of bytes requested from the shared stream per refill (4 MiB);
    /// a larger caller buffer raises the request to the buffer's size.
    const MIN_CHUNK_SIZE: usize = 1 << 22;

    /// Asynchronously seekable data source.
    ///
    /// Represents one part: it takes an asynchronously seekable data source together
    /// with the part's start offset and length.
    #[derive(Debug)]
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    pub struct AsyncSeekableSource {
        /// Underlying stream plus its cached position, shared by every part cut from it.
        source: Arc<Mutex<AsyncSeekableSourceInner<dyn ReadSeek>>>,
        /// Bytes of this part already fetched from the stream.
        source_offset: Arc<AtomicU64>,
        /// Absolute stream position at which this part starts.
        offset: u64,
        /// Maximum number of bytes this part yields.
        len: u64,
        /// Current state of the buffered read loop.
        step: AsyncSeekableSourceReadStep,
    }

    /// State shared by every part cut from one asynchronous stream.
    ///
    /// `pos` caches where the stream is believed to be positioned (`None` when unknown)
    /// so sequential fetches can skip the seek. `source` must remain the last field so
    /// the struct can be unsized to `AsyncSeekableSourceInner<dyn ReadSeek>`.
    #[derive(Debug)]
    struct AsyncSeekableSourceInner<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin + ?Sized> {
        pos: Option<u64>,
        source: T,
    }

    impl<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin> AsyncSeekableSourceInner<T> {
        /// Wraps `source` with no cached position, so the first fetch always seeks.
        fn new(source: T) -> Self {
            Self { source, pos: None }
        }
    }

    /// State machine driving [`AsyncSeekableSource`]'s `poll_read`.
    enum AsyncSeekableSourceReadStep {
        /// Serving a fetched chunk; its first `consumed` bytes were already handed out.
        Buffered { buffer: Vec<u8>, consumed: usize },
        /// Waiting for the next chunk to be fetched from the shared stream.
        Waiting {
            task: Pin<Box<dyn Future<Output = IoResult<Vec<u8>>> + Send + Sync + 'static>>,
        },
        /// The part is exhausted; reads return 0 until the source is reset.
        Done,
    }

    impl Default for AsyncSeekableSourceReadStep {
        /// Starts with an empty buffer, so the first read triggers a fetch.
        #[inline]
        fn default() -> Self {
            Self::Buffered { buffer: Default::default(), consumed: Default::default() }
        }
    }

    impl Debug for AsyncSeekableSourceReadStep {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Buffered { buffer, consumed } => f
                    .debug_struct("Buffered")
                    .field("buffer", buffer)
                    .field("consumed", consumed)
                    .finish(),
                // The in-flight future is opaque and has no `Debug` impl.
                Self::Waiting { .. } => f.debug_struct("Waiting").finish(),
                Self::Done => write!(f, "Done"),
            }
        }
    }

    impl AsyncSeekableSource {
        /// Creates an asynchronously seekable data source.
        ///
        /// `source` is the underlying stream; reading starts at absolute position
        /// `offset` and yields at most `len` bytes.
        #[inline]
        pub fn new(
            source: impl AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin + 'static,
            offset: u64,
            len: u64,
        ) -> Self {
            Self {
                step: Default::default(),
                source: Arc::new(Mutex::new(AsyncSeekableSourceInner::new(source))),
                source_offset: Arc::new(AtomicU64::new(0)),
                offset,
                len,
            }
        }

        /// Returns another part of the same shared stream, covering `len` bytes from
        /// `offset`, with its own read progress starting at zero.
        pub(in super::super) fn clone_with_new_offset_and_length(&self, offset: u64, len: u64) -> Self {
            Self {
                step: Default::default(),
                source: Arc::clone(&self.source),
                source_offset: Arc::new(AtomicU64::new(0)),
                offset,
                len,
            }
        }

        /// Absolute stream position at which this part starts.
        pub(in super::super) fn offset(&self) -> u64 {
            self.offset
        }

        /// Returns how many bytes this part can yield: `len`, capped by what the stream
        /// holds after `offset` (0 when `offset` lies past the end).
        ///
        /// Seeks the shared stream to its end and records that as its cached position.
        pub(in super::super) async fn len(&self) -> IoResult<u64> {
            let mut locked = self.source.lock().await;
            // Forget the cached position while seeking: if the seek fails (or this future
            // is dropped part-way) the real position is unknown and must be re-sought.
            locked.pos = None;
            let end = locked.source.seek(SeekFrom::End(0)).await?;
            locked.pos = Some(end);
            // `saturating_sub` avoids an underflow panic when `offset` is past the end.
            Ok(self.len.min(end.saturating_sub(self.offset)))
        }

        /// Drives the in-flight fetch. On success, switches to `Buffered` (or `Done` for
        /// an empty chunk) and continues the read.
        fn poll_from_task(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            let fetched = match &mut self.step {
                AsyncSeekableSourceReadStep::Waiting { task } => ready!(task.poll_unpin(cx)),
                _ => unreachable!(),
            };
            match fetched {
                Ok(buffer) => {
                    self.step = if buffer.is_empty() {
                        AsyncSeekableSourceReadStep::Done
                    } else {
                        AsyncSeekableSourceReadStep::Buffered { buffer, consumed: 0 }
                    };
                    self.poll_read(cx, buf)
                }
                Err(err) => {
                    // The finished future must not be polled again (async-fn futures panic
                    // when resumed after completion). `read_chunk` only advances
                    // `source_offset` on success, so going back to an empty buffer lets a
                    // later read retry from the same position.
                    self.step = AsyncSeekableSourceReadStep::default();
                    Poll::Ready(Err(err))
                }
            }
        }

        /// Copies buffered bytes into `buf`; when the buffer is exhausted, starts
        /// fetching the next chunk and continues the read.
        fn poll_from_buffer(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            match &mut self.step {
                AsyncSeekableSourceReadStep::Buffered { buffer, consumed } => {
                    let rested = buf.len().min(buffer.len() - *consumed);
                    if rested > 0 {
                        buf[..rested].copy_from_slice(&buffer[*consumed..(*consumed + rested)]);
                        *consumed += rested;
                        return Poll::Ready(Ok(rested));
                    }
                }
                _ => unreachable!(),
            }
            // `poll_read` never passes an empty `buf`, so reaching this point means the
            // buffer is exhausted: fetch the next chunk, at least as large as `buf`.
            let task = read_chunk(
                Arc::clone(&self.source),
                Arc::clone(&self.source_offset),
                self.offset,
                self.len,
                buf.len().max(MIN_CHUNK_SIZE),
            );
            self.step = AsyncSeekableSourceReadStep::Waiting { task: Box::pin(task) };
            self.poll_read(cx, buf)
        }

        /// The part is exhausted: always reports end of data.
        fn poll_done(self: Pin<&mut Self>) -> Poll<IoResult<usize>> {
            match &self.step {
                AsyncSeekableSourceReadStep::Done => Poll::Ready(Ok(0)),
                _ => unreachable!(),
            }
        }
    }

    impl AsyncRead for AsyncSeekableSource {
        #[inline]
        fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> {
            // A zero-length read must not touch the stream: `poll_from_buffer` would
            // otherwise treat it as "buffer exhausted" and keep fetching (and discarding)
            // chunks until the whole part had been consumed.
            if buf.is_empty() {
                return Poll::Ready(Ok(0));
            }
            match self.step {
                AsyncSeekableSourceReadStep::Waiting { .. } => self.poll_from_task(cx, buf),
                AsyncSeekableSourceReadStep::Buffered { .. } => self.poll_from_buffer(cx, buf),
                AsyncSeekableSourceReadStep::Done => self.poll_done(),
            }
        }
    }

    impl AsyncReset for AsyncSeekableSource {
        /// Rewinds this part to its first byte: drops any buffered or in-flight chunk
        /// and clears the fetch progress. The shared stream itself is not touched.
        #[inline]
        fn reset(&mut self) -> BoxFuture<IoResult<()>> {
            Box::pin(async move {
                self.step = Default::default();
                self.source_offset.store(0, SeqCst);
                Ok(())
            })
        }
    }

    /// Object-safe union of the traits a shared asynchronous stream must implement.
    trait ReadSeek: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin {}
    impl<T: AsyncRead + AsyncSeek + Debug + Send + Sync + Unpin> ReadSeek for T {}

    /// Fetches the next chunk of a part from the shared stream.
    ///
    /// Reads at most `request` bytes, and never more than what is left of the part
    /// (`len` minus the progress stored in `source_offset`), starting at absolute
    /// position `offset + source_offset`. An empty vector means the part is exhausted
    /// or the stream hit end-of-file. On success `source_offset` is advanced by the
    /// number of bytes read; on failure it is left unchanged.
    async fn read_chunk(
        source: Arc<Mutex<AsyncSeekableSourceInner<dyn ReadSeek>>>,
        source_offset: Arc<AtomicU64>,
        offset: u64,
        len: u64,
        request: usize,
    ) -> IoResult<Vec<u8>> {
        let mut locked = source.lock().await;
        let progress = source_offset.load(SeqCst);
        let remaining = len - progress;
        if remaining == 0 {
            return Ok(Vec::new());
        }
        let remaining: usize = remaining.try_into().unwrap_or(usize::MAX);
        let mut buffer = vec![0u8; request.min(remaining)];
        let seek_pos = offset + progress;
        // Take the cached position out for the duration of the I/O: if the seek or read
        // fails, or this future is dropped part-way, the stream position is unknown and
        // the next fetch must re-seek rather than trust a stale value.
        let start = if locked.pos.take() == Some(seek_pos) {
            seek_pos
        } else {
            locked.source.seek(SeekFrom::Start(seek_pos)).await?
        };
        let have_read = locked.source.read(&mut buffer).await?;
        buffer.truncate(have_read);
        source_offset.fetch_add(have_read as u64, SeqCst);
        locked.pos = Some(start + have_read as u64);
        Ok(buffer)
    }
}

#[cfg(feature = "async")]
pub use async_reader::*;