1use std::{
2 collections::{LinkedList},
3 io::SeekFrom,
4 ops::Range
5};
6use async_std::{
7 pin::Pin,
8 task::{Context, Poll},
9 task
10};
11
12use cyfs_base::*;
13use crate::{
14 types::*
15};
16use super::super::{
17 types::*,
18 chunk::*,
19 channel::{DownloadSession, protocol::v0::*}
20};
21
22
23#[derive(Clone, Debug)]
24pub struct DownloadSourceFilter {
25 pub exclude_target: Option<Vec<DeviceId>>,
26 pub include_target: Option<Vec<DeviceId>>,
27 pub include_codec: Option<Vec<ChunkCodecDesc>>,
28}
29
30impl Default for DownloadSourceFilter {
31 fn default() -> Self {
32 Self {
33 exclude_target: None,
34 include_target: None,
35 include_codec: Some(vec![ChunkCodecDesc::Unknown])
36 }
37 }
38}
39
40impl DownloadSourceFilter {
41 pub fn fill_values(&mut self, chunk: &ChunkId) {
42 self.include_codec = self.include_codec.as_ref().map(|include| include.iter().map(|codec| codec.fill_values(chunk)).collect());
43 }
44
45 pub fn check(&self, source: &DownloadSource<DeviceDesc>) -> bool {
46 if let Some(exclude) = self.exclude_target.as_ref() {
47 for target in exclude {
48 if source.target.device_id().eq(target) {
49 return false;
50 }
51 }
52 }
53
54 if let Some(include_target) = self.include_target.as_ref() {
55 let target_id = source.target.device_id();
56 if include_target.iter().any(|include| target_id.eq(include)) {
57 if let Some(include) = self.include_codec.as_ref() {
58 for codec in include {
59 if source.codec_desc.support_desc(codec) {
60 return true;
61 }
62 }
63 } else {
64 return true;
65 }
66 }
67 false
68 } else if let Some(include) = self.include_codec.as_ref() {
69 for codec in include {
70 if source.codec_desc.support_desc(codec) {
71 return true;
72 }
73 }
74 false
75 } else {
76 false
77 }
78 }
79}
80
81#[async_trait::async_trait]
82pub trait DownloadContext: Send + Sync {
83 fn is_mergable(&self) -> bool {
84 true
85 }
86 fn clone_as_context(&self) -> Box<dyn DownloadContext>;
87 fn referer(&self) -> &str;
88 async fn update_at(&self) -> Timestamp;
90 async fn sources_of(
91 &self,
92 filter: &DownloadSourceFilter,
93 limit: usize
94 ) -> (
95 LinkedList<DownloadSource<DeviceDesc>>,
96 Timestamp);
98 fn on_new_session(
99 &self,
100 _task: &dyn LeafDownloadTask,
101 _session: &DownloadSession,
102 _update_at: Timestamp
104 ) {}
105 fn on_drain(
107 &self,
108 _task: &dyn LeafDownloadTask,
109 _update_at: Timestamp) {}
111}
112
113#[derive(Clone, Debug)]
114pub struct DownloadSource<T: std::fmt::Debug + Clone + Send + Sync> {
115 pub target: T,
116 pub codec_desc: ChunkCodecDesc,
117}
118
119impl Into<DownloadSource<DeviceId>> for DownloadSource<DeviceDesc> {
120 fn into(self) -> DownloadSource<DeviceId> {
121 DownloadSource {
122 target: self.target.device_id(),
123 codec_desc: self.codec_desc,
124 }
125 }
126}
127
128
/// Scheduling priority of a download task.
///
/// `Debug`, `PartialEq` and `Eq` are derived so that priorities can be logged
/// and compared.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DownloadTaskPriority {
    /// Background priority (the variant name keeps its historical spelling
    /// because it is part of the public API).
    Backgroud,
    /// Regular priority; this is the default.
    Normal,
    /// Realtime priority carrying a `u32` payload.
    ///
    /// NOTE(review): the meaning of the payload is not visible in this file —
    /// confirm against the scheduler.
    Realtime(u32),
}

impl Default for DownloadTaskPriority {
    /// Returns [`DownloadTaskPriority::Normal`].
    fn default() -> Self {
        Self::Normal
    }
}
141
142
143
144
145
146#[async_trait::async_trait]
147pub trait DownloadTask: NdnTask {
148 fn clone_as_download_task(&self) -> Box<dyn DownloadTask>;
149
150 async fn wait_user_canceled(&self) -> BuckyError;
151
152 fn add_task(&self, _path: Option<String>, _sub: Box<dyn DownloadTask>) -> BuckyResult<()> {
153 Err(BuckyError::new(BuckyErrorCode::NotSupport, "no implement"))
154 }
155 fn sub_task(&self, _path: &str) -> Option<Box<dyn DownloadTask>> {
156 None
157 }
158 fn on_post_add_to_root(&self, _abs_path: String) {
159
160 }
161
162 fn calc_speed(&self, when: Timestamp) -> u32;
163}
164
165
166#[async_trait::async_trait]
167pub trait LeafDownloadTask: DownloadTask + std::fmt::Display {
168 fn priority(&self) -> DownloadTaskPriority {
169 DownloadTaskPriority::default()
170 }
171 fn clone_as_leaf_task(&self) -> Box<dyn LeafDownloadTask>;
172 fn abs_group_path(&self) -> Option<String>;
173 fn context(&self) -> &dyn DownloadContext;
174 fn finish(&self);
175}
176
177
178pub struct DownloadTaskReader {
179 cache: ChunkCache,
180 offset: usize,
181 task: Box<dyn LeafDownloadTask>
182}
183
184
185pub trait DownloadTaskSplitRead: std::io::Seek {
186 fn poll_split_read(
187 self: Pin<&mut Self>,
188 cx: &mut Context<'_>,
189 buffer: &mut [u8],
190 ) -> Poll<std::io::Result<Option<(ChunkCache, Range<usize>)>>>;
191}
192
193impl std::fmt::Display for DownloadTaskReader {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 write!(f, "DownloadTaskReader{{chunk:{}}}", self.cache.chunk())
196 }
197}
198
199impl DownloadTaskReader {
200 pub fn new(cache: ChunkCache, task: Box<dyn LeafDownloadTask>) -> Self {
201 Self {
202 cache,
203 offset: 0,
204 task
205 }
206 }
207
208 pub fn task(&self) -> &dyn LeafDownloadTask {
209 self.task.as_ref()
210 }
211
212 pub fn offset(&self) -> usize {
213 self.offset
214 }
215
216 pub fn cache(&self) -> &ChunkCache {
217 &self.cache
218 }
219}
220
221impl DownloadTaskSplitRead for DownloadTaskReader {
222 fn poll_split_read(
223 self: Pin<&mut Self>,
224 cx: &mut Context<'_>,
225 buffer: &mut [u8],
226 ) -> Poll<std::io::Result<Option<(ChunkCache, Range<usize>)>>> {
227 let pined = self.get_mut();
228 trace!("{} split_read: {} offset: {}", pined, buffer.len(), pined.offset);
229 if let NdnTaskState::Error(err) = pined.task.state() {
230 trace!("{} split_read: {} offset: {} error: {}", pined, buffer.len(), pined.offset, err);
231 return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, BuckyError::new(err, ""))));
232 }
233 if let Some(range) = pined.cache.exists(pined.offset..pined.offset + buffer.len()) {
234 trace!("{} split_read: {} offset: {} exists {:?}", pined, buffer.len(), pined.offset, range);
235 let (desc, mut offset) = PieceDesc::from_stream_offset(PieceData::max_payload(), range.start as u32);
236 let (mut index, len) = desc.unwrap_as_stream();
237 let mut read = 0;
238 let result = loop {
239 match pined.cache.stream().sync_try_read(
240 &PieceDesc::Range(index, len),
241 offset as usize,
242 &mut buffer[read..]) {
243 Ok(this_read) => {
244 read += this_read;
245 if this_read == 0
246 || read >= buffer.len() {
247 pined.offset += read;
248 break Ok(read);
249 }
250 index += 1;
251 offset = 0;
252 },
253 Err(err) => {
254 break Err(std::io::Error::new(std::io::ErrorKind::Other, err))
255 }
256 }
257 };
258 Poll::Ready(result.map(|read| Some((pined.cache.clone(), range.start..range.start + read))))
259 } else {
260 let waker = cx.waker().clone();
261 let cache = pined.cache.clone();
262 let task = pined.task.clone_as_download_task();
263 let range = pined.offset..pined.offset + buffer.len();
264 task::spawn(async move {
265 let _ = cache.wait_exists(range, || task.wait_user_canceled()).await;
266 waker.wake();
267 });
268 Poll::Pending
269 }
270 }
271}
272
273impl std::io::Seek for DownloadTaskReader {
274 fn seek(
275 self: &mut Self,
276 pos: SeekFrom,
277 ) -> std::io::Result<u64> {
278 let len = self.cache.chunk().len();
279 let new_offset = match pos {
280 SeekFrom::Start(offset) => len.min(offset as usize),
281 SeekFrom::Current(offset) => {
282 let offset = (self.offset as i64) + offset;
283 let offset = offset.max(0);
284 len.min(offset as usize)
285 },
286 SeekFrom::End(offset) => {
287 let offset = (len as i64) + offset;
288 let offset = offset.max(0);
289 len.min(offset as usize)
290 }
291 };
292 self.offset = new_offset;
293
294 Ok(new_offset as u64)
295 }
296}
297
298impl async_std::io::Read for DownloadTaskReader {
299 fn poll_read(
300 self: Pin<&mut Self>,
301 cx: &mut Context<'_>,
302 buffer: &mut [u8],
303 ) -> Poll<std::io::Result<usize>> {
304 self.poll_split_read(cx, buffer).map(|result| result.map(|r| if let Some((_, r)) = r {
305 r.end - r.start
306 } else {
307 0
308 }))
309 }
310}