cyfs_bdt/ndn/upload/
group.rs1use std::{
2 collections::{HashMap},
3 sync::{Arc, RwLock}
4};
5use cyfs_base::*;
6use crate::{
7 types::*
8};
9use super::super::{
10 types::*
11};
12use super::{
13 common::*
14};
15
16enum ControlStateImpl {
17 Normal(StateWaiter),
18 Canceled,
19}
20
21struct UploadingState {
22 entries: HashMap<String, Box<dyn UploadTask>>,
23 running: Vec<Box<dyn UploadTask>>,
24 closed: bool,
25 uploaded: u64,
26 history_uploaded: u64,
27 history_speed: HistorySpeed,
28}
29
30
31enum TaskStateImpl {
32 Uploading(UploadingState),
33 Finished(u64),
34 Error(BuckyError),
35}
36
37struct StateImpl {
38 task_state: TaskStateImpl,
39 control_state: ControlStateImpl
40}
41
42struct TaskImpl {
43 history_speed: HistorySpeedConfig,
44 state: RwLock<StateImpl>
45}
46
47#[derive(Clone)]
48pub struct UploadGroup(Arc<TaskImpl>);
49
50impl UploadGroup {
51 pub fn new(history_speed: HistorySpeedConfig) -> Self {
52 Self(Arc::new(TaskImpl {
53 history_speed: history_speed.clone(),
54 state: RwLock::new(StateImpl {
55 task_state: TaskStateImpl::Uploading(UploadingState {
56 entries: Default::default(),
57 running: Default::default(),
58 closed: false,
59 uploaded: 0,
60 history_uploaded: 0,
61 history_speed: HistorySpeed::new(0, history_speed),
62 }),
63 control_state: ControlStateImpl::Normal(StateWaiter::new())
64 })
65 }))
66 }
67
68 pub fn history_config(&self) -> &HistorySpeedConfig {
69 &self.0.history_speed
70 }
71}
72
73impl NdnTask for UploadGroup {
74 fn clone_as_task(&self) -> Box<dyn NdnTask> {
75 Box::new(self.clone())
76 }
77
78 fn state(&self) -> NdnTaskState {
79 match &self.0.state.read().unwrap().task_state {
80 TaskStateImpl::Uploading(_) => NdnTaskState::Running,
81 TaskStateImpl::Finished(_) => NdnTaskState::Finished,
82 TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone())
83 }
84 }
85
86 fn control_state(&self) -> NdnTaskControlState {
87 match &self.0.state.read().unwrap().control_state {
88 ControlStateImpl::Normal(_) => NdnTaskControlState::Normal,
89 ControlStateImpl::Canceled => NdnTaskControlState::Canceled
90 }
91 }
92
93
94 fn cancel(&self) -> BuckyResult<NdnTaskControlState> {
95 let (tasks, waiters) = {
96 let mut state = self.0.state.write().unwrap();
97 let waiters = match &mut state.control_state {
98 ControlStateImpl::Normal(waiters) => {
99 let waiters = Some(waiters.transfer());
100 state.control_state = ControlStateImpl::Canceled;
101 waiters
102 },
103 _ => None
104 };
105
106 let tasks = match &mut state.task_state {
107 TaskStateImpl::Uploading(uploading) => {
108 let tasks: Vec<Box<dyn UploadTask>> = uploading.running.iter().map(|t| t.clone_as_upload_task()).collect();
109 state.task_state = TaskStateImpl::Error(BuckyError::new(BuckyErrorCode::UserCanceled, "cancel invoked"));
110 tasks
111 },
112 _ => vec![]
113 };
114
115 (tasks, waiters)
116 };
117
118 if let Some(waiters) = waiters {
119 waiters.wake();
120 }
121
122 for task in tasks {
123 let _ = task.cancel();
124 }
125
126 Ok(NdnTaskControlState::Canceled)
127 }
128
129 fn close(&self, recursion: bool) -> BuckyResult<()> {
130 let children: Option<Vec<_>> = {
131 let mut state = self.0.state.write().unwrap();
132 match &mut state.task_state {
133 TaskStateImpl::Uploading(uploading) => {
134 let running = if recursion {
135 Some(uploading.running.iter().map(|t| t.clone_as_upload_task()).collect())
136 } else {
137 None
138 };
139 uploading.closed = true;
140 if uploading.running.len() == 0 {
141 state.task_state = TaskStateImpl::Finished(uploading.uploaded);
142 }
143 running
144 },
145 _ => None
146 }
147 };
148
149 if recursion {
150 for task in children.unwrap() {
151 let _ = task.close(recursion);
152 }
153 }
154
155 Ok(())
156 }
157
158 fn transfered(&self) -> u64 {
159 let state = self.0.state.read().unwrap();
160 match &state.task_state {
161 TaskStateImpl::Uploading(uploading) => uploading.uploaded,
162 TaskStateImpl::Finished(uploaded) => *uploaded,
163 _ => 0
164 }
165 }
166
167 fn cur_speed(&self) -> u32 {
168 let state = self.0.state.read().unwrap();
169 match &state.task_state {
170 TaskStateImpl::Uploading(uploading) => uploading.history_speed.latest(),
171 _ => 0
172 }
173 }
174
175 fn history_speed(&self) -> u32 {
176 let state = self.0.state.read().unwrap();
177 match &state.task_state {
178 TaskStateImpl::Uploading(uploading) => uploading.history_speed.average(),
179 _ => 0
180 }
181 }
182}
183
184#[async_trait::async_trait]
185impl UploadTask for UploadGroup {
186 fn clone_as_upload_task(&self) -> Box<dyn UploadTask> {
187 Box::new(self.clone())
188 }
189
190 fn add_task(&self, path: Option<String>, sub: Box<dyn UploadTask>) -> BuckyResult<()> {
191 let mut state = self.0.state.write().unwrap();
192 match &mut state.task_state {
193 TaskStateImpl::Uploading(uploading) => {
194 if !uploading.closed {
195 uploading.running.push(sub.clone_as_upload_task());
196 if let Some(path) = path {
197 if let Some(exists) = uploading.entries.insert(path, sub) {
198 let _ = exists.cancel();
199 }
200 }
201 Ok(())
202 } else {
203 Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
204 }
205 },
206 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
207 }
208 }
209
210 fn sub_task(&self, path: &str) -> Option<Box<dyn UploadTask>> {
211 if path.len() == 0 {
212 Some(self.clone_as_upload_task())
213 } else {
214 let mut names = path.split("/");
215 let name = names.next().unwrap();
216
217 let state = self.0.state.read().unwrap();
218 match &state.task_state {
219 TaskStateImpl::Uploading(uploading) => {
220 let mut sub = uploading.entries.get(name).map(|t| t.clone_as_upload_task());
221 if sub.is_none() {
222 sub
223 } else {
224 for name in names {
225 sub = sub.and_then(|t| t.sub_task(name));
226 if sub.is_none() {
227 break;
228 }
229 }
230 sub
231 }
232 },
233 _ => None
234 }
235 }
236 }
237
238 fn calc_speed(&self, when: Timestamp) -> u32 {
239 let mut state = self.0.state.write().unwrap();
240 let mut running = vec![];
241 let mut cur_speed = 0;
242 let mut running_uploaded = 0;
243 match &mut state.task_state {
244 TaskStateImpl::Uploading(uploading) => {
245 for sub in &uploading.running {
246 cur_speed += sub.calc_speed(when);
247 match sub.state() {
248 NdnTaskState::Finished | NdnTaskState::Error(_) => {
249 uploading.history_uploaded += sub.transfered();
250 },
251 _ => {
252 running_uploaded += sub.transfered();
253 running.push(sub.clone_as_upload_task());
254 }
255 }
256 }
257 uploading.uploaded = uploading.history_uploaded + running_uploaded;
258 uploading.history_speed.update(Some(cur_speed), when);
259 if running.len() == 0 && uploading.closed {
260 state.task_state = TaskStateImpl::Finished(uploading.uploaded);
261 } else {
262 uploading.running = running;
263 }
264 cur_speed
265 },
266 _ => 0
267 }
268 }
269}