cyfs_bdt/ndn/download/
chunk_list.rs

1use std::{
2    sync::{RwLock},
3    io::SeekFrom, 
4    ops::Range
5};
6use async_std::{
7    sync::Arc, 
8    pin::Pin, 
9    task::{Context, Poll}, 
10};
11
12use cyfs_base::*;
13use crate::{
14    types::*, 
15    stack::{WeakStack, Stack}, 
16};
17use super::super::{
18    chunk::*, 
19    types::*
20};
21use super::{
22    common::*, 
23};
24
25
26struct DownloadingState { 
27    downloaded: u64,  
28    cur_speed: ProgressCounter,  
29    cur_chunk: (ChunkDownloader, usize), 
30    history_speed: HistorySpeed,
31}
32
33enum ControlStateImpl {
34    Normal(StateWaiter), 
35    Canceled,
36}
37
38enum TaskStateImpl {
39    Pending, 
40    Downloading(DownloadingState), 
41    Error(BuckyError), 
42    Finished(u64)
43}
44
45struct StateImpl {
46    abs_path: Option<String>, 
47    control_state: ControlStateImpl, 
48    task_state: TaskStateImpl,
49}
50
51struct TaskImpl {
52    stack: WeakStack, 
53    name: String, 
54    chunk_list: ChunkListDesc,
55    context: Box<dyn DownloadContext>,
56    state: RwLock<StateImpl>,  
57}
58
59#[derive(Clone)]
60pub struct ChunkListTask(Arc<TaskImpl>);
61
62impl std::fmt::Display for ChunkListTask {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        write!(f, "ChunkListTask::{{name:{}}}", self.name())
65    }
66}
67
68impl ChunkListTask {
69    pub fn new(
70        stack: WeakStack,  
71        name: String,
72        chunk_list: ChunkListDesc, 
73        context: Box<dyn DownloadContext>, 
74    ) -> Self {
75        Self(Arc::new(TaskImpl {
76            stack, 
77            name,
78            context, 
79            state: RwLock::new(StateImpl {
80                abs_path: None, 
81                task_state: if chunk_list.total_len() > 0 {
82                    TaskStateImpl::Pending
83                } else {
84                    TaskStateImpl::Finished(0)
85                },
86                control_state: ControlStateImpl::Normal(StateWaiter::new()),
87            }),
88            chunk_list, 
89        }))
90    } 
91
92    pub fn name(&self) -> &str {
93        self.0.name.as_str()
94    }
95
96    pub fn chunk_list(&self) -> &ChunkListDesc {
97        &self.0.chunk_list
98    }
99
100    fn create_cache(&self, index: usize) -> BuckyResult<ChunkCache> {
101        let stack = Stack::from(&self.0.stack);
102        let chunk = &self.chunk_list().chunks()[index];
103
104        let mut state = self.0.state.write().unwrap();
105        match &mut state.task_state {
106            TaskStateImpl::Pending => {
107                debug!("{} create cache from pending, index={}, chunk={}", self, index, chunk);
108                let downloader = stack.ndn().chunk_manager().create_downloader(chunk, self.clone_as_leaf_task());
109                state.task_state = TaskStateImpl::Downloading(DownloadingState { 
110                    downloaded: 0, 
111                    cur_speed: ProgressCounter::new(0), 
112                    cur_chunk: (downloader.clone(), index), 
113                    history_speed: HistorySpeed::new(0, stack.config().ndn.channel.history_speed.clone()), 
114                });
115                Ok(downloader.cache().clone())
116            }, 
117            TaskStateImpl::Downloading(downloading) => {
118                let (downloader, cur_index) = &downloading.cur_chunk;
119                if *cur_index != index {
120                    debug!("{} create new cache, old_index={}, old_chunk={}, index={}, chunk={}", self, *cur_index, downloader.cache().chunk(), index, chunk);
121                    downloading.downloaded += downloader.cache().stream().len() as u64;
122                    downloading.cur_chunk = (stack.ndn().chunk_manager().create_downloader(chunk, self.clone_as_leaf_task()), index);
123                }
124                Ok(downloading.cur_chunk.0.cache().clone())
125            },
126            TaskStateImpl::Finished(_) => unreachable!(), 
127            TaskStateImpl::Error(err) => Err(err.clone())
128        }
129    }
130}
131
132#[async_trait::async_trait]
133impl LeafDownloadTask for ChunkListTask {
134    fn clone_as_leaf_task(&self) -> Box<dyn LeafDownloadTask> {
135        Box::new(self.clone())
136    }
137
138    fn abs_group_path(&self) -> Option<String> {
139        self.0.state.read().unwrap().abs_path.clone()
140    }
141
142    fn context(&self) -> &dyn DownloadContext {
143        self.0.context.as_ref()
144    }
145
146    fn finish(&self) {
147        let mut state = self.0.state.write().unwrap();
148
149        match &mut state.task_state {
150            TaskStateImpl::Downloading(downloading) => {
151                info!("{} mark finished", self);
152                downloading.downloaded += downloading.cur_chunk.0.cache().stream().len() as u64;
153                state.task_state = TaskStateImpl::Finished(downloading.downloaded);
154            }, 
155            _ => {}
156        };
157    }
158}
159
160impl NdnTask for ChunkListTask {
161    fn clone_as_task(&self) -> Box<dyn NdnTask> {
162        Box::new(self.clone())
163    }
164
165    fn state(&self) -> NdnTaskState {
166        match &self.0.state.read().unwrap().task_state {
167            TaskStateImpl::Pending => NdnTaskState::Running, 
168            TaskStateImpl::Downloading(_) => NdnTaskState::Running, 
169            TaskStateImpl::Finished(_) => NdnTaskState::Finished, 
170            TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone()),
171        }
172    }
173
174    fn control_state(&self) -> NdnTaskControlState {
175        match &self.0.state.read().unwrap().control_state {
176            ControlStateImpl::Normal(_) => NdnTaskControlState::Normal, 
177            ControlStateImpl::Canceled => NdnTaskControlState::Canceled
178        }
179    }
180
181    fn cur_speed(&self) -> u32 {
182        let state = self.0.state.read().unwrap();
183        match &state.task_state {
184            TaskStateImpl::Downloading(downloading) => downloading.history_speed.latest(), 
185            _ => 0,
186        }
187    }
188
189    fn history_speed(&self) -> u32 {
190        let state = self.0.state.read().unwrap();
191        match &state.task_state {
192            TaskStateImpl::Downloading(downloading) => downloading.history_speed.average(), 
193            _ => 0,
194        }
195    }
196
197    fn transfered(&self) -> u64 {
198        let state = self.0.state.read().unwrap();
199        match &state.task_state {
200            TaskStateImpl::Downloading(downloading) => downloading.downloaded + downloading.cur_chunk.0.cache().stream().len() as u64, 
201            TaskStateImpl::Finished(downloaded) => *downloaded, 
202            _ => 0,
203        }
204
205    }
206
207    fn cancel_by_error(&self, err: BuckyError) -> BuckyResult<NdnTaskControlState> {
208        let waiters = {
209            let mut state = self.0.state.write().unwrap();
210            let waiters = match &mut state.control_state {
211                ControlStateImpl::Normal(waiters) => {
212                    let waiters = Some(waiters.transfer());
213                    state.control_state = ControlStateImpl::Canceled;
214                    waiters
215                }, 
216                _ => None
217            };
218
219            match &state.task_state {
220                TaskStateImpl::Downloading(_) => {
221                    info!("{} cancel by err {}", self, err);
222                    state.task_state = TaskStateImpl::Error(err);
223                }, 
224                _ => {}
225            };
226
227            waiters
228        };
229
230        if let Some(waiters) = waiters {
231            waiters.wake();
232        }
233
234        Ok(NdnTaskControlState::Canceled)
235    }
236}
237
238#[async_trait::async_trait]
239impl DownloadTask for ChunkListTask {
240    fn clone_as_download_task(&self) -> Box<dyn DownloadTask> {
241        Box::new(self.clone())
242    }
243
244    fn on_post_add_to_root(&self, abs_path: String) {
245        self.0.state.write().unwrap().abs_path = Some(abs_path);
246    }
247
248    fn calc_speed(&self, when: Timestamp) -> u32 {
249        let mut state = self.0.state.write().unwrap();
250        match &mut state.task_state {
251            TaskStateImpl::Downloading(downloading) => {
252                let downloaded = downloading.downloaded + downloading.cur_chunk.0.cache().stream().len() as u64;
253                let cur_speed = downloading.cur_speed.update(downloaded, when);
254                debug!("{} calc_speed update cur_speed {}", self, cur_speed);
255                downloading.history_speed.update(Some(cur_speed), when);
256                cur_speed
257            }
258            _ => 0,
259        }
260    }
261
262    async fn wait_user_canceled(&self) -> BuckyError {
263        let waiter = {
264            let mut state = self.0.state.write().unwrap();
265
266            match &mut state.control_state {
267                ControlStateImpl::Normal(waiters) => Some(waiters.new_waiter()), 
268                _ => None
269            }
270        };
271        
272        if let Some(waiter) = waiter {
273            let _ = StateWaiter::wait(waiter, || self.control_state()).await;
274        } 
275
276        BuckyError::new(BuckyErrorCode::UserCanceled, "")
277    }
278}
279
280
281pub struct ChunkListTaskReader {
282    offset: u64,
283    task: ChunkListTask
284} 
285
286impl ChunkListTaskReader {
287    fn new(task: ChunkListTask) -> Self {
288        Self {
289            offset: 0, 
290            task
291        }
292    }
293
294    pub fn task(&self) -> &dyn LeafDownloadTask {
295        &self.task
296    }
297}
298
299impl Drop for ChunkListTaskReader {
300    fn drop(&mut self) {
301        if self.offset == self.task.chunk_list().total_len() {
302            info!("{} drop after finished", self.task());
303            self.task.finish();
304        } else {
305            info!("{} drop before finished", self.task());
306            let _ = self.task.cancel();
307        }
308    }
309}
310
311impl std::io::Seek for ChunkListTaskReader {
312    fn seek(
313        self: &mut Self,
314        pos: SeekFrom,
315    ) -> std::io::Result<u64> {
316        let len = self.task.chunk_list().total_len();
317        let new_offset = match pos {
318            SeekFrom::Start(offset) => len.min(offset), 
319            SeekFrom::Current(offset) => {
320                let offset = (self.offset as i64) + offset;
321                let offset = offset.max(0) as u64;
322                len.min(offset)
323            },
324            SeekFrom::End(offset) => {
325                let offset = (len as i64) + offset;
326                let offset = offset.max(0) as u64;
327                len.min(offset)
328            }
329        };
330        if new_offset < self.offset {
331            Err(std::io::Error::new(std::io::ErrorKind::Unsupported, "single directed stream"))
332        } else {
333            self.offset = new_offset;
334
335            Ok(new_offset)
336        }
337    }
338}
339
340
341impl DownloadTaskSplitRead for ChunkListTaskReader {
342    fn poll_split_read(
343        self: Pin<&mut Self>,
344        cx: &mut Context<'_>,
345        buffer: &mut [u8],
346    ) -> Poll<std::io::Result<Option<(ChunkCache, Range<usize>)>>> {
347        let pined = self.get_mut();
348        debug!("{} poll split read, buffer={}, offset={}", pined.task(), buffer.len(), pined.offset);
349        let ranges = pined.task.chunk_list().range_of(pined.offset..pined.offset + buffer.len() as u64);
350        if ranges.is_empty() {
351            debug!("{} poll split read break, buffer={}, offset={}", pined.task(), buffer.len(), pined.offset);
352            return Poll::Ready(Ok(None));
353        }
354        if let NdnTaskState::Error(err) = pined.task.state() {
355            debug!("{} poll split read break, buffer={}, offset={}", pined.task(), buffer.len(), pined.offset);
356            return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, BuckyError::new(err, ""))));
357        } 
358        let (index, range) = ranges[0].clone();
359        debug!("{} poll split on chunk index, buffer={}, offset={}, index={}", pined.task(), buffer.len(), pined.offset, index);
360        let result = match pined.task.create_cache(index) {
361            Ok(cache) => {
362                let mut reader = DownloadTaskReader::new(cache, pined.task.clone_as_leaf_task());
363                use std::{io::{Seek}};
364                match reader.seek(SeekFrom::Start(range.start)) {
365                    Ok(_) => {
366                        let result = DownloadTaskSplitRead::poll_split_read(Pin::new(&mut reader), cx, &mut buffer[0..(range.end - range.start) as usize]);
367                        if let Poll::Ready(result) = &result {
368                            if let Some((_, r)) = result.as_ref().ok().and_then(|r| r.as_ref()) {
369                                let old_offset = pined.offset;
370                                pined.offset += (r.end - r.start) as u64;
371                                debug!("{} poll split offset changed, buffer={}, offset={}, new_offset={}", pined.task(), buffer.len(), old_offset, pined.offset);
372                            }
373                        }
374                        result
375                    },
376                    Err(err) => {
377                        return Poll::Ready(Err(err));
378                    }
379                }
380            }
381            Err(err) => {
382                return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err)));
383            }
384        };
385        result
386    }
387}
388
389
390impl async_std::io::Read for ChunkListTaskReader {
391    fn poll_read(
392        self: Pin<&mut Self>,
393        cx: &mut Context<'_>,
394        buffer: &mut [u8],
395    ) -> Poll<std::io::Result<usize>> {
396        self.poll_split_read(cx, buffer).map(|result| result.map(|r| if let Some((_, r)) = r {
397            r.end - r.start
398        } else {
399            0
400        }))
401    }
402}
403
404
405impl ChunkListTask {
406    pub fn reader(
407        stack: WeakStack,  
408        name: String,
409        chunk_list: ChunkListDesc, 
410        context: Box<dyn DownloadContext>
411    ) -> (Self, ChunkListTaskReader) {
412        let task = Self::new(stack, name, chunk_list, context);
413        let reader = ChunkListTaskReader::new(task.clone());
414        
415        (task, reader)
416    }
417}