1use std::{
2 sync::{Arc, RwLock, Mutex},
3 ops::Range,
4 io::SeekFrom,
5 collections::BTreeMap
6};
7use async_std::{
8 task
9};
10use once_cell::sync::OnceCell;
11use cyfs_base::*;
12use cyfs_util::*;
13use crate::{
14 interface::udp::MTU,
15 types::*
16};
17use super::super::super::{
18 types::*,
19 channel::{protocol::v0::*}
20};
21use super::{
22 encode::*,
23 raw_cache::*
24};
25
26
/// Mutable cache state; always accessed through `CacheImpl::state`'s `RwLock`.
struct StateImpl {
    // Backing storage for piece payloads; set exactly once via `ChunkStreamCache::load()`.
    raw_cache: OnceCell<Box<dyn RawCache>>,
    // Total payload bytes successfully written into `raw_cache` so far.
    pushed_len: usize,
    // Tracks which incoming piece indices in [0, end) have been received.
    indices: IncomeIndexQueue,
    // Per-index waiters, woken when that piece arrives (or when a finished load occurs).
    waiters: BTreeMap::<u32, StateWaiter>
}
33
/// Shared cache body: the chunk identity plus lock-protected mutable state.
struct CacheImpl {
    chunk: ChunkId,
    state: RwLock<StateImpl>
}
38
/// Cheaply clonable handle (`Arc`) to the per-chunk stream piece cache.
#[derive(Clone)]
pub struct ChunkStreamCache(Arc<CacheImpl>);
41
42impl std::fmt::Display for CacheImpl {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "ChunkStreamCache{{chunk:{}}}", self.chunk)
45 }
46}
47
impl Drop for CacheImpl {
    // Log when the last handle to this chunk's cache is released.
    fn drop(&mut self) {
        info!("{} released", self);
    }
}
53
54impl std::fmt::Display for ChunkStreamCache {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "{}", self.0)
57 }
58}
59
60
impl ChunkStreamCache {
    /// Creates an empty cache for `chunk`.
    ///
    /// `end` is the exclusive upper bound of stream piece indices, derived
    /// from the chunk size and the maximum piece payload. The backing
    /// `raw_cache` stays unset until `load()` is called.
    pub fn new(chunk: &ChunkId) -> Self {
        let end = PieceDesc::stream_end_index(chunk, PieceData::max_payload() as u32) + 1;
        Self(Arc::new(CacheImpl {
            chunk: chunk.clone(),
            state: RwLock::new(StateImpl {
                pushed_len: 0,
                raw_cache: OnceCell::new(),
                indices: IncomeIndexQueue::new(end),
                waiters: BTreeMap::new()
            })
        }))
    }

    /// Builds a synchronous piece encoder that reads from this cache.
    pub fn create_encoder(&self, desc: &ChunkCodecDesc) -> Box<dyn ChunkEncoder> {
        SyncStreamEncoder::new(self.clone(), desc).clone_as_encoder()
    }

    /// True once `load()` has installed a backing raw cache.
    pub fn loaded(&self) -> bool {
        self.0.state.read().unwrap().raw_cache.get().is_some()
    }

    /// Installs the backing storage; may succeed at most once.
    ///
    /// When `finished` is true the whole index range is marked present and
    /// every registered waiter is woken (outside the write lock). Returns
    /// `ErrorState` if a raw cache was already installed.
    pub fn load(
        &self,
        finished: bool,
        raw_cache: Box<dyn RawCache>,
    ) -> BuckyResult<()> {
        info!("{} load finished:{}", self, finished);
        let waiters = {
            let mut state = self.0.state.write().unwrap();
            match state.raw_cache.set(raw_cache) {
                Ok(_) => {
                    if finished {
                        // Mark every piece index as received.
                        let end = PieceDesc::stream_end_index(self.chunk(), PieceData::max_payload() as u32) + 1;
                        state.indices.push(0..end);
                        // Take all waiters out so they can be woken after the
                        // write lock is dropped.
                        let mut waiters = Default::default();
                        std::mem::swap(&mut waiters, &mut state.waiters);
                        Ok(waiters.into_values().collect())
                    } else {
                        Ok(vec![])
                    }
                },
                Err(_) => Err(BuckyError::new(BuckyErrorCode::ErrorState, "loaded"))
            }
        }?;

        for waiter in waiters {
            waiter.wake();
        }

        Ok(())
    }

    /// Identity of the chunk this cache holds.
    pub fn chunk(&self) -> &ChunkId {
        &self.0.chunk
    }

    /// Indices still missing for the given stream codec range, if any.
    fn require_index(&self, desc: &ChunkCodecDesc) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)> {
        let (start, end, step) = desc.unwrap_as_stream();
        self.0.state.read().unwrap().indices.require(start, end, step)
    }

    /// Writes one received piece into the backing cache.
    ///
    /// Two-phase index update: a cheap `try_push` under the read lock filters
    /// duplicates before any I/O; the definitive `push` under the write lock
    /// then records the piece and extracts that index's waiter.
    /// NOTE(review): two threads may both pass `try_push` for the same index
    /// and write identical bytes; the final `push` appears to deduplicate the
    /// accounting — confirm against `IncomeIndexQueue` semantics.
    fn push_piece_data(&self, piece: &PieceData) -> BuckyResult<PushIndexResult> {
        trace!("{} push piece data:{:?}", self, piece.desc);

        let (index, range) = piece.desc.stream_piece_range(self.chunk());
        let index_result = self.0.state.read().unwrap().indices.try_push(index..index + 1);
        if !index_result.pushed() {
            trace!("{} push piece data:{:?}, result:{:?}", self, piece.desc, index_result);
            return Ok(index_result);
        }

        // `load()` must have run before pieces arrive; the unwrap relies on that.
        let mut writer = {
            let state = self.0.state.read().unwrap();
            state.raw_cache.get().unwrap().sync_writer()
        }?;

        if range.start == writer.seek(SeekFrom::Start(range.start))
            .map_err(|err| {
                trace!("{} push piece data:{:?}, result:{}", self, piece.desc, err);
                err
            })? {
            let len = (range.end - range.start) as usize;
            writer.write_all(&piece.data[..len]).map_err(|err| {
                trace!("{} push piece data:{:?}, result:{}", self, piece.desc, err);
                err
            })?;
            // Commit the index and pull out this index's waiter (if any) so it
            // can be woken after the write lock is dropped.
            let (result, waiter) = {
                let mut state = self.0.state.write().unwrap();
                let result = state.indices.push(index..index + 1);
                if result.pushed() {
                    state.pushed_len += len;
                }
                (result, state.waiters.remove(&index))
            };
            if let Some(waiter) = waiter {
                waiter.wake();
            }
            trace!("{} push piece data:{:?}, result:{:?}", self, piece.desc, result);
            Ok(result)
        } else {
            // Seek landed at the wrong offset: refuse the piece.
            let err = BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch");
            trace!("{} push piece data:{:?}, result:{}", self, piece.desc, err);
            Err(err)
        }
    }

    /// Whether piece `index` has been received.
    pub fn exists(&self, index: u32) -> BuckyResult<bool> {
        self.0.state.read().unwrap().indices.exists(index)
    }

    /// Total payload bytes pushed so far.
    pub fn len(&self) -> usize {
        self.0.state.read().unwrap().pushed_len
    }

    /// Waits until piece `index` exists, or until `abort` resolves first.
    pub async fn wait_exists<T: futures::Future<Output=BuckyError>>(&self, index: u32, abort: T) -> BuckyResult<()> {
        trace!("{} wait_exists:{}", self, index);

        let waiter = {
            let mut state = self.0.state.write().unwrap();
            // Fast path: already present (or the index itself is invalid).
            match state.indices.exists(index) {
                Ok(exists) => {
                    if exists {
                        return Ok(());
                    }
                },
                Err(err) => {
                    return Err(err);
                }
            }

            // Register under the same write lock so a concurrent push cannot
            // slip between the existence check above and the registration.
            if let Some(waiters) = state.waiters.get_mut(&index) {
                waiters.new_waiter()
            } else {
                let mut waiters = StateWaiter::new();
                let waiter = waiters.new_waiter();
                state.waiters.insert(index, waiters);
                waiter
            }
        };
        let result = StateWaiter::abort_wait(abort, waiter, || ()).await;
        match &result {
            Ok(_) => {
                trace!("{} wait_exists:{} returned", self, index);
            },
            Err(err) => {
                trace!("{} wait_exists:{} failed: {}", self, index, err);
            }
        }

        result
    }

    /// Async read of (part of) a piece, waiting for the piece to arrive.
    ///
    /// An aborted or failed wait is reported as `Ok(0)` (EOF-like), not as an
    /// error; reads at most `buffer.len()` bytes starting at
    /// `offset_in_piece` within the piece's range.
    pub async fn async_read<T: futures::Future<Output=BuckyError>>(
        &self,
        piece_desc: &PieceDesc,
        offset_in_piece: usize,
        buffer: &mut [u8],
        abort: T
    ) -> BuckyResult<usize> {
        trace!("{} async read:{:?}", self, piece_desc);

        let (index, range) = piece_desc.stream_piece_range(self.chunk());
        if self.wait_exists(index, abort).await.is_err() {
            trace!("{} async read:{:?}, read:{}", self, piece_desc, 0);
            return Ok(0);
        }
        // wait_exists succeeded, so the raw cache must have been loaded.
        let raw_cache = self.0.state.read().unwrap().raw_cache.get().unwrap().clone_as_raw_cache();
        let mut reader = raw_cache.async_reader().await
            .map_err(|err| {
                trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
                err
            })?;
        use async_std::io::prelude::*;
        let start = range.start + offset_in_piece as u64;
        if start == reader.seek(SeekFrom::Start(start)).await.map_err(|err| {
            trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
            err
        })? {
            let len = (range.end - start) as usize;
            let len = len.min(buffer.len());
            reader.read_exact(&mut buffer[..len]).await.map_err(|err| {
                trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
                err
            })?;
            trace!("{} async read:{:?}, read:{}", self, piece_desc, len);
            Ok(len)
        } else {
            let err = BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch");
            trace!("{} async read:{:?}, read:{}", self, piece_desc, err);
            Err(err)
        }
    }

    /// Non-blocking read of (part of) a piece via the cache's sync reader.
    ///
    /// NOTE(review): a missing piece is `NotFound`, while an invalid index
    /// reads as `Ok(0)` — looks intentional but verify against callers.
    pub fn sync_try_read(
        &self,
        piece_desc: &PieceDesc,
        offset_in_piece: usize,
        buffer: &mut [u8]
    ) -> BuckyResult<usize> {
        trace!("{} sync_try_read desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());

        let (index, range) = piece_desc.stream_piece_range(self.chunk());
        match self.exists(index) {
            Ok(exists) => {
                if !exists {
                    trace!("{} sync_try_read not exists, desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
                    return Err(BuckyError::new(BuckyErrorCode::NotFound, "not exists"));
                }
            },
            Err(_) => {
                trace!("{} sync_try_read exists 0, desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
                return Ok(0);
            }
        }
        // Piece exists, so the raw cache must have been loaded.
        let raw_cache = self.0.state.read().unwrap().raw_cache.get().unwrap().clone_as_raw_cache();
        let mut reader = raw_cache.sync_reader()?;
        use std::io::{Read, Seek};
        let start = range.start + offset_in_piece as u64;
        if start == reader.seek(SeekFrom::Start(start))? {
            let len = (range.end - start) as usize;
            let len = len.min(buffer.len());
            reader.read_exact(&mut buffer[..len])
                .map_err(|err| {
                    trace!("{} sync_try_read {}, desc: {:?},offset_in_piece: {}, buffer: {} ", self, err, piece_desc, offset_in_piece, buffer.len());
                    err
                })?;
            trace!("{} sync_try_read {}, desc: {:?},offset_in_piece: {}, buffer: {} ", self, len, piece_desc, offset_in_piece, buffer.len());
            Ok(len)
        } else {
            trace!("{} sync_try_read invalid, desc: {:?},offset_in_piece: {}, buffer: {} ", self, piece_desc, offset_in_piece, buffer.len());
            Err(BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch"))
        }
    }

    /// Clone of the backing raw cache handle, if already loaded.
    fn raw_cache(&self) -> Option<Box<dyn RawCache>> {
        self.0.state.read().unwrap().raw_cache.get().map(|c| c.clone_as_raw_cache())
    }


    /// Async counterpart of `sync_try_read`: does not wait for the piece,
    /// returns `NotFound` if it has not arrived yet.
    async fn async_try_read(
        &self,
        piece_desc: &PieceDesc,
        offset_in_piece: usize,
        buffer: &mut [u8]
    ) -> BuckyResult<usize> {
        let (index, range) = piece_desc.stream_piece_range(self.chunk());
        match self.exists(index) {
            Ok(exists) => {
                if !exists {
                    return Err(BuckyError::new(BuckyErrorCode::NotFound, "not exists"));
                }
            },
            Err(_) => {
                return Ok(0);
            }
        }
        let raw_cache = self.0.state.read().unwrap().raw_cache.get().unwrap().clone_as_raw_cache();
        let mut reader = raw_cache.async_reader().await?;
        use async_std::io::prelude::*;
        let start = range.start + offset_in_piece as u64;
        if start == reader.seek(SeekFrom::Start(start)).await? {
            let len = (range.end - start) as usize;
            let len = len.min(buffer.len());
            reader.read_exact(&mut buffer[..len]).await?;
            Ok(len)
        } else {
            Err(BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch"))
        }
    }
}
332
333
334
/// Immutable data shared by clones of a `StreamDecoder`.
struct DecoderImpl {
    chunk: ChunkId,
    desc: ChunkCodecDesc,
    cache: ChunkStreamCache,
}
340
/// Decoder feeding received stream pieces of one chunk into a `ChunkStreamCache`.
#[derive(Clone)]
pub struct StreamDecoder(Arc<DecoderImpl>);
343
344
345impl std::fmt::Display for StreamDecoder {
346 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347 write!(f, "StreamDecoder{{chunk:{}}}", self.chunk())
348 }
349}
350
351impl StreamDecoder {
352 pub fn new(
353 chunk: &ChunkId,
354 desc: &ChunkCodecDesc,
355 cache: ChunkStreamCache
356 ) -> Self {
357 Self(Arc::new(DecoderImpl {
358 chunk: chunk.clone(),
359 desc: desc.clone(),
360 cache,
361 }))
362 }
363}
364
365impl ChunkDecoder for StreamDecoder {
366 fn clone_as_decoder(&self) -> Box<dyn ChunkDecoder> {
367 Box::new(self.clone())
368 }
369
370 fn chunk(&self) -> &ChunkId {
371 &self.0.chunk
372 }
373
374 fn desc(&self) -> &ChunkCodecDesc {
375 &self.0.desc
376 }
377
378 fn require_index(&self) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)> {
379 self.0.cache.require_index(self.desc())
380 }
381
382 fn push_piece_data(&self, piece: &PieceData) -> BuckyResult<PushIndexResult> {
383 trace!("{} push piece desc {:?}", self, piece.desc);
384 let (start, end, _) = self.desc().unwrap_as_stream();
385 let (index, _) = piece.desc.unwrap_as_stream();
386 if index < start || index >= end {
387 return Ok(PushIndexResult {
388 valid: false,
389 exists: false,
390 finished: false
391 });
392 }
393
394 let result = self.0.cache.push_piece_data(piece)?;
395 if result.pushed() {
396 if self.0.cache.require_index(self.desc()).is_none() {
397 Ok(PushIndexResult {
398 valid: true,
399 exists: false,
400 finished: true })
401 } else {
402 Ok(result)
403 }
404 } else {
405 Ok(result)
406 }
407 }
408
409}
410
411
412
/// Read-ahead state machine for `AsyncStreamEncoder::next_piece`.
enum AsyncEncoderPendingState {
    // No background read in flight.
    None,
    // A background read for this piece was spawned and has not completed yet.
    Pending(PieceDesc),
    // Background read finished; payload (or error) awaits the next `next_piece` call.
    Waiting(PieceDesc, BuckyResult<Vec<u8>>)
}
419
/// Mutable encoder state, guarded by `AsyncEncoderImpl::state`.
struct AsyncEncoderStateImpl {
    // Background-read state machine (see `AsyncEncoderPendingState`).
    pending: AsyncEncoderPendingState,
    // Outgoing piece indices still to be sent.
    indices: OutcomeIndexQueue,
}
424
/// Shared body of `AsyncStreamEncoder`.
struct AsyncEncoderImpl {
    desc: ChunkCodecDesc,
    cache: ChunkStreamCache,
    state: RwLock<AsyncEncoderStateImpl>
}
430
/// Piece encoder that falls back to background async reads when the cache has
/// no synchronous reader; cheap to clone (`Arc`).
#[derive(Clone)]
pub struct AsyncStreamEncoder(Arc<AsyncEncoderImpl>);
433
434impl std::fmt::Display for AsyncStreamEncoder {
435 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436 write!(f, "StreamEncoder{{chunk:{},desc:{:?}}}", self.chunk(), self.desc())
437 }
438}
439
440
impl AsyncStreamEncoder {
    /// Creates an encoder over `cache` for the stream range described by `desc`.
    pub fn new(
        cache: ChunkStreamCache,
        desc: &ChunkCodecDesc
    ) -> Self {
        let (start, end, step) = desc.unwrap_as_stream();
        Self(Arc::new(AsyncEncoderImpl {
            desc: desc.clone(),
            cache,
            state: RwLock::new(AsyncEncoderStateImpl {
                pending: AsyncEncoderPendingState::None,
                indices: OutcomeIndexQueue::new(start, end, step)
            })
        }))
    }

    /// Backing chunk cache.
    fn cache(&self) -> &ChunkStreamCache {
        &self.0.cache
    }

    /// Background task body: reads the piece payload asynchronously, then
    /// stores it as `Waiting` — but only if this piece is still the pending
    /// one (a reset/merge may have invalidated it while the read ran).
    async fn async_next_piece(&self, piece_desc: PieceDesc) {
        // MTU-sized scratch buffer; truncated to the actual payload length below.
        let mut buffer = vec![0u8; MTU];
        let result = self.cache().async_try_read(&piece_desc, 0, &mut buffer[..]).await;
        let mut state = self.0.state.write().unwrap();
        if let AsyncEncoderPendingState::Pending(pending_desc) = &state.pending {
            if pending_desc.eq(&piece_desc) {
                state.pending = AsyncEncoderPendingState::Waiting(piece_desc, result.map(|len| {
                    buffer.truncate(len);
                    buffer
                }));
            }
        }
    }
}
475
476impl ChunkEncoder for AsyncStreamEncoder {
477 fn clone_as_encoder(&self) -> Box<dyn ChunkEncoder> {
478 Box::new(self.clone())
479 }
480
481 fn chunk(&self) -> &ChunkId {
482 self.cache().chunk()
483 }
484
485 fn desc(&self) -> &ChunkCodecDesc {
486 &self.0.desc
487 }
488
489 fn next_piece(&self, session_id: &TempSeq, buf: &mut [u8]) -> BuckyResult<usize> {
490 let mut state = self.0.state.write().unwrap();
491 match &mut state.pending {
492 AsyncEncoderPendingState::Pending(_) => Ok(0),
493 AsyncEncoderPendingState::Waiting(piece_desc, _result) => {
494 let mut result = Err(BuckyError::new(BuckyErrorCode::Ok, ""));
495 std::mem::swap(&mut result, _result);
496 let piece_desc = piece_desc.clone();
497 state.pending = AsyncEncoderPendingState::None;
498 match result {
499 Ok(buffer) => {
500 let (index, _) = piece_desc.unwrap_as_stream();
501 if state.indices.next() == Some(index) {
502 let _ = state.indices.pop_next();
503 let buf_len = buf.len();
504 let buf = PieceData::encode_header(
505 buf,
506 session_id,
507 self.chunk(),
508 &piece_desc)?;
509 let header_len = buf_len - buf.len();
510 buf[..buffer.len()].copy_from_slice(&buffer[..]);
511 let piece_len = header_len + buffer.len();
512 Ok(piece_len)
513 } else {
514 Ok(0)
515 }
516 },
517 Err(err) => {
518 Err(err)
519 }
520 }
521 },
522 AsyncEncoderPendingState::None => {
523 if let Some(index) = state.indices.next() {
524 trace!("{} try pop next piece {}", self, index);
525 if self.cache().exists(index)
526 .map_err(|err| {
527 error!("{} exists error {}", self, index);
528 err
529 }).unwrap() {
530 let (_, _, step) = self.desc().unwrap_as_stream();
531 let piece_desc = PieceDesc::Range(index, step.abs() as u16);
532 let buf_len = buf.len();
533 let buf = PieceData::encode_header(
534 buf,
535 session_id,
536 self.chunk(),
537 &piece_desc)?;
538 let header_len = buf_len - buf.len();
539 match self.cache().sync_try_read(&piece_desc, 0, buf) {
540 Ok(len) => {
541 let _ = state.indices.pop_next();
542 trace!("{} pop next piece {:?}", self, piece_desc);
543 Ok(header_len + len)
544 },
545 Err(err) => {
546 if BuckyErrorCode::NotSupport == err.code() {
547 state.pending = AsyncEncoderPendingState::Pending(piece_desc.clone());
548 let encoder = self.clone();
549 task::spawn(async move {
550 encoder.async_next_piece(piece_desc).await;
551 });
552 Ok(0)
553 } else if BuckyErrorCode::WouldBlock == err.code() {
554 Ok(0)
555 } else {
556 Err(err)
557 }
558 }
559 }
560 } else {
561 Ok(0)
562 }
563 } else {
564 Ok(0)
565 }
566 }
567 }
568 }
569
570 fn reset(&self) -> bool {
571 let mut state = self.0.state.write().unwrap();
572 if state.indices.reset() {
573 match &state.pending {
574 AsyncEncoderPendingState::Pending(next_desc) => {
575 let (index, _) = next_desc.unwrap_as_stream();
576 if state.indices.next() != Some(index) {
577 state.pending = AsyncEncoderPendingState::None;
578 }
579 },
580 AsyncEncoderPendingState::Waiting(next_desc, _) => {
581 let (index, _) = next_desc.unwrap_as_stream();
582 if state.indices.next() != Some(index) {
583 state.pending = AsyncEncoderPendingState::None;
584 }
585 },
586 _ => {}
587 }
588 true
589 } else {
590 false
591 }
592 }
593
594 fn merge(&self, max_index: u32, lost_index: Vec<Range<u32>>) -> bool {
595 let mut state = self.0.state.write().unwrap();
596 if state.indices.merge(max_index, lost_index) {
597 match &state.pending {
598 AsyncEncoderPendingState::Pending(next_desc) => {
599 let (index, _) = next_desc.unwrap_as_stream();
600 if state.indices.next() != Some(index) {
601 state.pending = AsyncEncoderPendingState::None;
602 }
603 },
604 AsyncEncoderPendingState::Waiting(next_desc, _) => {
605 let (index, _) = next_desc.unwrap_as_stream();
606 if state.indices.next() != Some(index) {
607 state.pending = AsyncEncoderPendingState::None;
608 }
609 },
610 _ => {}
611 }
612 true
613 } else {
614 false
615 }
616 }
617}
618
619
620
621
622
/// Mutable encoder state, guarded by `SyncEncoderImpl::state`.
struct SyncEncoderStateImpl {
    // Lazily-opened sync reader over the raw cache; kept across calls.
    reader: Option<Box<dyn SyncReadWithSeek + Send + Sync>>,
    // Outgoing piece indices still to be sent.
    indices: OutcomeIndexQueue,
}
627
/// Shared body of `SyncStreamEncoder`.
struct SyncEncoderImpl {
    desc: ChunkCodecDesc,
    cache: ChunkStreamCache,
    state: Mutex<SyncEncoderStateImpl>
}
633
/// Piece encoder reading synchronously from the cache; cheap to clone (`Arc`).
#[derive(Clone)]
pub struct SyncStreamEncoder(Arc<SyncEncoderImpl>);
636
637impl std::fmt::Display for SyncStreamEncoder {
638 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639 write!(f, "StreamEncoder{{chunk:{},desc:{:?}}}", self.chunk(), self.desc())
640 }
641}
642
643
644impl SyncStreamEncoder {
645 pub fn new(
646 cache: ChunkStreamCache,
647 desc: &ChunkCodecDesc
648 ) -> Self {
649 let (start, end, step) = desc.unwrap_as_stream();
650 Self(Arc::new(SyncEncoderImpl {
651 desc: desc.clone(),
652 cache,
653 state: Mutex::new(SyncEncoderStateImpl {
654 reader: None,
655 indices: OutcomeIndexQueue::new(start, end, step)
656 })
657 }))
658 }
659
660 fn cache(&self) -> &ChunkStreamCache {
661 &self.0.cache
662 }
663}
664
665impl ChunkEncoder for SyncStreamEncoder {
666 fn clone_as_encoder(&self) -> Box<dyn ChunkEncoder> {
667 Box::new(self.clone())
668 }
669
670 fn chunk(&self) -> &ChunkId {
671 self.cache().chunk()
672 }
673
674 fn desc(&self) -> &ChunkCodecDesc {
675 &self.0.desc
676 }
677
678 fn next_piece(&self, session_id: &TempSeq, buf: &mut [u8]) -> BuckyResult<usize> {
679 let mut state = self.0.state.lock().unwrap();
680 if let Some(index) = state.indices.next() {
681 if self.cache().exists(index)
682 .map_err(|err| {
683 error!("{} exists error {}", self, index);
684 err
685 }).unwrap() {
686 let (_, _, step) = self.desc().unwrap_as_stream();
687 let piece_desc = PieceDesc::Range(index, step.abs() as u16);
688 let buf_len = buf.len();
689 let buf = PieceData::encode_header(
690 buf,
691 session_id,
692 self.chunk(),
693 &piece_desc)?;
694 let header_len = buf_len - buf.len();
695 if state.reader.is_none() {
696 let raw_cache = self.cache().raw_cache().unwrap();
697 match raw_cache.sync_reader() {
698 Ok(reader) => {
699 state.reader = Some(reader);
700 },
701 Err(err) => {
702 let ret = if BuckyErrorCode::WouldBlock == err.code() {
703 Ok(0)
704 } else {
705 Err(err)
706 };
707 return ret;
708 }
709 }
710 }
711 let reader = state.reader.as_mut().unwrap();
712 let (_, range) = piece_desc.stream_piece_range(self.chunk());
713 use std::io::{Read, Seek};
714
715 let start = range.start;
716 if start == reader.seek(SeekFrom::Start(start))? {
717 let len = (range.end - start) as usize;
718 let len = len.min(buf.len());
719 reader.read_exact(&mut buf[..len])
720 .map_err(|err| {
721 trace!("{} sync_try_read {}, desc: {:?}, buffer: {} ", self, err, piece_desc, buf.len());
722 err
723 })?;
724 trace!("{} sync_try_read {}, desc: {:?},buffer: {} ", self, len, piece_desc, buf.len());
725 let _ = state.indices.pop_next();
726 trace!("{} pop next piece {:?}", self, piece_desc);
727 Ok(header_len + len)
728 } else {
729 trace!("{} sync_try_read invalid, desc: {:?}, buffer: {} ", self, piece_desc, buf.len());
730 Err(BuckyError::new(BuckyErrorCode::InvalidInput, "len mismatch"))
731 }
732 } else {
733 Ok(0)
734 }
735 } else {
736 Ok(0)
737 }
738 }
739
740 fn reset(&self) -> bool {
741 self.0.state.lock().unwrap().indices.reset()
742 }
743
744 fn merge(&self, max_index: u32, lost_index: Vec<Range<u32>>) -> bool {
745 self.0.state.lock().unwrap().indices.merge(max_index, lost_index)
746 }
747}