cyfs_bdt/ndn/download/
group.rs

1use std::{
2    collections::{HashMap}, 
3    sync::{Arc, RwLock}
4};
5use cyfs_base::*;
6use crate::{
7    types::*
8};
9use super::super::{
10    types::*
11};
12use super::{
13    common::*
14};
15
16struct DownloadingState {
17    entries: HashMap<String, Box<dyn DownloadTask>>, 
18    running: Vec<Box<dyn DownloadTask>>, 
19    closed: bool, 
20    history_downloaded: u64, 
21    downloaded: u64, 
22    history_speed: HistorySpeed, 
23}
24
25enum TaskStateImpl {
26    Downloading(DownloadingState), 
27    Finished(u64), 
28    Error(BuckyError), 
29}
30
31enum ControlStateImpl {
32    Normal(StateWaiter), 
33    Canceled,
34}
35
36struct StateImpl {
37    task_state: TaskStateImpl, 
38    control_state: ControlStateImpl, 
39}
40
41struct TaskImpl {
42    history_speed: HistorySpeedConfig, 
43    state: RwLock<StateImpl>
44}
45
46#[derive(Clone)]
47pub struct DownloadGroup(Arc<TaskImpl>);
48
49impl DownloadGroup {
50    pub fn new(history_speed: HistorySpeedConfig) -> Self {
51        Self(Arc::new(TaskImpl {
52            history_speed: history_speed.clone(), 
53            state: RwLock::new(StateImpl {
54                task_state: TaskStateImpl::Downloading(DownloadingState {
55                    entries: Default::default(), 
56                    running: Default::default(), 
57                    history_speed: HistorySpeed::new(0, history_speed), 
58                    history_downloaded: 0, 
59                    downloaded: 0, 
60                    closed: false, 
61                }),
62                control_state: ControlStateImpl::Normal(StateWaiter::new()), 
63            })
64        }))
65    }
66
67    pub fn history_config(&self) -> &HistorySpeedConfig {
68        &self.0.history_speed
69    }
70}
71
72
73impl NdnTask for DownloadGroup {
74    fn clone_as_task(&self) -> Box<dyn NdnTask> {
75        Box::new(self.clone())
76    }
77
78    fn state(&self) -> NdnTaskState {
79        match &self.0.state.read().unwrap().task_state {
80            TaskStateImpl::Downloading(_) => NdnTaskState::Running,
81            TaskStateImpl::Finished(_) => NdnTaskState::Finished, 
82            TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone())
83        }
84        
85    }
86
87    fn control_state(&self) -> NdnTaskControlState {
88        match &self.0.state.read().unwrap().control_state {
89            ControlStateImpl::Normal(_) => NdnTaskControlState::Normal, 
90            ControlStateImpl::Canceled => NdnTaskControlState::Canceled
91        }
92    }
93
94    fn close(&self, recursion: bool) -> BuckyResult<()> {
95        let children: Option<Vec<_>> = {
96            let mut state = self.0.state.write().unwrap();
97            match &mut state.task_state {
98                TaskStateImpl::Downloading(downloading) => {
99                    let running = if recursion {
100                        Some(downloading.running.iter().map(|t| t.clone_as_download_task()).collect())
101                    } else {
102                        None
103                    };
104                    downloading.closed = true;
105                    if downloading.running.len() == 0 {
106                        state.task_state = TaskStateImpl::Finished(downloading.downloaded);
107                    }
108                    running
109                },
110                _ => None
111            }
112        };
113
114        if recursion {
115            for task in children.unwrap() {
116                let _ = task.close(recursion);
117            }
118        }
119        
120        Ok(())
121    }
122
123    fn cur_speed(&self) -> u32 {
124        let state = self.0.state.read().unwrap();
125        match &state.task_state {
126            TaskStateImpl::Downloading(downloading) => downloading.history_speed.latest(),
127            _ => 0
128        }
129    }
130
131    fn history_speed(&self) -> u32 {
132        let state = self.0.state.read().unwrap();
133        match &state.task_state {
134            TaskStateImpl::Downloading(downloading) => downloading.history_speed.average(),
135            _ => 0
136        }
137    }
138
139    fn transfered(&self) -> u64 {
140        let state = self.0.state.read().unwrap();
141        match &state.task_state {
142            TaskStateImpl::Downloading(downloading) => downloading.downloaded,
143            TaskStateImpl::Finished(downloaded) => *downloaded, 
144            _ => 0
145        }
146    }
147
148
149    fn cancel_by_error(&self, err: BuckyError) -> BuckyResult<NdnTaskControlState> {
150        let (tasks, waiters) = {
151            let mut state = self.0.state.write().unwrap();
152            let waiters = match &mut state.control_state {
153                ControlStateImpl::Normal(waiters) => {
154                    let waiters = Some(waiters.transfer());
155                    state.control_state = ControlStateImpl::Canceled;
156                    waiters
157                }, 
158                _ => None
159            };
160
161            let tasks = match &mut state.task_state {
162                TaskStateImpl::Downloading(downloading) => {
163                    let tasks: Vec<Box<dyn DownloadTask>> = downloading.running.iter().map(|t| t.clone_as_download_task()).collect();
164                    state.task_state = TaskStateImpl::Error(err.clone());
165                    tasks
166                },
167                _ => vec![]
168            };
169
170            (tasks, waiters)
171        };
172
173        if let Some(waiters) = waiters {
174            waiters.wake();
175        }
176
177        for task in tasks {
178            let _ = task.cancel_by_error(err.clone());
179        }
180        
181        Ok(NdnTaskControlState::Canceled)
182    }
183}
184
185
186#[async_trait::async_trait]
187impl DownloadTask for DownloadGroup {
188    fn clone_as_download_task(&self) -> Box<dyn DownloadTask> {
189        Box::new(self.clone())
190    }
191
192    fn add_task(&self, path: Option<String>, sub: Box<dyn DownloadTask>) -> BuckyResult<()> {
193        let mut state = self.0.state.write().unwrap();
194        match &mut state.task_state {
195            TaskStateImpl::Downloading(downloading) => {
196                if !downloading.closed {
197                    downloading.running.push(sub.clone_as_download_task());
198                    if let Some(path) = path {
199                        if let Some(exists) = downloading.entries.insert(path, sub) {
200                            let _ = exists.cancel();
201                        }
202                    }
203                    Ok(())
204                } else {
205                    Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
206                }
207            },
208            _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
209        }
210    }
211
212    fn sub_task(&self, path: &str) -> Option<Box<dyn DownloadTask>> {
213        if path.len() == 0 {
214            Some(self.clone_as_download_task())
215        } else {
216            let mut names = path.split("/");
217            let name = names.next().unwrap();
218    
219            let state = self.0.state.read().unwrap(); 
220            match &state.task_state {
221                TaskStateImpl::Downloading(downloading) => {
222                    let mut sub = downloading.entries.get(name).map(|t| t.clone_as_download_task());
223                    if sub.is_none() {
224                        sub 
225                    } else {
226                        for name in names {
227                            sub = sub.and_then(|t| t.sub_task(name));
228                            if sub.is_none() {
229                                break;
230                            }
231                        }
232                        sub
233                    }
234                },
235                _ => None
236            }
237        }
238    }
239
240    fn calc_speed(&self, when: Timestamp) -> u32 {
241        let mut state = self.0.state.write().unwrap();
242        let mut running = vec![];
243        let mut cur_speed = 0;
244        let mut running_downloaded = 0;
245        match &mut state.task_state {
246            TaskStateImpl::Downloading(downloading) => {
247                for sub in &downloading.running {
248                    cur_speed += sub.calc_speed(when);
249                    match sub.state() {
250                        NdnTaskState::Finished | NdnTaskState::Error(_) => {
251                            downloading.history_downloaded += sub.transfered();
252                        }, 
253                        _ => {
254                            running_downloaded += sub.transfered();
255                            running.push(sub.clone_as_download_task());
256                        }
257                    }  
258                }
259                downloading.downloaded = downloading.history_downloaded + running_downloaded;
260                downloading.history_speed.update(Some(cur_speed), when);
261                if running.len() == 0 && downloading.closed {
262                    state.task_state = TaskStateImpl::Finished(downloading.downloaded);
263                } else {
264                    downloading.running = running;
265                }
266                cur_speed
267            },
268            _ => 0
269        }
270    }
271
272    async fn wait_user_canceled(&self) -> BuckyError {
273        let waiter = {
274            let mut state = self.0.state.write().unwrap();
275            match &mut state.control_state {
276                ControlStateImpl::Normal(waiters) => Some(waiters.new_waiter()), 
277                _ => None
278            }
279        };
280        
281        
282        if let Some(waiter) = waiter {
283            let _ = StateWaiter::wait(waiter, || self.control_state()).await;
284        } 
285
286        BuckyError::new(BuckyErrorCode::UserCanceled, "")
287    }
288}