cyfs_bdt/ndn/channel/
download.rs

1use log::*;
2use std::{
3    sync::{RwLock}, 
4};
5use async_std::{
6    sync::Arc, 
7};
8use futures::future::AbortRegistration;
9use cyfs_base::*;
10use crate::{
11    types::*
12};
13use super::super::{
14    chunk::*, 
15    types::*, 
16    download::*
17};
18use super::{
19    protocol::v0::*,
20    channel::{Channel, WeakChannel},
21    tunnel::{
22        DynamicChannelTunnel, 
23        TunnelDownloadState
24    } 
25};
26
27
28struct InterestingState {
29    waiters: StateWaiter,  
30    last_send_time: Option<Timestamp>, 
31    next_send_time: Option<Timestamp>,  
32    history_speed: HistorySpeed, 
33    cache: ChunkStreamCache, 
34    channel: Channel
35}
36
37struct DownloadingState {
38    waiters: StateWaiter, 
39    tunnel_state: Box<dyn TunnelDownloadState>, 
40    decoder: Box<dyn ChunkDecoder>, 
41    speed_counter: SpeedCounter, 
42    history_speed: HistorySpeed, 
43    channel: Channel
44}
45
46
47struct FinishedState {
48    send_ctrl_time: Option<(WeakChannel, Timestamp)>, 
49}
50
51struct CanceledState {
52    send_ctrl_time: Option<(WeakChannel, Timestamp)>, 
53    err: BuckyError
54}
55
56#[derive(Debug, Clone)]
57pub enum DownloadSessionState {
58    Downloading, 
59    Finished,
60    Canceled(BuckyError),
61}
62
63enum StateImpl {
64    Interesting(InterestingState), 
65    Downloading(DownloadingState),
66    Finished(FinishedState), 
67    Canceled(CanceledState),
68} 
69
70impl StateImpl {
71    fn to_session_state(&self) -> DownloadSessionState {
72        match self {
73            Self::Interesting(_) => DownloadSessionState::Downloading, 
74            Self::Downloading(_) => DownloadSessionState::Downloading, 
75            Self::Finished(_) => DownloadSessionState::Finished, 
76            Self::Canceled(canceled) => DownloadSessionState::Canceled(canceled.err.clone()),
77        }
78    }
79}
80
81
82struct SessionImpl {
83    chunk: ChunkId, 
84    session_id: TempSeq, 
85    source: DownloadSource<DeviceId>, 
86    referer: Option<String>,  
87    group_path: Option<String>, 
88    state: RwLock<StateImpl>, 
89}
90
91#[derive(Clone)]
92pub struct DownloadSession(Arc<SessionImpl>);
93
94impl std::fmt::Display for DownloadSession {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        write!(f, "DownloadSession{{session_id:{:?}, chunk:{}, source:{}}}", self.session_id(), self.chunk(), self.source().target)
97    }
98}
99
100
101impl DownloadSession {
102    pub fn error(
103        chunk: ChunkId, 
104        session_id: Option<TempSeq>, 
105        source: DownloadSource<DeviceId>, 
106        referer: Option<String>, 
107        group_path: Option<String>,
108        err: BuckyError
109    ) -> Self {
110        Self(Arc::new(SessionImpl {
111            chunk, 
112            session_id: session_id.unwrap_or_default(), 
113            source, 
114            referer, 
115            group_path, 
116            state: RwLock::new(StateImpl::Canceled(CanceledState {
117                send_ctrl_time: None, 
118                err
119            })), 
120        }))
121    }
122
123    pub fn interest(
124        chunk: ChunkId, 
125        session_id: TempSeq, 
126        channel: Channel, 
127        source: DownloadSource<DeviceId>, 
128        cache: ChunkStreamCache,
129        referer: Option<String>, 
130        group_path: Option<String>
131    ) -> Self { 
132        Self(Arc::new(SessionImpl {
133            chunk, 
134            session_id,  
135            source, 
136            referer, 
137            group_path, 
138            state: RwLock::new(StateImpl::Interesting(InterestingState { 
139                history_speed: HistorySpeed::new(0, channel.config().history_speed.clone()), 
140                waiters: StateWaiter::new(), 
141                last_send_time: None, 
142                next_send_time: None, 
143                channel, 
144                cache
145            })), 
146        }))
147    }
148
149    pub fn source(&self) -> &DownloadSource<DeviceId> {
150        &self.0.source
151    }
152
153    pub fn referer(&self) -> &Option<String> {
154        &self.0.referer
155    }
156
157    pub fn group_path(&self) -> &Option<String> {
158        &self.0.group_path
159    }
160
161    pub fn start(&self) {
162        let send = {
163            let state = &mut *self.0.state.write().unwrap();
164            match state {
165                StateImpl::Interesting(interesting) => {
166                    if interesting.last_send_time.is_none() {
167                        let now = bucky_time_now();
168                        interesting.last_send_time = Some(now);
169                        interesting.next_send_time = Some(now + interesting.channel.config().resend_interval.as_micros() as u64);
170                        Some(interesting.channel.clone())
171                    } else {
172                        None
173                    }
174                }, 
175                _ => None
176            }
177        };
178        
179        if let Some(channel) = send {
180            let interest = Interest {
181                session_id: self.session_id().clone(), 
182                chunk: self.chunk().clone(), 
183                prefer_type: self.source().codec_desc.clone(), 
184                referer: self.referer().clone(), 
185                group_path: self.group_path().clone(), 
186                from: None, 
187            };
188            info!("{} sent {:?}", self, interest);
189            channel.interest(interest);
190        }
191       
192    }
193
194    pub fn chunk(&self) -> &ChunkId {
195        &self.0.chunk
196    }
197
198    pub fn state(&self) -> DownloadSessionState {
199        (&self.0.state.read().unwrap()).to_session_state()
200    }
201
202    pub fn session_id(&self) -> &TempSeq {
203        &self.0.session_id
204    }
205
206    pub fn ptr_eq(&self, other: &Self) -> bool {
207        Arc::ptr_eq(&self.0, &other.0)
208    }
209
210    pub async fn wait_finish(&self) -> DownloadSessionState {
211        enum NextStep {
212            Wait(AbortRegistration), 
213            Return(DownloadSessionState)
214        }
215        let next_step = {
216            let state = &mut *self.0.state.write().unwrap();
217            match state {
218                StateImpl::Interesting(interesting) => NextStep::Wait(interesting.waiters.new_waiter()), 
219                StateImpl::Downloading(downloading) => NextStep::Wait(downloading.waiters.new_waiter()),
220                StateImpl::Finished(_) => NextStep::Return(DownloadSessionState::Finished), 
221                StateImpl::Canceled(canceled) => NextStep::Return(DownloadSessionState::Canceled(canceled.err.clone())),
222            }
223        };
224        match next_step {
225            NextStep::Wait(waker) => StateWaiter::wait(waker, || self.state()).await,
226            NextStep::Return(state) => state
227        }
228    }
229
230    pub(super) fn push_piece_data(&self, channel: &Channel, piece: &PieceData, tunnel: &DynamicChannelTunnel) {
231        enum NextStep {
232            EnterDownloading, 
233            RespControl(PieceControlCommand), 
234            Ignore, 
235            Push(Box<dyn ChunkDecoder>)
236        }
237        use NextStep::*;
238        use StateImpl::*;
239        let next_step = {
240            let state = &mut *self.0.state.write().unwrap();
241            match state {
242                Interesting(_) => EnterDownloading, 
243                Downloading(downloading) => {
244                    downloading.speed_counter.on_recv(piece.data.len());
245                    Push(downloading.decoder.clone_as_decoder())
246                },
247                Finished(finished) => {
248                    let now = bucky_time_now();
249                    if finished.send_ctrl_time.is_none() {
250                        finished.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
251                    } {
252                        Ignore
253                    }
254                }, 
255                Canceled(canceled) => {
256                    let now = bucky_time_now();
257                    if canceled.send_ctrl_time.is_none() {
258                        canceled.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
259                    } {
260                        Ignore
261                    }
262                }, 
263            }
264        };
265
266        let resp_control = |command: PieceControlCommand| {
267            channel.send_piece_control(PieceControl {
268                sequence: channel.gen_command_seq(), 
269                session_id: self.session_id().clone(), 
270                chunk: self.chunk().clone(), 
271                command, 
272                max_index: None, 
273                lost_index: None
274            });
275        };
276
277        let push_to_decoder = |provider: Box<dyn ChunkDecoder>| {
278            let result = provider.push_piece_data(piece).unwrap(); 
279            if let Some(waiters) = {
280                let state = &mut *self.0.state.write().unwrap();
281                match state {
282                    Downloading(downloading) => {
283                        if result.valid {
284                            downloading.tunnel_state.as_mut().on_piece_data();
285                        }
286                        if result.finished {
287                            let mut waiters = StateWaiter::new();
288                            std::mem::swap(&mut waiters, &mut downloading.waiters);
289                            info!("{} finished", self);
290                            *state = Finished(FinishedState {
291                                send_ctrl_time: None, 
292                            });
293                            Some(waiters)
294                        } else {
295                            None
296                        } 
297                    }, 
298                    _ => None
299                }
300            } {
301                waiters.wake();
302                resp_control(PieceControlCommand::Finish);
303            }
304        };
305
306        match next_step {
307            EnterDownloading => {
308                if let Some(decoder) = {
309                    let state = &mut *self.0.state.write().unwrap();
310                    match state {
311                        Interesting(interesting) => {
312                            let decoder = StreamDecoder::new(self.chunk(), &self.source().codec_desc, interesting.cache.clone());
313                            let mut downloading = DownloadingState {
314                                channel: channel.clone(), 
315                                tunnel_state: tunnel.as_ref().download_state(), 
316                                history_speed: interesting.history_speed.clone(), 
317                                speed_counter: SpeedCounter::new(piece.data.len()), 
318                                decoder: decoder.clone_as_decoder(), 
319                                waiters: StateWaiter::new(), 
320                            };
321                            std::mem::swap(&mut downloading.waiters, &mut interesting.waiters);
322                            *state = Downloading(downloading);
323                            Some(decoder.clone_as_decoder())
324                        }, 
325                        Downloading(downloading) => {
326                            Some(downloading.decoder.clone_as_decoder())
327                        }, 
328                        _ => None
329                    }
330                } {
331                    push_to_decoder(decoder)
332                }
333                
334            }, 
335            Push(decoder) => {
336                push_to_decoder(decoder)
337            }, 
338            RespControl(cmd) => resp_control(cmd), 
339            Ignore => {}
340        }
341    }
342
343    pub(super) fn on_resp_interest(&self, channel: &Channel, resp_interest: &RespInterest) -> BuckyResult<()> {
344        match resp_interest.err {
345            BuckyErrorCode::Ok => unimplemented!(), 
346            BuckyErrorCode::WouldBlock => {
347                use StateImpl::*;
348                let state = &mut *self.0.state.write().unwrap();
349                match state {
350                    Interesting(interesting) => {
351                        interesting.next_send_time = Some(bucky_time_now() + channel.config().block_interval.as_micros() as u64);  
352                    }, 
353                    Downloading(downloading) => {
354                        downloading.tunnel_state.on_resp_interest();
355                    }, 
356                    Finished(finished) => {
357                        let now = bucky_time_now();
358                        if finished.send_ctrl_time.is_none() {
359                            finished.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
360                        } 
361                    }, 
362                    Canceled(canceled) => {
363                        let now = bucky_time_now();
364                        if canceled.send_ctrl_time.is_none() {
365                            canceled.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
366                        } 
367                    }
368                }   
369            }, 
370            _ => {
371                error!("{} cancel by err {}", self, resp_interest.err);
372
373                let mut waiters = StateWaiter::new();
374                {
375                    let state = &mut *self.0.state.write().unwrap();
376                    match state {
377                        StateImpl::Interesting(interesting) => {
378                            std::mem::swap(&mut waiters, &mut interesting.waiters);
379                            *state = StateImpl::Canceled(CanceledState {
380                                send_ctrl_time: None, 
381                                err: BuckyError::new(resp_interest.err, "cancel by remote")
382                            });
383                        },
384                        StateImpl::Downloading(downloading) => {
385                            std::mem::swap(&mut waiters, &mut downloading.waiters);
386                            *state = StateImpl::Canceled(CanceledState {
387                                send_ctrl_time: None, 
388                                err: BuckyError::new(resp_interest.err, "cancel by remote")
389                            });
390                        },
391                        _ => {}
392                    };
393                }
394                
395                waiters.wake();
396            }
397        }
398        Ok(())
399    }
400
401    fn resend_interest(&self, channel: &Channel) -> BuckyResult<()> {
402        let interest = Interest {
403            session_id: self.session_id().clone(), 
404            chunk: self.chunk().clone(), 
405            prefer_type: self.source().codec_desc.clone(), 
406            referer: self.referer().clone(), 
407            from: None, 
408            group_path: None
409        };
410        info!("{} sent {:?}", self, interest);
411        channel.interest(interest);
412        Ok(())
413    }
414
415
416    pub fn cancel_by_error(&self, err: BuckyError) {
417        error!("{} cancel by err {}", self, err);
418
419        let mut waiters = StateWaiter::new();
420        let send = {
421            let state = &mut *self.0.state.write().unwrap();
422            match state {
423                StateImpl::Interesting(interesting) => {
424                    std::mem::swap(&mut waiters, &mut interesting.waiters);
425                    let channel = interesting.channel.clone();
426                    *state = StateImpl::Canceled(CanceledState {
427                        send_ctrl_time: None, 
428                        err
429                    });
430                    Some(channel)
431                },
432                StateImpl::Downloading(downloading) => {
433                    std::mem::swap(&mut waiters, &mut downloading.waiters);
434                    let channel = downloading.channel.clone();
435                    *state = StateImpl::Canceled(CanceledState {
436                        send_ctrl_time: None, 
437                        err
438                    });
439                    Some(channel)
440                },
441                _ => None
442            }
443        };
444        waiters.wake();
445
446        if let Some(channel) = send {
447            channel.send_piece_control(PieceControl {
448                sequence: channel.gen_command_seq(), 
449                session_id: self.session_id().clone(), 
450                chunk: self.chunk().clone(), 
451                command: PieceControlCommand::Cancel, 
452                max_index: None, 
453                lost_index: None
454            });
455        }
456    }
457
458    pub(super) fn on_time_escape(&self, now: Timestamp) -> BuckyResult<()> {
459        enum NextStep {
460            None, 
461            SendInterest(Channel), 
462            SendPieceControl(Channel, PieceControl), 
463        }
464
465        let next_step = {
466            let state = &mut *self.0.state.write().unwrap();
467            match state {
468                StateImpl::Interesting(interesting) => {
469                    if let Some(next_send_time) = interesting.next_send_time {
470                        if now > next_send_time {
471                            interesting.next_send_time = Some(now + 2 * (next_send_time - interesting.last_send_time.unwrap()));
472                            interesting.last_send_time = Some(now);
473                            NextStep::SendInterest(interesting.channel.clone())
474                        } else {
475                            NextStep::None
476                        }
477                    } else {
478                        NextStep::None
479                    }
480                   
481                }, 
482                StateImpl::Downloading(downloading) => {
483                    if downloading.tunnel_state.as_mut().on_time_escape(now) {
484                        if let Some((max_index, lost_index)) = downloading.decoder.require_index() {
485                            debug!("{} dectect loss piece max_index:{:?} lost_index:{:?}", self, max_index, lost_index);
486                            NextStep::SendPieceControl(downloading.channel.clone(), PieceControl {
487                                sequence: downloading.channel.gen_command_seq(), 
488                                session_id: self.session_id().clone(), 
489                                chunk: self.chunk().clone(), 
490                                command: PieceControlCommand::Continue, 
491                                max_index, 
492                                lost_index
493                            })
494                        } else {
495                            NextStep::None
496                        }
497                    } else {
498                        NextStep::None
499                    }
500                },
501                StateImpl::Finished(finished) => {
502                    if let Some((channel, send_time)) = &finished.send_ctrl_time {
503                        if now > *send_time {
504                            let channel = channel.to_strong();
505                            finished.send_ctrl_time = None;
506                            if let Some(channel) = channel {
507                                let ctrl = PieceControl {
508                                    sequence: channel.gen_command_seq(), 
509                                    session_id: self.session_id().clone(), 
510                                    chunk: self.chunk().clone(), 
511                                    command: PieceControlCommand::Finish, 
512                                    max_index: None, 
513                                    lost_index: None
514                                }; 
515                                
516                                NextStep::SendPieceControl(channel, ctrl) 
517                            } else {
518                                NextStep::None
519                            }
520                        } else {
521                            NextStep::None
522                        }
523                    } else {
524                        NextStep::None
525                    }
526                }
527                StateImpl::Canceled(canceled) => {
528                    if let Some((channel, send_time)) = &canceled.send_ctrl_time {
529                        if now > *send_time {
530                            let channel = channel.to_strong();
531                            canceled.send_ctrl_time = None;
532                            if let Some(channel) = channel {
533                                let ctrl = PieceControl {
534                                    sequence: channel.gen_command_seq(), 
535                                    session_id: self.session_id().clone(), 
536                                    chunk: self.chunk().clone(), 
537                                    command: PieceControlCommand::Cancel, 
538                                    max_index: None, 
539                                    lost_index: None
540                                };
541                                NextStep::SendPieceControl(channel, ctrl) 
542                            } else {
543                                NextStep::None
544                            }
545                        } else {
546                            NextStep::None
547                        }
548                    } else {
549                        NextStep::None
550                    }
551                }
552            }
553        };
554        
555        match next_step {
556            NextStep::None => Ok(()), 
557            NextStep::SendInterest(channel) => {
558                let _ = self.resend_interest(&channel);
559                Ok(())
560            }, 
561            NextStep::SendPieceControl(channel, ctrl) => {
562                channel.send_piece_control(ctrl);
563                Ok(())
564            }
565        }
566    }
567
568    pub fn calc_speed(&self, when: Timestamp) -> u32 {
569        let state = &mut *self.0.state.write().unwrap();
570        match state {
571            StateImpl::Interesting(interesting) => {
572                interesting.history_speed.update(Some(0), when);
573                0
574            },
575            StateImpl::Downloading(downloading) => {
576                let cur_speed = downloading.speed_counter.update(when);
577                downloading.history_speed.update(Some(cur_speed), when);
578                cur_speed
579            },
580            _ => 0
581        }
582    }
583
584    pub fn cur_speed(&self) -> u32 {
585        let state = &*self.0.state.read().unwrap();
586        match state {
587            StateImpl::Downloading(downloading) => downloading.history_speed.latest(),
588            _ => 0
589        }
590    }
591
592    pub fn history_speed(&self) -> u32 {
593        let state = &*self.0.state.read().unwrap();
594        match state {
595            StateImpl::Interesting(interesting) => interesting.history_speed.average(),
596            StateImpl::Downloading(downloading) => downloading.history_speed.average(),
597            _ => 0
598        }
599    }
600}
601
602
603
604