cyfs_bdt/ndn/chunk/cache/
stream.rs

1use std::{
2    sync::{Arc, RwLock, Mutex}, 
3    ops::Range, 
4    io::SeekFrom, 
5    collections::BTreeMap
6};
7use async_std::{
8    task
9};
10use once_cell::sync::OnceCell;
11use cyfs_base::*;
12use cyfs_util::*;
13use crate::{
14    interface::udp::MTU, 
15    types::*
16};
17use super::super::super::{
18    types::*, 
19    channel::{protocol::v0::*}
20};
21use super::{
22    encode::*, 
23    raw_cache::*
24};
25
26
/// Mutable state of a `ChunkStreamCache`, stored behind the `RwLock` in `CacheImpl`.
///
/// * `raw_cache`  - backing storage; a `OnceCell`, so it can be set only once
///   (done by `ChunkStreamCache::load`).
/// * `pushed_len` - running total of payload bytes accepted by
///   `ChunkStreamCache::push_piece_data`.
/// * `indices`    - queue tracking which piece indices are present.
/// * `waiters`    - waiters registered by `wait_exists`, keyed by piece index.
27struct StateImpl {
28    raw_cache: OnceCell<Box<dyn RawCache>>, 
29    pushed_len: usize, 
30    indices: IncomeIndexQueue, 
31    waiters: BTreeMap::<u32, StateWaiter>
32}
33
/// Shared inner value of `ChunkStreamCache`: the chunk id this cache belongs to
/// plus its lock-protected mutable state.
34struct CacheImpl {
35    chunk: ChunkId, 
36    state: RwLock<StateImpl>
37} 
38
/// Handle to a per-chunk stream cache. Cloning copies the `Arc`, so all clones
/// share the same `CacheImpl`.
39#[derive(Clone)]
40pub struct ChunkStreamCache(Arc<CacheImpl>);
41
42impl std::fmt::Display for CacheImpl {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "ChunkStreamCache{{chunk:{}}}", self.chunk)
45    }
46}
47
48impl Drop for CacheImpl {
49    fn drop(&mut self) {
50        info!("{} released", self);
51    }
52}
53
54impl std::fmt::Display for ChunkStreamCache {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "{}", self.0)
57    }
58}
59
60
61impl ChunkStreamCache {
62    pub fn new(chunk: &ChunkId) -> Self {
63        let end = PieceDesc::stream_end_index(chunk, PieceData::max_payload() as u32) + 1;
64        Self(Arc::new(CacheImpl {
65            chunk: chunk.clone(),
66            state: RwLock::new(StateImpl {
67                pushed_len: 0,
68                raw_cache: OnceCell::new(), 
69                indices: IncomeIndexQueue::new(end), 
70                waiters: BTreeMap::new()
71            })
72        }))
73    }
74
75    pub fn create_encoder(&self, desc: &ChunkCodecDesc) -> Box<dyn ChunkEncoder> {
76        SyncStreamEncoder::new(self.clone(), desc).clone_as_encoder()
77    }
78
79    pub fn loaded(&self) -> bool {
80        self.0.state.read().unwrap().raw_cache.get().is_some()
81    }
82
83    pub fn load(
84        &self, 
85        finished: bool, 
86        raw_cache: Box<dyn RawCache>, 
87    ) -> BuckyResult<()> {
88        info!("{} load finished:{}", self, finished);
89        let waiters = {
90            let mut state = self.0.state.write().unwrap();
91            match state.raw_cache.set(raw_cache) {
92                Ok(_) => {
93                    if finished {
94                        let end = PieceDesc::stream_end_index(self.chunk(), PieceData::max_payload() as u32) + 1;
95                        state.indices.push(0..end);
96                        let mut waiters = Default::default();
97                        std::mem::swap(&mut waiters, &mut state.waiters);
98                        Ok(waiters.into_values().collect())
99                    } else {
100                        Ok(vec![])
101                    }
102                },
103                Err(_) => Err(BuckyError::new(BuckyErrorCode::ErrorState, "loaded"))
104            }
105        }?;
106        
107        for waiter in waiters {
108            waiter.wake();
109        }
110
111        Ok(())
112    }
113
114    pub fn chunk(&self) -> &ChunkId {
115        &self.0.chunk
116    }
117
118    fn require_index(&self, desc: &ChunkCodecDesc) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)> {
119        let (start, end, step) = desc.unwrap_as_stream();
120        self.0.state.read().unwrap().indices.require(start, end, step)
121    }
122
123    fn push_piece_data(&self, piece: &PieceData) -> BuckyResult<PushIndexResult> {
124        trace!("{} push piece data:{:?}", self, piece.desc);
125
126        let (index, range) = piece.desc.stream_piece_range(self.chunk());
127        let index_result = self.0.state.read().unwrap().indices.try_push(index..index + 1);
128        if !index_result.pushed() {
129            trace!("{} push piece data:{:?}, result:{:?}", self, piece.desc, index_result);
130            return Ok(index_result);
131        }
132
133        let mut writer = {
134            let state = self.0.state.read().unwrap();
135            state.raw_cache.get().unwrap().sync_writer()
136        }?;
137
138        if range.start == writer.seek(SeekFrom::Start(range.start))
139            .map_err(|err| {
140                trace!("{} push piece data:{:?}, result:{}", self, piece.desc, err);
141                err
142            })? {
143            let len = (range.end - range.start) as usize;
144            writer.write_all(&piece.data[..len]).map_err(|err| {
145                trace!("{} push piece data:{:?}, result:{}", self, piece.desc, err);
146                err
147            })?; 
148            let (result, waiter) = {
149                let mut state = self.0.state.write().unwrap();
150                let result = state.indices.push(index..index + 1);
151                if result.pushed() {
152                    state.pushed_len += len;
153                }
154                (result, state.waiters.remove(&index))
155            };
156            if let Some(waiter) = waiter {
157                waiter.wake();
158            }
159            trace!("{} push piece data:{:?}, result:{:?}", self, piece.desc, result);
160            Ok(result)
161        } else {
162            let err = BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch");
163            trace!("{} push piece data:{:?}, result:{}", self, piece.desc, err);
164            Err(err)
165        }
166    }
167
168    pub fn exists(&self, index: u32) -> BuckyResult<bool> {
169        self.0.state.read().unwrap().indices.exists(index)
170    }
171
172    pub fn len(&self) -> usize {
173        self.0.state.read().unwrap().pushed_len
174    }
175
176    pub async fn wait_exists<T: futures::Future<Output=BuckyError>>(&self, index: u32, abort: T) -> BuckyResult<()> {
177        trace!("{} wait_exists:{}", self, index);
178
179        let waiter = {
180            let mut state = self.0.state.write().unwrap();
181            match state.indices.exists(index) {
182                Ok(exists) => {
183                    if exists {
184                        return Ok(());
185                    }
186                }, 
187                Err(err) => {
188                    return Err(err); 
189                }
190            }
191
192            if let Some(waiters) = state.waiters.get_mut(&index) {
193                waiters.new_waiter()
194            } else {
195                let mut waiters = StateWaiter::new();
196                let waiter = waiters.new_waiter();
197                state.waiters.insert(index, waiters);
198                waiter
199            }
200        };
201        let result = StateWaiter::abort_wait(abort, waiter, || ()).await;
202        match &result {
203            Ok(_) => {
204                trace!("{} wait_exists:{} returned", self, index);
205            },
206            Err(err) => {
207                trace!("{} wait_exists:{} failed: {}", self, index, err);
208            }
209        }
210        
211        result
212    }
213
214    pub async fn async_read<T: futures::Future<Output=BuckyError>>(
215        &self, 
216        piece_desc: &PieceDesc, 
217        offset_in_piece: usize,  
218        buffer: &mut [u8], 
219        abort: T
220    ) -> BuckyResult<usize> {
221        trace!("{} async read:{:?}", self, piece_desc);
222
223        let (index, range) = piece_desc.stream_piece_range(self.chunk());
224        if self.wait_exists(index, abort).await.is_err() {
225            trace!("{} async read:{:?}, read:{}", self, piece_desc, 0);
226            return Ok(0);
227        }
228        let raw_cache = self.0.state.read().unwrap().raw_cache.get().unwrap().clone_as_raw_cache();
229        let mut reader = raw_cache.async_reader().await
230            .map_err(|err| {
231                trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
232                err
233            })?;
234        use async_std::io::prelude::*;
235        let start = range.start + offset_in_piece as u64;
236        if start == reader.seek(SeekFrom::Start(start)).await.map_err(|err| {
237            trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
238            err
239        })? {
240            let len = (range.end - start) as usize;
241            let len = len.min(buffer.len());
242            reader.read_exact(&mut buffer[..len]).await.map_err(|err| {
243                trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
244                err
245            })?;
246            trace!("{} async read:{:?}, read:{}", self, piece_desc, len);
247            Ok(len)
248        } else {
249            let err = BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch");
250            trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
251            Err(err)
252        }
253    }
254
255    pub fn sync_try_read(
256        &self, 
257        piece_desc: &PieceDesc, 
258        offset_in_piece: usize,  
259        buffer: &mut [u8]
260    ) -> BuckyResult<usize> {
261        trace!("{} sync_try_read desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
262
263        let (index, range) = piece_desc.stream_piece_range(self.chunk());
264        match self.exists(index) {
265            Ok(exists) => {
266                if !exists {
267                    trace!("{} sync_try_read not exists, desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
268                    return Err(BuckyError::new(BuckyErrorCode::NotFound, "not exists"));
269                }
270            }, 
271            Err(_) => {
272                trace!("{} sync_try_read exists 0, desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
273                return Ok(0);
274            }
275        }
276        let raw_cache = self.0.state.read().unwrap().raw_cache.get().unwrap().clone_as_raw_cache();
277        let mut reader = raw_cache.sync_reader()?;
278        use std::io::{Read, Seek};
279        let start = range.start + offset_in_piece as u64;
280        if start == reader.seek(SeekFrom::Start(start))? {
281            let len = (range.end - start) as usize;
282            let len = len.min(buffer.len());
283            reader.read_exact(&mut buffer[..len])
284                .map_err(|err| {
285                    trace!("{} sync_try_read {}, desc: {:?},offset_in_piece: {}, buffer: {} ", self, err, piece_desc, offset_in_piece, buffer.len());
286                    err
287                })?;
288            trace!("{} sync_try_read {}, desc: {:?},offset_in_piece: {}, buffer: {} ", self, len, piece_desc, offset_in_piece, buffer.len());
289            Ok(len)
290        } else {
291            trace!("{} sync_try_read invalid, desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
292            Err(BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch"))
293        }
294    }
295
296    fn raw_cache(&self) -> Option<Box<dyn RawCache>> {
297        self.0.state.read().unwrap().raw_cache.get().map(|c| c.clone_as_raw_cache())
298    }
299
300
301    async fn async_try_read(
302        &self, 
303        piece_desc: &PieceDesc, 
304        offset_in_piece: usize,  
305        buffer: &mut [u8]
306    ) -> BuckyResult<usize> {
307        let (index, range) = piece_desc.stream_piece_range(self.chunk());
308        match self.exists(index) {
309            Ok(exists) => {
310                if !exists {
311                    return Err(BuckyError::new(BuckyErrorCode::NotFound, "not exists"));
312                }
313            }, 
314            Err(_) => {
315                return Ok(0);
316            }
317        }
318        let raw_cache = self.0.state.read().unwrap().raw_cache.get().unwrap().clone_as_raw_cache();
319        let mut reader = raw_cache.async_reader().await?;
320        use async_std::io::prelude::*;
321        let start = range.start + offset_in_piece as u64;
322        if start == reader.seek(SeekFrom::Start(start)).await? {
323            let len = (range.end - start) as usize;
324            let len = len.min(buffer.len());
325            reader.read_exact(&mut buffer[..len]).await?;
326            Ok(len)
327        } else {
328            Err(BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch"))
329        }
330    }
331}
332
333
334
/// Shared inner value of `StreamDecoder`: the chunk being decoded, the codec
/// description that bounds the accepted piece indices, and the cache that
/// received pieces are pushed into.
335struct DecoderImpl {
336    chunk: ChunkId, 
337    desc: ChunkCodecDesc,  
338    cache: ChunkStreamCache, 
339}
340
/// Stream-codec decoder handle. Cloning copies the `Arc`, so all clones share
/// the same `DecoderImpl`.
341#[derive(Clone)]
342pub struct StreamDecoder(Arc<DecoderImpl>);
343
344
345impl std::fmt::Display for StreamDecoder {
346    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347        write!(f, "StreamDecoder{{chunk:{}}}", self.chunk())
348    }
349}
350
351impl StreamDecoder {
352    pub fn new(
353        chunk: &ChunkId, 
354        desc: &ChunkCodecDesc, 
355        cache: ChunkStreamCache
356    ) -> Self {
357        Self(Arc::new(DecoderImpl {
358            chunk: chunk.clone(), 
359            desc: desc.clone(), 
360            cache, 
361        }))
362    }
363}
364
365impl ChunkDecoder for StreamDecoder {
366    fn clone_as_decoder(&self) -> Box<dyn ChunkDecoder> {
367        Box::new(self.clone())
368    }
369
370    fn chunk(&self) -> &ChunkId {
371        &self.0.chunk
372    }
373
374    fn desc(&self) -> &ChunkCodecDesc {
375        &self.0.desc
376    }
377
378    fn require_index(&self) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)> {
379        self.0.cache.require_index(self.desc())
380    }
381
382    fn push_piece_data(&self, piece: &PieceData) -> BuckyResult<PushIndexResult> {
383        trace!("{} push piece desc {:?}", self, piece.desc);
384        let (start, end, _) = self.desc().unwrap_as_stream();
385        let (index, _) = piece.desc.unwrap_as_stream();
386        if index < start || index >= end {
387            return Ok(PushIndexResult {
388                valid: false, 
389                exists: false, 
390                finished: false
391            });
392        }
393
394        let result = self.0.cache.push_piece_data(piece)?;
395        if result.pushed() {
396            if self.0.cache.require_index(self.desc()).is_none() {
397                Ok(PushIndexResult { 
398                    valid: true, 
399                    exists: false,
400                    finished: true })
401            } else {
402                Ok(result)
403            }
404        } else {
405            Ok(result)
406        }
407    }
408
409}
410
411
412
/// Progress of the background read used by `AsyncStreamEncoder`.
///
/// * `None`    - no background read is outstanding.
/// * `Pending` - a task reading the given piece has been spawned and has not
///   stored its result yet.
/// * `Waiting` - the read for the given piece completed; its result (the
///   payload bytes or an error) is waiting to be consumed by `next_piece`.
413enum AsyncEncoderPendingState {
414    None, 
415    Pending(PieceDesc), 
416    // FIXME: avoid allocating a fresh buffer for every piece
417    Waiting(PieceDesc, BuckyResult<Vec<u8>>)
418}
419
/// Mutable state of an `AsyncStreamEncoder`: the background-read progress and
/// the queue of piece indices still to be sent.
420struct AsyncEncoderStateImpl {
421    pending: AsyncEncoderPendingState, 
422    indices: OutcomeIndexQueue, 
423}
424
/// Shared inner value of `AsyncStreamEncoder`: the codec description, the cache
/// pieces are read from, and the lock-protected encoder state.
425struct AsyncEncoderImpl {
426    desc: ChunkCodecDesc, 
427    cache: ChunkStreamCache,  
428    state: RwLock<AsyncEncoderStateImpl>
429}
430
/// Stream encoder that can fall back to a spawned async read when the cache
/// cannot be read synchronously. Cloning copies the `Arc`, so all clones share
/// the same `AsyncEncoderImpl`.
431#[derive(Clone)]
432pub struct AsyncStreamEncoder(Arc<AsyncEncoderImpl>);
433
434impl std::fmt::Display for AsyncStreamEncoder {
435    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436        write!(f, "StreamEncoder{{chunk:{},desc:{:?}}}", self.chunk(), self.desc())
437    }
438}
439
440
441impl AsyncStreamEncoder {
442    pub fn new(
443        cache: ChunkStreamCache, 
444        desc: &ChunkCodecDesc
445    ) -> Self {
446        let (start, end, step) = desc.unwrap_as_stream();
447        Self(Arc::new(AsyncEncoderImpl {
448            desc: desc.clone(), 
449            cache, 
450            state: RwLock::new(AsyncEncoderStateImpl {
451                pending: AsyncEncoderPendingState::None, 
452                indices: OutcomeIndexQueue::new(start, end, step)
453            })
454        }))
455    }
456
457    fn cache(&self) -> &ChunkStreamCache {
458        &self.0.cache
459    }
460
461    async fn async_next_piece(&self, piece_desc: PieceDesc) {
462        let mut buffer = vec![0u8; MTU];
463        let result = self.cache().async_try_read(&piece_desc, 0, &mut buffer[..]).await;
464        let mut state = self.0.state.write().unwrap();
465        if let AsyncEncoderPendingState::Pending(pending_desc) = &state.pending {
466            if pending_desc.eq(&piece_desc) {
467                state.pending = AsyncEncoderPendingState::Waiting(piece_desc, result.map(|len| {
468                    buffer.truncate(len);
469                    buffer
470                }));
471            }
472        }
473    }
474}
475
476impl ChunkEncoder for AsyncStreamEncoder {
477    fn clone_as_encoder(&self) -> Box<dyn ChunkEncoder> {
478        Box::new(self.clone())
479    }
480
481    fn chunk(&self) -> &ChunkId {
482        self.cache().chunk()
483    }
484
485    fn desc(&self) -> &ChunkCodecDesc {
486        &self.0.desc
487    }
488
489    fn next_piece(&self, session_id: &TempSeq, buf: &mut [u8]) -> BuckyResult<usize> {
490        let mut state = self.0.state.write().unwrap();
491        match &mut state.pending {
492            AsyncEncoderPendingState::Pending(_) => Ok(0), 
493            AsyncEncoderPendingState::Waiting(piece_desc, _result) => {
494                let mut result = Err(BuckyError::new(BuckyErrorCode::Ok, ""));
495                std::mem::swap(&mut result, _result);
496                let piece_desc = piece_desc.clone(); 
497                state.pending = AsyncEncoderPendingState::None;
498                match result {
499                    Ok(buffer) => {
500                        let (index, _) = piece_desc.unwrap_as_stream();
501                        if state.indices.next() == Some(index) {
502                            let _ = state.indices.pop_next();
503                            let buf_len = buf.len();
504                            let buf = PieceData::encode_header(
505                                buf, 
506                                session_id,
507                                self.chunk(), 
508                                &piece_desc)?;
509                            let header_len = buf_len - buf.len();
510                            buf[..buffer.len()].copy_from_slice(&buffer[..]);
511                            let piece_len = header_len + buffer.len();
512                            Ok(piece_len)
513                        } else {
514                            Ok(0)
515                        }
516                    }, 
517                    Err(err) => {
518                        Err(err)
519                    }
520                }
521            }, 
522            AsyncEncoderPendingState::None => {
523                if let Some(index) = state.indices.next() {
524                    trace!("{} try pop next piece {}", self, index);
525                    if self.cache().exists(index)
526                        .map_err(|err| {
527                            error!("{} exists error {}", self, index);
528                            err
529                        }).unwrap() {
530                        let (_, _, step) = self.desc().unwrap_as_stream();
531                        let piece_desc = PieceDesc::Range(index, step.abs() as u16);
532                        let buf_len = buf.len();
533                        let buf = PieceData::encode_header(
534                            buf, 
535                            session_id,
536                            self.chunk(), 
537                            &piece_desc)?;
538                        let header_len = buf_len - buf.len();
539                        match self.cache().sync_try_read(&piece_desc, 0, buf) {
540                            Ok(len) => {
541                                let _ = state.indices.pop_next();
542                                trace!("{} pop next piece {:?}", self, piece_desc);
543                                Ok(header_len + len)
544                            }, 
545                            Err(err) => {
546                                if BuckyErrorCode::NotSupport == err.code() {
547                                    state.pending = AsyncEncoderPendingState::Pending(piece_desc.clone());
548                                    let encoder = self.clone();
549                                    task::spawn(async move {
550                                        encoder.async_next_piece(piece_desc).await;
551                                    });
552                                    Ok(0)
553                                } else if BuckyErrorCode::WouldBlock == err.code() {
554                                    Ok(0)
555                                } else {
556                                    Err(err)
557                                }
558                            }
559                        }
560                    } else {
561                        Ok(0)
562                    }
563                } else {
564                    Ok(0)
565                }
566            }
567        }
568    }
569
570    fn reset(&self) -> bool {
571        let mut state = self.0.state.write().unwrap();
572        if state.indices.reset() {
573            match &state.pending {
574                AsyncEncoderPendingState::Pending(next_desc) => {
575                    let (index, _) = next_desc.unwrap_as_stream();
576                    if state.indices.next() != Some(index) {
577                        state.pending = AsyncEncoderPendingState::None;
578                    }
579                },
580                AsyncEncoderPendingState::Waiting(next_desc, _) => {
581                    let (index, _) = next_desc.unwrap_as_stream();
582                    if state.indices.next() != Some(index) {
583                        state.pending = AsyncEncoderPendingState::None;
584                    }
585                },
586                _ => {}
587            }
588            true
589        } else {
590            false
591        }
592    }
593
594    fn merge(&self, max_index: u32, lost_index: Vec<Range<u32>>) -> bool {
595        let mut state = self.0.state.write().unwrap();
596        if state.indices.merge(max_index, lost_index) {
597            match &state.pending {
598                AsyncEncoderPendingState::Pending(next_desc) => {
599                    let (index, _) = next_desc.unwrap_as_stream();
600                    if state.indices.next() != Some(index) {
601                        state.pending = AsyncEncoderPendingState::None;
602                    }
603                },
604                AsyncEncoderPendingState::Waiting(next_desc, _) => {
605                    let (index, _) = next_desc.unwrap_as_stream();
606                    if state.indices.next() != Some(index) {
607                        state.pending = AsyncEncoderPendingState::None;
608                    }
609                },
610                _ => {}
611            }
612            true
613        } else {
614            false
615        }
616    }
617}
618
619
620
621
622
/// Mutable state of a `SyncStreamEncoder`.
///
/// * `reader`  - synchronous reader over the raw cache; `None` until
///   `next_piece` first opens it, then reused for later pieces.
/// * `indices` - queue of piece indices still to be sent.
623struct SyncEncoderStateImpl {
624    reader: Option<Box<dyn SyncReadWithSeek + Send + Sync>>, 
625    indices: OutcomeIndexQueue, 
626}
627
/// Shared inner value of `SyncStreamEncoder`: the codec description, the cache
/// pieces are read from, and the mutex-protected encoder state.
628struct SyncEncoderImpl {
629    desc: ChunkCodecDesc, 
630    cache: ChunkStreamCache,  
631    state: Mutex<SyncEncoderStateImpl>
632}
633
/// Stream encoder that reads pieces through the raw cache's synchronous reader.
/// Cloning copies the `Arc`, so all clones share the same `SyncEncoderImpl`.
634#[derive(Clone)]
635pub struct SyncStreamEncoder(Arc<SyncEncoderImpl>);
636
637impl std::fmt::Display for SyncStreamEncoder {
638    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639        write!(f, "StreamEncoder{{chunk:{},desc:{:?}}}", self.chunk(), self.desc())
640    }
641}
642
643
644impl SyncStreamEncoder {
645    pub fn new(
646        cache: ChunkStreamCache, 
647        desc: &ChunkCodecDesc
648    ) -> Self {
649        let (start, end, step) = desc.unwrap_as_stream();
650        Self(Arc::new(SyncEncoderImpl {
651            desc: desc.clone(), 
652            cache, 
653            state: Mutex::new(SyncEncoderStateImpl {
654                reader: None, 
655                indices: OutcomeIndexQueue::new(start, end, step)
656            })
657        }))
658    }
659
660    fn cache(&self) -> &ChunkStreamCache {
661        &self.0.cache
662    }
663}
664
665impl ChunkEncoder for SyncStreamEncoder {
666    fn clone_as_encoder(&self) -> Box<dyn ChunkEncoder> {
667        Box::new(self.clone())
668    }
669
670    fn chunk(&self) -> &ChunkId {
671        self.cache().chunk()
672    }
673
674    fn desc(&self) -> &ChunkCodecDesc {
675        &self.0.desc
676    }
677
678    fn next_piece(&self, session_id: &TempSeq, buf: &mut [u8]) -> BuckyResult<usize> {
679        let mut state = self.0.state.lock().unwrap();
680        if let Some(index) = state.indices.next() {
681            if self.cache().exists(index)
682                .map_err(|err| {
683                    error!("{} exists error {}", self, index);
684                    err
685                }).unwrap() {
686                let (_, _, step) = self.desc().unwrap_as_stream();
687                let piece_desc = PieceDesc::Range(index, step.abs() as u16);
688                let buf_len = buf.len();
689                let buf = PieceData::encode_header(
690                    buf, 
691                    session_id,
692                    self.chunk(), 
693                    &piece_desc)?;
694                let header_len = buf_len - buf.len();
695                if state.reader.is_none() {
696                    let raw_cache = self.cache().raw_cache().unwrap();
697                    match raw_cache.sync_reader() {
698                        Ok(reader) => {
699                            state.reader = Some(reader);
700                        },
701                        Err(err) => {
702                            let ret = if BuckyErrorCode::WouldBlock == err.code() {
703                                Ok(0)
704                            } else {
705                                Err(err)
706                            };
707                            return ret;
708                        }
709                    }
710                }
711                let reader = state.reader.as_mut().unwrap();
712                let (_, range) = piece_desc.stream_piece_range(self.chunk());
713                use std::io::{Read, Seek};
714
715                let start = range.start;
716                if start == reader.seek(SeekFrom::Start(start))? {
717                    let len = (range.end - start) as usize;
718                    let len = len.min(buf.len());
719                    reader.read_exact(&mut buf[..len])
720                        .map_err(|err| {
721                            trace!("{} sync_try_read {}, desc: {:?}, buffer: {} ", self, err, piece_desc, buf.len());
722                            err
723                        })?;
724                    trace!("{} sync_try_read {}, desc: {:?},buffer: {} ", self, len, piece_desc, buf.len());
725                    let _ = state.indices.pop_next();
726                    trace!("{} pop next piece {:?}", self, piece_desc);
727                    Ok(header_len + len)
728                } else {
729                    trace!("{} sync_try_read invalid, desc: {:?}, buffer: {} ", self, piece_desc, buf.len());
730                    Err(BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch"))
731                }
732            } else {
733                Ok(0)
734            }
735        } else {
736            Ok(0)
737        }
738    }
739
740    fn reset(&self) -> bool {
741        self.0.state.lock().unwrap().indices.reset()
742    }
743
744    fn merge(&self, max_index: u32, lost_index: Vec<Range<u32>>) -> bool {
745        self.0.state.lock().unwrap().indices.merge(max_index, lost_index)
746    }
747}