cyfs_bdt/ndn/chunk/
download.rs

1use std::{
2    sync::{RwLock, Arc, Weak}, collections::LinkedList,
3};
4use async_std::{ 
5    task, 
6};
7use cyfs_base::*;
8use crate::{
9    types::*, 
10    stack::{WeakStack, Stack}
11};
12use super::super::{ 
13    types::*, 
14    channel::*, 
15    download::*,
16};
17use super::{
18    cache::*, 
19};
20
21#[derive(Debug)]
22struct QueryContextOp {
23    op_id: IncreaseId, 
24    filter: DownloadSourceFilter, 
25    limit: usize
26}
27
28#[derive(Debug)]
29struct StartSessionOp {
30    op_id: IncreaseId, 
31    update_at: Timestamp, 
32    source: DownloadSource<DeviceDesc>
33}
34
35#[derive(Debug)]
36enum SessionOp {
37    None, 
38    TrigerDrain(Timestamp), 
39    StartSession(StartSessionOp), 
40    QueryContext(QueryContextOp)
41}
42
43#[derive(Clone)]
44enum TryingSession {
45    None, 
46    Starting(IncreaseId), 
47    Running(DownloadSession)
48}
49
50impl TryingSession {
51    fn as_session(&self) -> Option<&DownloadSession> {
52        match self {
53            Self::Running(session) => Some(session),
54            _ => None
55        }
56    }
57}
58
59struct SingleStreamSession {
60    gen_id: IncreaseIdGenerator, 
61    update_at: Timestamp, 
62    tried: LinkedList<DownloadSession>, 
63    trying: TryingSession, 
64    querying: Option<IncreaseId>
65}
66
67impl SingleStreamSession {
68    fn new(update_at: Timestamp) -> (Self, QueryContextOp) {
69        let gen_id = IncreaseIdGenerator::new();
70        let op_id = gen_id.generate();
71        let session = Self {
72            gen_id, 
73            update_at, 
74            tried: LinkedList::new(), 
75            trying: TryingSession::None, 
76            querying: Some(op_id)
77        };
78        let op = QueryContextOp {
79            op_id, 
80            filter: session.next_filter(), 
81            limit: 1
82        }; 
83        (session, op)
84    }
85
86    fn check_context(&mut self, update_at: Timestamp) -> SessionOp {
87        if self.querying.is_some() {
88            return SessionOp::None;
89        }
90
91        match self.trying.clone() {
92            TryingSession::Starting(_) => SessionOp::None, 
93            TryingSession::None => {
94                if update_at != self.update_at {
95                    let op_id = self.gen_id.generate();
96                    let op = QueryContextOp {
97                        op_id, 
98                        filter: self.next_filter(), 
99                        limit: 1
100                    }; 
101                    self.querying = Some(op_id);
102                    SessionOp::QueryContext(op)
103                } else {
104                    SessionOp::None
105                }
106            }, 
107            TryingSession::Running(session) => {
108                match session.state() {
109                    DownloadSessionState::Downloading => {
110                        if update_at != self.update_at {
111                            let op_id = self.gen_id.generate();
112                            let op = QueryContextOp {
113                                op_id, 
114                                filter: self.check_filter(), 
115                                limit: 1
116                            }; 
117                            self.querying = Some(op_id);
118                            SessionOp::QueryContext(op)
119                        } else {
120                            SessionOp::None
121                        }
122                    }, 
123                    DownloadSessionState::Canceled(_) => {
124                        self.trying = TryingSession::None;
125                        self.tried.push_back(session);
126                        let op_id = self.gen_id.generate();
127                        let op = QueryContextOp {
128                            op_id, 
129                            filter: self.next_filter(), 
130                            limit: 1
131                        }; 
132                        self.querying = Some(op_id);
133                        SessionOp::QueryContext(op)
134                    },
135                    DownloadSessionState::Finished => SessionOp::None
136                }
137            }
138        } 
139    }
140
141    fn trying(&self) -> Option<&DownloadSession> {
142        self.trying.as_session()
143    }
144
145    fn next_filter(&self) -> DownloadSourceFilter {
146        DownloadSourceFilter {
147            exclude_target: Some(self.tried.iter().map(|session| session.source().target.clone()).collect()), 
148            include_target: None, 
149            include_codec: Some(vec![ChunkCodecDesc::Stream(None, None, None)]), 
150        }
151    }
152
153    fn check_filter(&self) -> DownloadSourceFilter {
154        DownloadSourceFilter {
155            exclude_target: Some(self.tried.iter().map(|session| session.source().target.clone()).collect()), 
156            include_target: self.trying.as_session().map(|session| vec![session.source().target.clone()]), 
157            include_codec: self.trying.as_session().map(|session| vec![session.source().codec_desc.clone()]), 
158        }
159    }
160
161    fn on_session_created(&mut self, op_id: IncreaseId, session: DownloadSession) -> bool {
162        let start = match &self.trying {
163            TryingSession::Starting(stub_id) => *stub_id == op_id,
164            _ => false
165        };
166        if !start {
167            return false;
168        }
169
170        self.trying = TryingSession::Running(session);
171        true
172    }
173
174    fn on_query_finished(&mut self, owner: ChunkDownloader, op: &QueryContextOp, result: (LinkedList<DownloadSource<DeviceDesc>>, Timestamp)) -> SessionOp {
175        let (mut sources, update_at) = result;
176
177        if !self.querying.map(|stub_id| stub_id == op.op_id).unwrap_or(false) {
178            info!("{} ignore queried sources for another query posted, op_id={}", owner, op.op_id);
179            return SessionOp::None;
180        }
181        self.querying = None;
182
183        let trying = self.trying.clone();
184        match trying {
185            TryingSession::None => {
186                if update_at != self.update_at {
187                    self.update_at = update_at;
188                } 
189                if sources.len() == 0 {
190                    SessionOp::TrigerDrain(update_at)
191                } else {
192                    let op_id = self.gen_id.generate();
193                    self.trying = TryingSession::Starting(op_id);
194                    SessionOp::StartSession(StartSessionOp {
195                        op_id, 
196                        update_at, 
197                        source: sources.pop_front().unwrap()
198                    })
199                }
200            }, 
201            TryingSession::Starting(_) => {
202                info!("{} ignore queried sources for another session starting, op_id={}", owner, op.op_id);
203                SessionOp::None
204            }, 
205            TryingSession::Running(session) => {
206                if update_at != self.update_at {
207                    self.update_at = update_at;
208                    if sources.len() == 0 {
209                        info!("{} cancel current session for context updated, op_id={}, session={}", owner, op.op_id, session);
210                        session.cancel_by_error(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"));
211                        self.tried.push_back(session);
212                        self.trying = TryingSession::None;
213    
214                        let op_id = self.gen_id.generate();
215                        self.querying = Some(op_id);
216                        SessionOp::QueryContext(QueryContextOp {
217                            op_id, 
218                            filter: self.next_filter(), 
219                            limit: 1
220                        })
221                    } else {
222                        SessionOp::None
223                    }
224                } else {
225                    unreachable!()
226                }
227            }
228        }
229    }
230}
231
232
233
234enum StateImpl {
235    Loading, 
236    Downloading(SingleStreamSession), 
237    Finished
238}
239
240struct ChunkDowloaderImpl { 
241    stack: WeakStack, 
242    task: Box<dyn LeafDownloadTask>, 
243    cache: ChunkCache, 
244    state: RwLock<StateImpl>, 
245}
246
247impl std::fmt::Display for ChunkDowloaderImpl {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "ChunkDownloader{{chunk:{}}}", self.cache.chunk())
250    }
251}
252
253#[derive(Clone)]
254pub struct ChunkDownloader(Arc<ChunkDowloaderImpl>);
255
256impl Drop for ChunkDowloaderImpl {
257    fn drop(&mut self) {
258        let session = {
259            let state = &mut *self.state.write().unwrap();
260            match state {
261                StateImpl::Downloading(downloading) => downloading.trying.as_session().cloned(), 
262                _ => None
263            }
264        };
265       
266        if let Some(session) = session {
267            info!("{} canceled for drop", self);
268            session.cancel_by_error(BuckyError::new(BuckyErrorCode::UserCanceled, "user canceled"));
269        }
270    }
271}
272
273#[derive(Clone)]
274pub struct WeakChunkDownloader(Weak<ChunkDowloaderImpl>);
275
276impl WeakChunkDownloader {
277    pub fn to_strong(&self) -> Option<ChunkDownloader> {
278        Weak::upgrade(&self.0).map(|arc| ChunkDownloader(arc))
279    }
280}
281
282impl ChunkDownloader {
283    pub fn to_weak(&self) -> WeakChunkDownloader {
284        WeakChunkDownloader(Arc::downgrade(&self.0))
285    }
286}
287
288impl std::fmt::Display for ChunkDownloader {
289    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
290        write!(f, "{}", self.0)
291    }
292}
293
294
295impl ChunkDownloader {
296    pub fn new(
297        stack: WeakStack, 
298        cache: ChunkCache, 
299        task: Box<dyn LeafDownloadTask>
300    ) -> Self {
301        let downloader = Self(Arc::new(ChunkDowloaderImpl {
302            stack, 
303            cache, 
304            task, 
305            state: RwLock::new(StateImpl::Loading), 
306        }));
307
308        {
309            let downloader = downloader.clone();
310            
311            task::spawn(async move {
312                info!("{} begin load cache", downloader);
313                let finished = downloader.cache().wait_loaded().await;
314                let update_at = task::block_on(downloader.owner().context().update_at());
315                let op = {   
316                    let state = &mut *downloader.0.state.write().unwrap();
317                    if let StateImpl::Loading = state {
318                        if finished {
319                            *state = StateImpl::Finished;
320                            info!("{} finished for cache exists", downloader);
321                            None
322                        } else {
323                            info!("{} enter downloading", downloader);
324                            let (downloading, op) = SingleStreamSession::new(update_at);
325                            *state = StateImpl::Downloading(downloading);
326                            Some(op)
327                        }
328                    } else {
329                        unreachable!()
330                    }
331                };
332               
333                if let Some(op) = op {
334                    {
335                        let downloader = downloader.clone();
336                        task::spawn(async move { downloader.sync_finished().await; });
337                    }
338                    downloader.query_context(op).await;
339                }
340                
341            });
342        }
343        
344        downloader
345    }
346
347    fn on_session_op(&self, op: SessionOp) {
348        match op {
349            SessionOp::None => {},
350            SessionOp::QueryContext(op) => { 
351                let downloader = self.clone(); 
352                task::spawn(async move { downloader.query_context(op).await; }); 
353            }, 
354            SessionOp::TrigerDrain(update_at) => self.owner().context().on_drain(self.owner(), update_at), 
355            SessionOp::StartSession(op) => { 
356                let downloader = self.clone(); 
357                task::spawn(async move { downloader.start_session(op).await; }); 
358            }
359        }
360    }
361
362    async fn query_context(&self, mut op: QueryContextOp) {
363        op.filter.fill_values(self.chunk());
364        let result = self.owner().context().sources_of(&op.filter, op.limit).await;
365        info!("{} return sources from context, op_id={}, sources={:?}, update_at={}", self, op.op_id, result.0, result.1);
366        let next_op = {
367            let mut state = self.0.state.write().unwrap();
368            match &mut *state {
369                StateImpl::Downloading(downloading) => downloading.on_query_finished(self.clone(), &op, result),
370                _ => SessionOp::None
371            }
372        };
373        info!("{} will exec op after queried source, query_id={}, next_op={:?}", self, op.op_id, next_op);
374        self.on_session_op(next_op)
375    }
376
377    async fn start_session(&self, op: StartSessionOp) {
378        info!("{} will start session, op_id={}", self, op.op_id);
379
380        let stack = Stack::from(&self.0.stack);
381        let channel = stack.ndn().channel_manager().create_channel(&op.source.target).unwrap();   
382
383        let mut source: DownloadSource<DeviceId> = op.source.into();
384        source.codec_desc = match &source.codec_desc {
385            ChunkCodecDesc::Unknown => ChunkCodecDesc::Stream(None, None, None).fill_values(self.chunk()), 
386            ChunkCodecDesc::Stream(..) => source.codec_desc.fill_values(self.chunk()), 
387            _ => unimplemented!()
388        };
389
390        let session = channel.download( 
391            self.chunk().clone(), 
392            source.clone(), 
393            self.cache().stream().clone(), 
394            Some(self.owner().context().referer().to_owned()), 
395            self.owner().abs_group_path().clone()).or_else(|err| {
396                Ok::<DownloadSession, ()>(DownloadSession::error(self.chunk().clone(), None, source, None, None, err))
397            }).unwrap();
398
399        let start = {
400            let mut state = self.0.state.write().unwrap();
401            match &mut *state {
402                StateImpl::Downloading(downloading) => downloading.on_session_created(op.op_id, session.clone()), 
403                _ => false
404            }
405        };
406
407        if start {
408            info!("{} will start session, op_id={}, session={}", self, op.op_id, session);
409            session.start();
410        } else {
411            session.cancel_by_error(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"));
412        }
413        self.owner().context().on_new_session(self.owner(), &session, op.update_at);
414    }
415
416    async fn sync_finished(&self) {
417        if self.cache().wait_exists(0..self.cache().chunk().len(), || self.owner().wait_user_canceled()).await.is_ok() {
418            info!("{} finished", self);
419            let state = &mut *self.0.state.write().unwrap();
420            *state = StateImpl::Finished;
421        }
422    }
423
424    async fn finished(&self) -> bool {
425        if let StateImpl::Finished = &*self.0.state.read().unwrap() {
426            true
427        } else {
428            false
429        }
430    }
431
432    pub fn owner(&self) -> &dyn LeafDownloadTask {
433        self.0.task.as_ref()
434    }
435
436    pub fn cache(&self) -> &ChunkCache {
437        &self.0.cache
438    }
439
440    pub fn chunk(&self) -> &ChunkId {
441        self.cache().chunk()
442    }
443
444    pub fn calc_speed(&self, when: Timestamp) -> u32 {
445        match &*self.0.state.read().unwrap() {
446            StateImpl::Downloading(downloading) => downloading.trying().map(|s| s.calc_speed(when)).unwrap_or_default(), 
447            _ => 0
448        }
449    } 
450
451    pub fn cur_speed(&self) -> u32 {
452        match &*self.0.state.read().unwrap() {
453            StateImpl::Downloading(downloading) => downloading.trying().map(|s| s.cur_speed()).unwrap_or_default(),
454            _ => 0
455        }
456    }
457
458    pub fn history_speed(&self) -> u32 {
459        match &*self.0.state.read().unwrap() {
460            StateImpl::Downloading(downloading) => downloading.trying().map(|s| s.history_speed()).unwrap_or_default(), 
461            _ => 0
462        }
463    }
464
465    pub fn on_drain(&self, _: u32) -> u32 {
466        let update_at = task::block_on(self.owner().context().update_at());
467        let (speed, op) = {
468            let mut state = self.0.state.write().unwrap();
469        
470            match &mut *state{
471                StateImpl::Downloading(downloading) => {
472                    let speed = downloading.trying().map(|s| s.cur_speed()).unwrap_or_default();
473                    let op = downloading.check_context(update_at);
474                    (speed, op)
475                }
476                _ => (0, SessionOp::None)
477            }
478        };
479        self.on_session_op(op);
480        speed
481    }
482}