cyfs_bdt/ndn/download/
group.rs1use std::{
2 collections::{HashMap},
3 sync::{Arc, RwLock}
4};
5use cyfs_base::*;
6use crate::{
7 types::*
8};
9use super::super::{
10 types::*
11};
12use super::{
13 common::*
14};
15
16struct DownloadingState {
17 entries: HashMap<String, Box<dyn DownloadTask>>,
18 running: Vec<Box<dyn DownloadTask>>,
19 closed: bool,
20 history_downloaded: u64,
21 downloaded: u64,
22 history_speed: HistorySpeed,
23}
24
25enum TaskStateImpl {
26 Downloading(DownloadingState),
27 Finished(u64),
28 Error(BuckyError),
29}
30
31enum ControlStateImpl {
32 Normal(StateWaiter),
33 Canceled,
34}
35
36struct StateImpl {
37 task_state: TaskStateImpl,
38 control_state: ControlStateImpl,
39}
40
41struct TaskImpl {
42 history_speed: HistorySpeedConfig,
43 state: RwLock<StateImpl>
44}
45
46#[derive(Clone)]
47pub struct DownloadGroup(Arc<TaskImpl>);
48
49impl DownloadGroup {
50 pub fn new(history_speed: HistorySpeedConfig) -> Self {
51 Self(Arc::new(TaskImpl {
52 history_speed: history_speed.clone(),
53 state: RwLock::new(StateImpl {
54 task_state: TaskStateImpl::Downloading(DownloadingState {
55 entries: Default::default(),
56 running: Default::default(),
57 history_speed: HistorySpeed::new(0, history_speed),
58 history_downloaded: 0,
59 downloaded: 0,
60 closed: false,
61 }),
62 control_state: ControlStateImpl::Normal(StateWaiter::new()),
63 })
64 }))
65 }
66
67 pub fn history_config(&self) -> &HistorySpeedConfig {
68 &self.0.history_speed
69 }
70}
71
72
73impl NdnTask for DownloadGroup {
74 fn clone_as_task(&self) -> Box<dyn NdnTask> {
75 Box::new(self.clone())
76 }
77
78 fn state(&self) -> NdnTaskState {
79 match &self.0.state.read().unwrap().task_state {
80 TaskStateImpl::Downloading(_) => NdnTaskState::Running,
81 TaskStateImpl::Finished(_) => NdnTaskState::Finished,
82 TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone())
83 }
84
85 }
86
87 fn control_state(&self) -> NdnTaskControlState {
88 match &self.0.state.read().unwrap().control_state {
89 ControlStateImpl::Normal(_) => NdnTaskControlState::Normal,
90 ControlStateImpl::Canceled => NdnTaskControlState::Canceled
91 }
92 }
93
94 fn close(&self, recursion: bool) -> BuckyResult<()> {
95 let children: Option<Vec<_>> = {
96 let mut state = self.0.state.write().unwrap();
97 match &mut state.task_state {
98 TaskStateImpl::Downloading(downloading) => {
99 let running = if recursion {
100 Some(downloading.running.iter().map(|t| t.clone_as_download_task()).collect())
101 } else {
102 None
103 };
104 downloading.closed = true;
105 if downloading.running.len() == 0 {
106 state.task_state = TaskStateImpl::Finished(downloading.downloaded);
107 }
108 running
109 },
110 _ => None
111 }
112 };
113
114 if recursion {
115 for task in children.unwrap() {
116 let _ = task.close(recursion);
117 }
118 }
119
120 Ok(())
121 }
122
123 fn cur_speed(&self) -> u32 {
124 let state = self.0.state.read().unwrap();
125 match &state.task_state {
126 TaskStateImpl::Downloading(downloading) => downloading.history_speed.latest(),
127 _ => 0
128 }
129 }
130
131 fn history_speed(&self) -> u32 {
132 let state = self.0.state.read().unwrap();
133 match &state.task_state {
134 TaskStateImpl::Downloading(downloading) => downloading.history_speed.average(),
135 _ => 0
136 }
137 }
138
139 fn transfered(&self) -> u64 {
140 let state = self.0.state.read().unwrap();
141 match &state.task_state {
142 TaskStateImpl::Downloading(downloading) => downloading.downloaded,
143 TaskStateImpl::Finished(downloaded) => *downloaded,
144 _ => 0
145 }
146 }
147
148
149 fn cancel_by_error(&self, err: BuckyError) -> BuckyResult<NdnTaskControlState> {
150 let (tasks, waiters) = {
151 let mut state = self.0.state.write().unwrap();
152 let waiters = match &mut state.control_state {
153 ControlStateImpl::Normal(waiters) => {
154 let waiters = Some(waiters.transfer());
155 state.control_state = ControlStateImpl::Canceled;
156 waiters
157 },
158 _ => None
159 };
160
161 let tasks = match &mut state.task_state {
162 TaskStateImpl::Downloading(downloading) => {
163 let tasks: Vec<Box<dyn DownloadTask>> = downloading.running.iter().map(|t| t.clone_as_download_task()).collect();
164 state.task_state = TaskStateImpl::Error(err.clone());
165 tasks
166 },
167 _ => vec![]
168 };
169
170 (tasks, waiters)
171 };
172
173 if let Some(waiters) = waiters {
174 waiters.wake();
175 }
176
177 for task in tasks {
178 let _ = task.cancel_by_error(err.clone());
179 }
180
181 Ok(NdnTaskControlState::Canceled)
182 }
183}
184
185
186#[async_trait::async_trait]
187impl DownloadTask for DownloadGroup {
188 fn clone_as_download_task(&self) -> Box<dyn DownloadTask> {
189 Box::new(self.clone())
190 }
191
192 fn add_task(&self, path: Option<String>, sub: Box<dyn DownloadTask>) -> BuckyResult<()> {
193 let mut state = self.0.state.write().unwrap();
194 match &mut state.task_state {
195 TaskStateImpl::Downloading(downloading) => {
196 if !downloading.closed {
197 downloading.running.push(sub.clone_as_download_task());
198 if let Some(path) = path {
199 if let Some(exists) = downloading.entries.insert(path, sub) {
200 let _ = exists.cancel();
201 }
202 }
203 Ok(())
204 } else {
205 Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
206 }
207 },
208 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
209 }
210 }
211
212 fn sub_task(&self, path: &str) -> Option<Box<dyn DownloadTask>> {
213 if path.len() == 0 {
214 Some(self.clone_as_download_task())
215 } else {
216 let mut names = path.split("/");
217 let name = names.next().unwrap();
218
219 let state = self.0.state.read().unwrap();
220 match &state.task_state {
221 TaskStateImpl::Downloading(downloading) => {
222 let mut sub = downloading.entries.get(name).map(|t| t.clone_as_download_task());
223 if sub.is_none() {
224 sub
225 } else {
226 for name in names {
227 sub = sub.and_then(|t| t.sub_task(name));
228 if sub.is_none() {
229 break;
230 }
231 }
232 sub
233 }
234 },
235 _ => None
236 }
237 }
238 }
239
240 fn calc_speed(&self, when: Timestamp) -> u32 {
241 let mut state = self.0.state.write().unwrap();
242 let mut running = vec![];
243 let mut cur_speed = 0;
244 let mut running_downloaded = 0;
245 match &mut state.task_state {
246 TaskStateImpl::Downloading(downloading) => {
247 for sub in &downloading.running {
248 cur_speed += sub.calc_speed(when);
249 match sub.state() {
250 NdnTaskState::Finished | NdnTaskState::Error(_) => {
251 downloading.history_downloaded += sub.transfered();
252 },
253 _ => {
254 running_downloaded += sub.transfered();
255 running.push(sub.clone_as_download_task());
256 }
257 }
258 }
259 downloading.downloaded = downloading.history_downloaded + running_downloaded;
260 downloading.history_speed.update(Some(cur_speed), when);
261 if running.len() == 0 && downloading.closed {
262 state.task_state = TaskStateImpl::Finished(downloading.downloaded);
263 } else {
264 downloading.running = running;
265 }
266 cur_speed
267 },
268 _ => 0
269 }
270 }
271
272 async fn wait_user_canceled(&self) -> BuckyError {
273 let waiter = {
274 let mut state = self.0.state.write().unwrap();
275 match &mut state.control_state {
276 ControlStateImpl::Normal(waiters) => Some(waiters.new_waiter()),
277 _ => None
278 }
279 };
280
281
282 if let Some(waiter) = waiter {
283 let _ = StateWaiter::wait(waiter, || self.control_state()).await;
284 }
285
286 BuckyError::new(BuckyErrorCode::UserCanceled, "")
287 }
288}