cyfs_bdt/utils/ndn/
single_source.rs

1use std::{
2    sync::{Arc, RwLock},
3    collections::LinkedList
4};
5use futures::future::{AbortRegistration};
6use cyfs_base::*;
7use crate::{
8    types::*, 
9    ndn::{*, channel::{DownloadSession, DownloadSessionState}}, 
10    stack::{Stack}, 
11};
12
13enum WaitSession {
14    None(StateWaiter), 
15    Some(DownloadSession)
16}
17
18struct ContextImpl {
19    referer: String, 
20    create_at: Timestamp, 
21    source: DownloadSource<DeviceDesc>, 
22    session: RwLock<WaitSession>
23}
24
25#[derive(Clone)]
26pub struct SingleSourceContext(Arc<ContextImpl>);
27
28impl SingleSourceContext {
29    pub fn ptr_eq(&self, other: &Self) -> bool {
30        Arc::ptr_eq(&self.0, &other.0)
31    }
32
33    pub fn source(&self) -> &DownloadSource<DeviceDesc> {
34        &self.0.source
35    }
36
37    pub fn from_desc(referer: String, remote: DeviceDesc) -> Self {
38        Self(Arc::new(ContextImpl {
39            create_at: bucky_time_now(), 
40            referer, 
41            source: DownloadSource {
42                target: remote, 
43                codec_desc: ChunkCodecDesc::Stream(None, None, None), 
44            }, 
45            session: RwLock::new(WaitSession::None(StateWaiter::new()))
46        }))
47    }
48
49    pub async fn from_id(stack: &Stack, referer: String, remote: DeviceId) -> BuckyResult<Self> {
50        let device = stack.device_cache().get(&remote).await
51                .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "device desc not found"))?;
52        Ok(Self(Arc::new(ContextImpl {
53            create_at: bucky_time_now(), 
54            referer, 
55            source: DownloadSource {
56                target: device.desc().clone(), 
57                codec_desc: ChunkCodecDesc::Stream(None, None, None), 
58            },
59            session: RwLock::new(WaitSession::None(StateWaiter::new()))
60        })))
61    }
62
63    pub async fn wait_session(&self, abort: impl futures::Future<Output = BuckyError>) -> BuckyResult<DownloadSession> {
64        enum NextStep {
65            Wait(AbortRegistration), 
66            Some(DownloadSession)
67        }
68
69        let next = {
70            let mut session = self.0.session.write().unwrap();
71            match &mut *session {
72                WaitSession::None(waiter) => NextStep::Wait(waiter.new_waiter()), 
73                WaitSession::Some(session) => NextStep::Some(session.clone())
74            }
75        };
76
77        match next {
78            NextStep::Some(session) => Ok(session),
79            NextStep::Wait(waiter) => StateWaiter::abort_wait(abort, waiter, || {
80                let session = self.0.session.read().unwrap();
81                match & *session {
82                    WaitSession::Some(session) => session.clone(),
83                    _ => unreachable!()
84                }
85            }).await
86        }
87      
88    }
89}
90
91#[async_trait::async_trait]
92impl DownloadContext for SingleSourceContext {
93    fn clone_as_context(&self) -> Box<dyn DownloadContext> {
94        Box::new(self.clone())
95    }
96
97    fn is_mergable(&self) -> bool {
98        false
99    }
100
101    fn referer(&self) -> &str {
102        self.0.referer.as_str()
103    }
104
105    async fn update_at(&self) -> Timestamp {
106        self.0.create_at
107    }
108
109    async fn sources_of(&self, filter: &DownloadSourceFilter, _limit: usize) -> (LinkedList<DownloadSource<DeviceDesc>>, Timestamp) {
110        let mut result = LinkedList::new();
111        if filter.check(self.source()) {
112            result.push_back(DownloadSource {
113                target: self.source().target.clone(), 
114                codec_desc: self.source().codec_desc.clone(), 
115            });
116        } 
117        (result, self.0.create_at)
118    }
119
120    fn on_new_session(&self, _task: &dyn LeafDownloadTask, new_session: &DownloadSession, _update_at: Timestamp) {
121        let waiter = {
122            let mut session = self.0.session.write().unwrap();
123            match &mut *session {
124                WaitSession::None(waiter) => {
125                    let waiter = waiter.transfer();
126                    *session = WaitSession::Some(new_session.clone());
127                    waiter
128                } 
129                WaitSession::Some(_) => unreachable!()
130            }
131        };
132       
133        waiter.wake();
134    }
135
136    fn on_drain(
137        &self, 
138        task: &dyn LeafDownloadTask, 
139        _update_at: Timestamp) {
140        let session = {
141            let session = self.0.session.read().unwrap();
142            match &*session {
143                WaitSession::Some(session) => Some(session.clone()), 
144                _ => None
145            }
146        };
147        
148        if let Some(session) = session {
149            if let DownloadSessionState::Canceled(err) = session.state() {
150                let _ = task.cancel_by_error(err);
151            }
152        }
153    }
154}
155