cyfs_bdt/ndn/channel/
upload.rs

1use log::*;
2use std::{
3    ops::Range, 
4    sync::{RwLock}
5};
6use async_std::{
7    sync::Arc, 
8};
9use cyfs_base::*;
10use crate::{
11    types::*
12};
13use super::super::{
14    chunk::*, 
15    upload::*,
16    types::*
17};
18use super::{ 
19    protocol::v0::*, 
20    channel::Channel, 
21};
22
23struct UploadingState {
24    channel: Channel, 
25    waiters: StateWaiter, 
26    speed_counter: SpeedCounter,  
27    uploaded: u64, 
28    history_speed: HistorySpeed, 
29    encoder: Box<dyn ChunkEncoder>
30}
31
32struct StateImpl {
33    task_state: TaskStateImpl, 
34    control_state: NdnTaskControlState, 
35}
36
37enum TaskStateImpl {
38    Uploading(UploadingState),
39    Finished(u64), 
40    Error(BuckyError),
41}
42
43struct SessionImpl {
44    remote: DeviceId, 
45    chunk: ChunkId, 
46    session_id: TempSeq, 
47    piece_type: ChunkCodecDesc, 
48    state: RwLock<StateImpl>, 
49}
50
51#[derive(Clone)]
52pub struct UploadSession(Arc<SessionImpl>);
53
54impl std::fmt::Display for UploadSession {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "UploadSession{{session_id:{:?}, chunk:{}, remote:{}}}", self.session_id(), self.chunk(), self.remote())
57    }
58}
59
60impl UploadSession {
61    pub fn new(
62        chunk: ChunkId, 
63        session_id: TempSeq, 
64        piece_type: ChunkCodecDesc, 
65        encoder: Box<dyn ChunkEncoder>, 
66        channel: Channel
67    ) -> Self {
68        Self(Arc::new(SessionImpl {
69            remote: channel.tunnel().remote().clone(), 
70            chunk, 
71            session_id, 
72            piece_type, 
73            state: RwLock::new(StateImpl{
74                task_state: TaskStateImpl::Uploading(UploadingState {
75                    waiters: StateWaiter::new(), 
76                    history_speed: HistorySpeed::new(0, channel.config().history_speed.clone()), 
77                    speed_counter: SpeedCounter::new(0), 
78                    uploaded: 0, 
79                    encoder, 
80                    channel
81                }),
82                control_state: NdnTaskControlState::Normal
83            }), 
84        }))
85    }
86
87    pub fn remote(&self) -> &DeviceId {
88        &self.0.remote
89    }
90
91    pub fn chunk(&self) -> &ChunkId {
92        &self.0.chunk
93    }
94
95    pub fn piece_type(&self) -> &ChunkCodecDesc {
96        &self.0.piece_type
97    }
98
99    pub fn session_id(&self) -> &TempSeq {
100        &self.0.session_id
101    }
102
103
104    pub(super) fn next_piece(&self, buf: &mut [u8]) -> BuckyResult<usize> {
105        let encoder = {
106            let state = self.0.state.read().unwrap();
107            match &state.task_state {
108                TaskStateImpl::Uploading(uploading) => {
109                    Some(uploading.encoder.clone_as_encoder())
110                }, 
111                _ => None
112            }
113        };
114        if let Some(encoder) = encoder {
115            match encoder.next_piece(self.session_id(), buf) {
116                Ok(len) => {
117                    let mut state = self.0.state.write().unwrap();
118                    match &mut state.task_state {
119                        TaskStateImpl::Uploading(uploading) => {
120                            if len > 0 {
121                                uploading.speed_counter.on_recv(len);
122                                uploading.uploaded += len as u64;
123                            }
124                            Ok(len)
125                        },
126                        _ => {
127                            Err(BuckyError::new(BuckyErrorCode::ErrorState, "not uploading"))
128                        }
129                    }
130                   
131                }, 
132                Err(err) => {
133                    self.cancel_by_error(BuckyError::new(err.code(), "encoder failed"));
134                    Err(err)
135                }
136            }
137        } else {
138            Ok(0)
139        }
140    }
141
142    pub(super) fn cancel_by_error(&self, err: BuckyError) {
143        let send = {
144            let mut state = self.0.state.write().unwrap();
145            match &mut state.task_state {
146                TaskStateImpl::Error(_) => None, 
147                TaskStateImpl::Finished(_) => None,
148                TaskStateImpl::Uploading(uploading) => {
149                    let mut waiters = StateWaiter::new();
150                    uploading.waiters.transfer_into(&mut waiters);
151                    let channel = uploading.channel.clone();
152                    info!("{} canceled by err:{}", self, err);
153                    state.task_state = TaskStateImpl::Error(err.clone());
154                    Some((waiters, channel))
155                }
156            }
157        };
158
159        if let Some((waiters, channel)) = send {
160            let resp_interest = RespInterest {
161                session_id: self.session_id().clone(), 
162                chunk: self.chunk().clone(), 
163                err: err.code(), 
164                redirect: None,
165                redirect_referer: None,
166                to: None,
167            };
168            channel.resp_interest(resp_interest);
169
170            waiters.wake();
171        }
172    }
173
174    // 把第一个包加到重发队列里去
175    pub fn on_interest(&self, channel: &Channel, _interest: &Interest) -> BuckyResult<()> {
176        enum NextStep {
177            ResetEncoder(Box<dyn ChunkEncoder>), 
178            RespInterest(BuckyErrorCode), 
179            None
180        }
181        let next_step = {
182            let state = self.0.state.read().unwrap();
183            match &state.task_state {
184                TaskStateImpl::Uploading(uploading) => {
185                    NextStep::ResetEncoder(uploading.encoder.clone_as_encoder())
186                }, 
187                TaskStateImpl::Error(err) => {
188                    NextStep::RespInterest(err.code())
189                }, 
190                _ => {
191                    NextStep::None
192                }
193            }
194        };
195
196        match next_step {
197            NextStep::ResetEncoder(encoder) => {
198                debug!("{} will reset index", self);
199                if !encoder.reset() {
200                    let resp_interest = RespInterest {
201                        session_id: self.session_id().clone(), 
202                        chunk: self.chunk().clone(), 
203                        err: BuckyErrorCode::WouldBlock, 
204                        redirect: None,
205                        redirect_referer: None,
206                        to: None,
207                    };
208                    channel.resp_interest(resp_interest);
209                }
210                Ok(())
211            }, 
212            NextStep::RespInterest(err) => {
213                let resp_interest = RespInterest {
214                    session_id: self.session_id().clone(), 
215                    chunk: self.chunk().clone(), 
216                    err, 
217                    redirect: None,
218                    redirect_referer: None,
219                    to: None,
220                };
221                channel.resp_interest(resp_interest);
222                Ok(())
223            }, 
224            NextStep::None => Ok(())
225        }
226    }
227
228    pub(super) fn on_piece_control(&self, channel: &Channel, ctrl: &PieceControl) -> BuckyResult<()> {
229        enum NextStep {
230            MergeIndex(Box<dyn ChunkEncoder>, u32, Vec<Range<u32>>), 
231            RespInterest(BuckyErrorCode), 
232            Notify(StateWaiter), 
233            None
234        }
235
236        let next_step = match ctrl.command {
237            PieceControlCommand::Finish => {
238                let mut state = self.0.state.write().unwrap();
239                match &mut state.task_state {
240                    TaskStateImpl::Uploading(uploading) => {
241                        info!("{} finished", self);
242                        let mut waiters = StateWaiter::new();
243                        uploading.waiters.transfer_into(&mut waiters); 
244                        state.task_state = TaskStateImpl::Finished(uploading.uploaded);
245                        NextStep::Notify(waiters)
246                    }, 
247                    _ => {
248                        NextStep::None
249                    }
250                }
251            }, 
252            PieceControlCommand::Cancel => {
253                info!("{} canceled by remote", self);
254                let mut state = self.0.state.write().unwrap();
255                match &mut state.task_state {
256                    TaskStateImpl::Uploading(uploading) => {
257                        info!("{} finished", self);
258                        let mut waiters = StateWaiter::new();
259                        uploading.waiters.transfer_into(&mut waiters); 
260                        state.task_state = TaskStateImpl::Error(BuckyError::new(BuckyErrorCode::Interrupted, "cancel by remote"));
261                        NextStep::Notify(waiters)
262                    }, 
263                    _ => {
264                        NextStep::None
265                    }
266                }
267            }, 
268            PieceControlCommand::Continue => {
269                let state = self.0.state.read().unwrap();
270                match &state.task_state {
271                    TaskStateImpl::Uploading(uploading) => {
272                        if let Some(max_index) = ctrl.max_index {
273                            NextStep::MergeIndex(uploading.encoder.clone_as_encoder(), max_index, ctrl.lost_index.clone().unwrap_or_default())
274                        } else {
275                            NextStep::None
276                        }
277                    },
278                    TaskStateImpl::Error(err) => NextStep::RespInterest(err.code()),  
279                    _ => NextStep::None
280                }
281            },
282            _ => unimplemented!()
283        };
284
285        match next_step {
286            NextStep::MergeIndex(encoder, max_index, lost_index) => {
287                if !encoder.merge(max_index, lost_index) {
288                    let resp_interest = RespInterest {
289                        session_id: self.session_id().clone(), 
290                        chunk: self.chunk().clone(), 
291                        err: BuckyErrorCode::WouldBlock, 
292                        redirect: None,
293                        redirect_referer: None,
294                        to: None,
295                    };
296                    channel.resp_interest(resp_interest);
297                }
298            }, 
299            NextStep::RespInterest(err) => {
300                let resp_interest = RespInterest {
301                    session_id: self.session_id().clone(), 
302                    chunk: self.chunk().clone(), 
303                    err: err,
304                    redirect: None,
305                    redirect_referer: None,
306                    to: None,
307                };
308                channel.resp_interest(resp_interest);
309            }, 
310            NextStep::Notify(waiters) => {
311                waiters.wake();
312            }, 
313            NextStep::None => {
314            }
315        }
316        Ok(())
317    }
318
319    pub async fn wait_finish(&self) -> NdnTaskState {
320        let waiter = match &mut self.0.state.write().unwrap().task_state {
321            TaskStateImpl::Uploading(uploading) => Some(uploading.waiters.new_waiter()), 
322            _ => None, 
323        };
324        
325        if let Some(waiter) = waiter {
326            StateWaiter::wait(waiter, || self.state()).await
327        } else {
328            self.state()
329        }
330    }
331}
332
333
334impl NdnTask for UploadSession {
335    fn clone_as_task(&self) -> Box<dyn NdnTask> {
336        Box::new(self.clone())
337    }
338    
339    fn state(&self) -> NdnTaskState {
340        match &self.0.state.read().unwrap().task_state {
341            TaskStateImpl::Uploading(_) => NdnTaskState::Running,
342            TaskStateImpl::Finished(_) => NdnTaskState::Finished, 
343            TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone()),
344        }
345    }
346
347    fn control_state(&self) -> NdnTaskControlState {
348        self.0.state.read().unwrap().control_state.clone()
349    }
350
351    fn transfered(&self) -> u64 {
352        match &self.0.state.read().unwrap().task_state {
353            TaskStateImpl::Uploading(uploading) => uploading.uploaded,
354            TaskStateImpl::Finished(uploaded) => *uploaded, 
355            TaskStateImpl::Error(_) => 0,
356        }
357    }
358
359    fn cur_speed(&self) -> u32 {
360        match &self.0.state.read().unwrap().task_state {
361            TaskStateImpl::Uploading(uploading) => {
362                uploading.history_speed.latest()
363            }, 
364            _ => 0
365        }
366    }
367
368    fn history_speed(&self) -> u32 {
369        match &self.0.state.read().unwrap().task_state {
370            TaskStateImpl::Uploading(uploading) => {
371                uploading.history_speed.average()
372            }, 
373            _ => 0
374        }
375    }
376}
377
378#[async_trait::async_trait]
379impl UploadTask for UploadSession {
380    fn clone_as_upload_task(&self) -> Box<dyn UploadTask> {
381        Box::new(self.clone())
382    }
383
384    fn calc_speed(&self, when: Timestamp) -> u32 {
385        match &mut self.0.state.write().unwrap().task_state {
386            TaskStateImpl::Uploading(uploading) => {
387                let cur_speed = uploading.speed_counter.update(when);
388                uploading.history_speed.update(Some(cur_speed), when);
389                cur_speed
390            }, 
391            _ => 0
392        }
393    }
394}
395