cyfs_bdt/utils/ndn/
single_source.rs1use std::{
2 sync::{Arc, RwLock},
3 collections::LinkedList
4};
5use futures::future::{AbortRegistration};
6use cyfs_base::*;
7use crate::{
8 types::*,
9 ndn::{*, channel::{DownloadSession, DownloadSessionState}},
10 stack::{Stack},
11};
12
13enum WaitSession {
14 None(StateWaiter),
15 Some(DownloadSession)
16}
17
18struct ContextImpl {
19 referer: String,
20 create_at: Timestamp,
21 source: DownloadSource<DeviceDesc>,
22 session: RwLock<WaitSession>
23}
24
25#[derive(Clone)]
26pub struct SingleSourceContext(Arc<ContextImpl>);
27
28impl SingleSourceContext {
29 pub fn ptr_eq(&self, other: &Self) -> bool {
30 Arc::ptr_eq(&self.0, &other.0)
31 }
32
33 pub fn source(&self) -> &DownloadSource<DeviceDesc> {
34 &self.0.source
35 }
36
37 pub fn from_desc(referer: String, remote: DeviceDesc) -> Self {
38 Self(Arc::new(ContextImpl {
39 create_at: bucky_time_now(),
40 referer,
41 source: DownloadSource {
42 target: remote,
43 codec_desc: ChunkCodecDesc::Stream(None, None, None),
44 },
45 session: RwLock::new(WaitSession::None(StateWaiter::new()))
46 }))
47 }
48
49 pub async fn from_id(stack: &Stack, referer: String, remote: DeviceId) -> BuckyResult<Self> {
50 let device = stack.device_cache().get(&remote).await
51 .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "device desc not found"))?;
52 Ok(Self(Arc::new(ContextImpl {
53 create_at: bucky_time_now(),
54 referer,
55 source: DownloadSource {
56 target: device.desc().clone(),
57 codec_desc: ChunkCodecDesc::Stream(None, None, None),
58 },
59 session: RwLock::new(WaitSession::None(StateWaiter::new()))
60 })))
61 }
62
63 pub async fn wait_session(&self, abort: impl futures::Future<Output = BuckyError>) -> BuckyResult<DownloadSession> {
64 enum NextStep {
65 Wait(AbortRegistration),
66 Some(DownloadSession)
67 }
68
69 let next = {
70 let mut session = self.0.session.write().unwrap();
71 match &mut *session {
72 WaitSession::None(waiter) => NextStep::Wait(waiter.new_waiter()),
73 WaitSession::Some(session) => NextStep::Some(session.clone())
74 }
75 };
76
77 match next {
78 NextStep::Some(session) => Ok(session),
79 NextStep::Wait(waiter) => StateWaiter::abort_wait(abort, waiter, || {
80 let session = self.0.session.read().unwrap();
81 match & *session {
82 WaitSession::Some(session) => session.clone(),
83 _ => unreachable!()
84 }
85 }).await
86 }
87
88 }
89}
90
91#[async_trait::async_trait]
92impl DownloadContext for SingleSourceContext {
93 fn clone_as_context(&self) -> Box<dyn DownloadContext> {
94 Box::new(self.clone())
95 }
96
97 fn is_mergable(&self) -> bool {
98 false
99 }
100
101 fn referer(&self) -> &str {
102 self.0.referer.as_str()
103 }
104
105 async fn update_at(&self) -> Timestamp {
106 self.0.create_at
107 }
108
109 async fn sources_of(&self, filter: &DownloadSourceFilter, _limit: usize) -> (LinkedList<DownloadSource<DeviceDesc>>, Timestamp) {
110 let mut result = LinkedList::new();
111 if filter.check(self.source()) {
112 result.push_back(DownloadSource {
113 target: self.source().target.clone(),
114 codec_desc: self.source().codec_desc.clone(),
115 });
116 }
117 (result, self.0.create_at)
118 }
119
120 fn on_new_session(&self, _task: &dyn LeafDownloadTask, new_session: &DownloadSession, _update_at: Timestamp) {
121 let waiter = {
122 let mut session = self.0.session.write().unwrap();
123 match &mut *session {
124 WaitSession::None(waiter) => {
125 let waiter = waiter.transfer();
126 *session = WaitSession::Some(new_session.clone());
127 waiter
128 }
129 WaitSession::Some(_) => unreachable!()
130 }
131 };
132
133 waiter.wake();
134 }
135
136 fn on_drain(
137 &self,
138 task: &dyn LeafDownloadTask,
139 _update_at: Timestamp) {
140 let session = {
141 let session = self.0.session.read().unwrap();
142 match &*session {
143 WaitSession::Some(session) => Some(session.clone()),
144 _ => None
145 }
146 };
147
148 if let Some(session) = session {
149 if let DownloadSessionState::Canceled(err) = session.state() {
150 let _ = task.cancel_by_error(err);
151 }
152 }
153 }
154}
155