cyfs_bdt/ndn/download/
common.rs

1use std::{
2    collections::{LinkedList}, 
3    io::SeekFrom, 
4    ops::Range
5};
6use async_std::{
7    pin::Pin, 
8    task::{Context, Poll},
9    task
10};
11
12use cyfs_base::*;
13use crate::{
14    types::*
15};
16use super::super::{
17    types::*, 
18    chunk::*,
19    channel::{DownloadSession, protocol::v0::*}
20};
21
22
23#[derive(Clone, Debug)]
24pub struct DownloadSourceFilter {
25    pub exclude_target: Option<Vec<DeviceId>>, 
26    pub include_target: Option<Vec<DeviceId>>, 
27    pub include_codec: Option<Vec<ChunkCodecDesc>>, 
28} 
29
30impl Default for DownloadSourceFilter {
31    fn default() -> Self {
32        Self {
33            exclude_target: None, 
34            include_target: None, 
35            include_codec: Some(vec![ChunkCodecDesc::Unknown])
36        }
37    } 
38}
39
40impl DownloadSourceFilter {
41    pub fn fill_values(&mut self, chunk: &ChunkId) {
42        self.include_codec = self.include_codec.as_ref().map(|include| include.iter().map(|codec| codec.fill_values(chunk)).collect());
43    }
44
45    pub fn check(&self, source: &DownloadSource<DeviceDesc>) -> bool {
46        if let Some(exclude) = self.exclude_target.as_ref() {
47            for target in exclude {
48                if source.target.device_id().eq(target) {
49                    return false;
50                }
51            }
52        }
53
54        if let Some(include_target) = self.include_target.as_ref() {
55            let target_id = source.target.device_id();
56            if include_target.iter().any(|include| target_id.eq(include)) {
57                if let Some(include) = self.include_codec.as_ref() {
58                    for codec in include {
59                        if source.codec_desc.support_desc(codec) {
60                            return true;
61                        }
62                    }
63                } else {
64                    return true;
65                }
66            }
67            false
68        } else if let Some(include) = self.include_codec.as_ref() {
69            for codec in include {
70                if source.codec_desc.support_desc(codec) {
71                    return true;
72                }
73            }
74            false
75        } else {
76            false
77        }
78    }
79}
80
81#[async_trait::async_trait]
82pub trait DownloadContext: Send + Sync {
83    fn is_mergable(&self) -> bool {
84        true
85    }
86    fn clone_as_context(&self) -> Box<dyn DownloadContext>;
87    fn referer(&self) -> &str;
88    // update time when context's sources changed
89    async fn update_at(&self) -> Timestamp;
90    async fn sources_of(
91        &self, 
92        filter: &DownloadSourceFilter, 
93        limit: usize
94    ) -> (
95        LinkedList<DownloadSource<DeviceDesc>>, 
96        /*context's current update_at*/
97        Timestamp);
98    fn on_new_session(
99        &self, 
100        _task: &dyn LeafDownloadTask, 
101        _session: &DownloadSession, 
102        /*session created based on context's update_at*/
103        _update_at: Timestamp
104    ) {}
105    // called when tried all source in context but task didn't finish  
106    fn on_drain(
107        &self, 
108        _task: &dyn LeafDownloadTask, 
109        /*event trigered based on context's update_at*/
110        _update_at: Timestamp) {}
111}
112
113#[derive(Clone, Debug)]
114pub struct DownloadSource<T: std::fmt::Debug + Clone + Send + Sync> {
115    pub target: T, 
116    pub codec_desc: ChunkCodecDesc, 
117}
118
119impl Into<DownloadSource<DeviceId>> for DownloadSource<DeviceDesc> {
120    fn into(self) -> DownloadSource<DeviceId> {
121        DownloadSource {
122            target: self.target.device_id(), 
123            codec_desc: self.codec_desc, 
124        }
125    }
126}
127
128
/// Scheduling priority of a download task.
///
/// `Debug`, `PartialEq` and `Eq` are derived so the priority can be logged
/// and compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DownloadTaskPriority {
    // NOTE: the variant name is misspelled ("Backgroud"); it is kept as is
    // because it is part of the public interface.
    Backgroud,
    /// The default priority.
    Normal,
    /// Realtime priority; the payload is the minimum speed.
    Realtime(u32),
}

impl Default for DownloadTaskPriority {
    /// Returns [`DownloadTaskPriority::Normal`].
    fn default() -> Self {
        Self::Normal
    }
}
141
142
143
144
145
146#[async_trait::async_trait]
147pub trait DownloadTask: NdnTask {
148    fn clone_as_download_task(&self) -> Box<dyn DownloadTask>;
149
150    async fn wait_user_canceled(&self) -> BuckyError;
151
152    fn add_task(&self, _path: Option<String>, _sub: Box<dyn DownloadTask>) -> BuckyResult<()> {
153        Err(BuckyError::new(BuckyErrorCode::NotSupport, "no implement"))
154    }
155    fn sub_task(&self, _path: &str) -> Option<Box<dyn DownloadTask>> {
156        None
157    }
158    fn on_post_add_to_root(&self, _abs_path: String) {
159
160    }
161
162    fn calc_speed(&self, when: Timestamp) -> u32;
163}
164
165
166#[async_trait::async_trait]
167pub trait LeafDownloadTask: DownloadTask + std::fmt::Display {
168    fn priority(&self) -> DownloadTaskPriority {
169        DownloadTaskPriority::default()
170    }
171    fn clone_as_leaf_task(&self) -> Box<dyn LeafDownloadTask>;
172    fn abs_group_path(&self) -> Option<String>;
173    fn context(&self) -> &dyn DownloadContext;
174    fn finish(&self);
175}
176
177
178pub struct DownloadTaskReader {
179    cache: ChunkCache, 
180    offset: usize,
181    task: Box<dyn LeafDownloadTask>
182}
183
184
185pub trait DownloadTaskSplitRead: std::io::Seek {
186    fn poll_split_read(
187        self: Pin<&mut Self>,
188        cx: &mut Context<'_>,
189        buffer: &mut [u8],
190    ) -> Poll<std::io::Result<Option<(ChunkCache, Range<usize>)>>>;
191}
192
193impl std::fmt::Display for DownloadTaskReader {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        write!(f, "DownloadTaskReader{{chunk:{}}}", self.cache.chunk())
196    }
197}
198
199impl DownloadTaskReader {
200    pub fn new(cache: ChunkCache, task: Box<dyn LeafDownloadTask>) -> Self {
201        Self {
202            cache, 
203            offset: 0, 
204            task
205        }
206    }
207
208    pub fn task(&self) -> &dyn LeafDownloadTask {
209        self.task.as_ref()
210    }
211
212    pub fn offset(&self) -> usize {
213        self.offset
214    }
215
216    pub fn cache(&self) -> &ChunkCache {
217        &self.cache
218    }
219}
220
221impl DownloadTaskSplitRead for DownloadTaskReader {
222    fn poll_split_read(
223        self: Pin<&mut Self>,
224        cx: &mut Context<'_>,
225        buffer: &mut [u8],
226    ) -> Poll<std::io::Result<Option<(ChunkCache, Range<usize>)>>> {
227        let pined = self.get_mut();
228        trace!("{} split_read: {} offset: {}", pined, buffer.len(), pined.offset);
229        if let NdnTaskState::Error(err) = pined.task.state() {
230            trace!("{} split_read: {} offset: {} error: {}", pined, buffer.len(), pined.offset, err);
231            return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, BuckyError::new(err, ""))));
232        } 
233        if let Some(range) = pined.cache.exists(pined.offset..pined.offset + buffer.len()) {
234            trace!("{} split_read: {} offset: {} exists {:?}", pined, buffer.len(), pined.offset, range);
235            let (desc, mut offset) = PieceDesc::from_stream_offset(PieceData::max_payload(), range.start as u32);
236            let (mut index, len) = desc.unwrap_as_stream();
237            let mut read = 0;
238            let result = loop {
239                match pined.cache.stream().sync_try_read(
240                    &PieceDesc::Range(index, len), 
241                    offset as usize, 
242                    &mut buffer[read..]) {
243                    Ok(this_read) => {
244                        read += this_read;
245                        if this_read == 0 
246                            || read >= buffer.len() {
247                            pined.offset += read;
248                            break Ok(read);
249                        }
250                        index += 1;
251                        offset = 0;
252                    },
253                    Err(err) => {
254                        break Err(std::io::Error::new(std::io::ErrorKind::Other, err))
255                    }
256                }
257            };
258            Poll::Ready(result.map(|read| Some((pined.cache.clone(), range.start..range.start + read))))
259        } else {
260            let waker = cx.waker().clone();
261            let cache = pined.cache.clone();
262            let task = pined.task.clone_as_download_task();
263            let range = pined.offset..pined.offset + buffer.len();
264            task::spawn(async move {
265                let _ = cache.wait_exists(range, || task.wait_user_canceled()).await;
266                waker.wake();
267            });
268            Poll::Pending
269        }
270    }
271}
272
273impl std::io::Seek for DownloadTaskReader {
274    fn seek(
275        self: &mut Self,
276        pos: SeekFrom,
277    ) -> std::io::Result<u64> {
278        let len = self.cache.chunk().len();
279        let new_offset = match pos {
280            SeekFrom::Start(offset) => len.min(offset as usize), 
281            SeekFrom::Current(offset) => {
282                let offset = (self.offset as i64) + offset;
283                let offset = offset.max(0);
284                len.min(offset as usize)
285            },
286            SeekFrom::End(offset) => {
287                let offset = (len as i64) + offset;
288                let offset = offset.max(0);
289                len.min(offset as usize)
290            }
291        };
292        self.offset = new_offset;
293
294        Ok(new_offset as u64)   
295    }
296}
297
298impl async_std::io::Read for DownloadTaskReader {
299    fn poll_read(
300        self: Pin<&mut Self>,
301        cx: &mut Context<'_>,
302        buffer: &mut [u8],
303    ) -> Poll<std::io::Result<usize>> {
304        self.poll_split_read(cx, buffer).map(|result| result.map(|r| if let Some((_, r)) = r {
305            r.end - r.start
306        } else {
307            0
308        }))
309    }
310}