cyfs_bdt/ndn/upload/
group.rs

1use std::{
2    collections::{HashMap}, 
3    sync::{Arc, RwLock}
4};
5use cyfs_base::*;
6use crate::{
7    types::*
8};
9use super::super::{
10    types::*
11};
12use super::{
13    common::*
14};
15
16enum ControlStateImpl {
17    Normal(StateWaiter), 
18    Canceled,
19}
20
21struct UploadingState {
22    entries: HashMap<String, Box<dyn UploadTask>>, 
23    running: Vec<Box<dyn UploadTask>>, 
24    closed: bool, 
25    uploaded: u64, 
26    history_uploaded: u64, 
27    history_speed: HistorySpeed, 
28}
29
30
31enum TaskStateImpl {
32    Uploading(UploadingState), 
33    Finished(u64), 
34    Error(BuckyError), 
35}
36
37struct StateImpl {
38    task_state: TaskStateImpl, 
39    control_state: ControlStateImpl
40}
41
42struct TaskImpl {
43    history_speed: HistorySpeedConfig, 
44    state: RwLock<StateImpl>
45}
46
47#[derive(Clone)]
48pub struct UploadGroup(Arc<TaskImpl>);
49
50impl UploadGroup {
51    pub fn new(history_speed: HistorySpeedConfig) -> Self {
52        Self(Arc::new(TaskImpl {
53            history_speed: history_speed.clone(), 
54            state: RwLock::new(StateImpl { 
55                task_state: TaskStateImpl::Uploading(UploadingState {
56                    entries: Default::default(), 
57                    running: Default::default(), 
58                    closed: false, 
59                    uploaded: 0, 
60                    history_uploaded: 0, 
61                    history_speed: HistorySpeed::new(0, history_speed), 
62                }), 
63                control_state: ControlStateImpl::Normal(StateWaiter::new())
64            })
65        }))
66    }
67
68    pub fn history_config(&self) -> &HistorySpeedConfig {
69        &self.0.history_speed
70    }
71}
72
73impl NdnTask for UploadGroup {
74    fn clone_as_task(&self) -> Box<dyn NdnTask> {
75        Box::new(self.clone())
76    }
77
78    fn state(&self) -> NdnTaskState {
79        match &self.0.state.read().unwrap().task_state {
80            TaskStateImpl::Uploading(_) => NdnTaskState::Running,
81            TaskStateImpl::Finished(_) => NdnTaskState::Finished, 
82            TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone())
83        }
84    }
85
86    fn control_state(&self) -> NdnTaskControlState {
87        match &self.0.state.read().unwrap().control_state {
88            ControlStateImpl::Normal(_) => NdnTaskControlState::Normal, 
89            ControlStateImpl::Canceled => NdnTaskControlState::Canceled
90        }
91    }
92
93    
94    fn cancel(&self) -> BuckyResult<NdnTaskControlState> {
95        let (tasks, waiters) = {
96            let mut state = self.0.state.write().unwrap();
97            let waiters = match &mut state.control_state {
98                ControlStateImpl::Normal(waiters) => {
99                    let waiters = Some(waiters.transfer());
100                    state.control_state = ControlStateImpl::Canceled;
101                    waiters
102                }, 
103                _ => None
104            };
105
106            let tasks = match &mut state.task_state {
107                TaskStateImpl::Uploading(uploading) => {
108                    let tasks: Vec<Box<dyn UploadTask>> = uploading.running.iter().map(|t| t.clone_as_upload_task()).collect();
109                    state.task_state = TaskStateImpl::Error(BuckyError::new(BuckyErrorCode::UserCanceled, "cancel invoked"));
110                    tasks
111                },
112                _ => vec![]
113            };
114
115            (tasks, waiters)
116        };
117
118        if let Some(waiters) = waiters {
119            waiters.wake();
120        }
121
122        for task in tasks {
123            let _ = task.cancel();
124        }
125        
126        Ok(NdnTaskControlState::Canceled)
127    }
128
129    fn close(&self, recursion: bool) -> BuckyResult<()> {
130        let children: Option<Vec<_>> = {
131            let mut state = self.0.state.write().unwrap();
132            match &mut state.task_state {
133                TaskStateImpl::Uploading(uploading) => {
134                    let running = if recursion {
135                        Some(uploading.running.iter().map(|t| t.clone_as_upload_task()).collect())
136                    } else {
137                        None
138                    };
139                    uploading.closed = true;
140                    if uploading.running.len() == 0 {
141                        state.task_state = TaskStateImpl::Finished(uploading.uploaded);
142                    } 
143                    running
144                },
145                _ => None
146            }
147        };
148
149        if recursion {
150            for task in children.unwrap() {
151                let _ = task.close(recursion);
152            }
153        }
154        
155        Ok(())
156    }
157
158    fn transfered(&self) -> u64 {
159        let state = self.0.state.read().unwrap();
160        match &state.task_state {
161            TaskStateImpl::Uploading(uploading) => uploading.uploaded,
162            TaskStateImpl::Finished(uploaded) => *uploaded, 
163            _ => 0
164        }
165    }
166
167    fn cur_speed(&self) -> u32 {
168        let state = self.0.state.read().unwrap();
169        match &state.task_state {
170            TaskStateImpl::Uploading(uploading) => uploading.history_speed.latest(),
171            _ => 0
172        }
173    }
174
175    fn history_speed(&self) -> u32 {
176        let state = self.0.state.read().unwrap();
177        match &state.task_state {
178            TaskStateImpl::Uploading(uploading) => uploading.history_speed.average(),
179            _ => 0
180        }
181    }
182}
183
184#[async_trait::async_trait]
185impl UploadTask for UploadGroup {
186    fn clone_as_upload_task(&self) -> Box<dyn UploadTask> {
187        Box::new(self.clone())
188    }
189
190    fn add_task(&self, path: Option<String>, sub: Box<dyn UploadTask>) -> BuckyResult<()> {
191        let mut state = self.0.state.write().unwrap();
192        match &mut state.task_state {
193            TaskStateImpl::Uploading(uploading) => {
194                if !uploading.closed {
195                    uploading.running.push(sub.clone_as_upload_task());
196                    if let Some(path) = path {
197                        if let Some(exists) = uploading.entries.insert(path, sub) {
198                            let _ = exists.cancel();
199                        }
200                    }
201                    Ok(())
202                } else {
203                    Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
204                }
205            },
206            _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, ""))
207        }
208    }
209
210    fn sub_task(&self, path: &str) -> Option<Box<dyn UploadTask>> {
211        if path.len() == 0 {
212            Some(self.clone_as_upload_task())
213        } else {
214            let mut names = path.split("/");
215            let name = names.next().unwrap();
216    
217            let state = self.0.state.read().unwrap(); 
218            match &state.task_state {
219                TaskStateImpl::Uploading(uploading) => {
220                    let mut sub = uploading.entries.get(name).map(|t| t.clone_as_upload_task());
221                    if sub.is_none() {
222                        sub 
223                    } else {
224                        for name in names {
225                            sub = sub.and_then(|t| t.sub_task(name));
226                            if sub.is_none() {
227                                break;
228                            }
229                        }
230                        sub
231                    }
232                },
233                _ => None
234            }
235        }
236    }
237
238    fn calc_speed(&self, when: Timestamp) -> u32 {
239        let mut state = self.0.state.write().unwrap();
240        let mut running = vec![];
241        let mut cur_speed = 0;
242        let mut running_uploaded = 0;
243        match &mut state.task_state {
244            TaskStateImpl::Uploading(uploading) => {
245                for sub in &uploading.running {
246                    cur_speed += sub.calc_speed(when);
247                    match sub.state() {
248                        NdnTaskState::Finished | NdnTaskState::Error(_) => {
249                            uploading.history_uploaded += sub.transfered();
250                        }, 
251                        _ => {
252                            running_uploaded += sub.transfered();
253                            running.push(sub.clone_as_upload_task());
254                        }
255                    }  
256                }
257                uploading.uploaded = uploading.history_uploaded + running_uploaded;
258                uploading.history_speed.update(Some(cur_speed), when);
259                if running.len() == 0 && uploading.closed {
260                    state.task_state = TaskStateImpl::Finished(uploading.uploaded);
261                } else {
262                    uploading.running = running;
263                }
264                cur_speed
265            },
266            _ => 0
267        }
268    }
269}