cyfs_bdt/utils/ndn/
download.rs

1use std::{
2    sync::{Arc, RwLock},
3    collections::LinkedList
4};
5use cyfs_base::*;
6use crate::{
7    types::*, 
8    ndn::{*}, 
9    stack::{Stack}, 
10};
11
12struct SampleContextSources {
13    update_at: Timestamp, 
14    sources: LinkedList<DownloadSource<DeviceDesc>>, 
15}
16
17struct SampleContextImpl {
18    referer: String, 
19    sources: RwLock<SampleContextSources>, 
20}
21
22#[derive(Clone)]
23pub struct SampleDownloadContext(Arc<SampleContextImpl>);
24
25impl Default for SampleDownloadContext {
26    fn default() -> Self {
27        Self::new("".to_owned())
28    }
29}
30
31impl SampleDownloadContext {
32    pub fn ptr_eq(&self, other: &Self) -> bool {
33        Arc::ptr_eq(&self.0, &other.0)
34    }
35
36    pub fn new(referer: String) -> Self {
37        Self(Arc::new(SampleContextImpl {
38            referer, 
39            sources: RwLock::new(SampleContextSources {
40                update_at: bucky_time_now(), 
41                sources: Default::default()
42            })
43        }))
44    }
45
46    pub fn desc_streams(referer: String, remotes: Vec<DeviceDesc>) -> Self {
47        let mut sources = LinkedList::new();
48        for remote in remotes {
49            sources.push_back(DownloadSource {
50                target: remote, 
51                codec_desc: ChunkCodecDesc::Stream(None, None, None), 
52            });
53        } 
54        Self(Arc::new(SampleContextImpl {
55            referer, 
56            sources: RwLock::new(SampleContextSources { update_at: bucky_time_now(),  sources})
57        }))
58    }
59
60    pub async fn id_streams(stack: &Stack, referer: String, remotes: &[DeviceId]) -> BuckyResult<Self> {
61        let mut sources = LinkedList::new();
62        for remote in remotes {
63            let device = stack.device_cache().get(&remote).await
64                .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "device desc not found"))?;
65            sources.push_back(DownloadSource {
66                target: device.desc().clone(), 
67                codec_desc: ChunkCodecDesc::Stream(None, None, None), 
68            });
69        } 
70        Ok(Self(Arc::new(SampleContextImpl {
71            referer, 
72            sources: RwLock::new(SampleContextSources{ update_at: bucky_time_now(), sources })
73        })))
74    }
75
76    pub fn add_source(&self, source: DownloadSource<DeviceDesc>) {
77        let mut sources = self.0.sources.write().unwrap();
78        sources.update_at = bucky_time_now();
79        sources.sources.push_back(source);
80    }
81}
82
83#[async_trait::async_trait]
84impl DownloadContext for SampleDownloadContext {
85    fn clone_as_context(&self) -> Box<dyn DownloadContext> {
86        Box::new(self.clone())
87    }
88
89    fn referer(&self) -> &str {
90        self.0.referer.as_str()
91    }
92
93    async fn update_at(&self) -> Timestamp {
94        self.0.sources.read().unwrap().update_at
95    }
96
97    async fn sources_of(&self, filter: &DownloadSourceFilter, limit: usize) -> (LinkedList<DownloadSource<DeviceDesc>>, Timestamp) {
98        let mut result = LinkedList::new();
99        let mut count = 0;
100        let sources = self.0.sources.read().unwrap();
101        for source in sources.sources.iter() {
102            if filter.check(source) {
103                result.push_back(DownloadSource {
104                    target: source.target.clone(), 
105                    codec_desc: source.codec_desc.clone(), 
106                });
107                count += 1;
108                if count >= limit {
109                    return (result, sources.update_at);
110                }
111            }
112        }
113        return (result, sources.update_at);
114    }
115}
116
117
118
119pub async fn download_chunk(
120    stack: &Stack, 
121    chunk: ChunkId, 
122    group: Option<String>, 
123    context: impl DownloadContext
124) -> BuckyResult<(String, ChunkTaskReader)> {
125    let (task, reader) = ChunkTask::reader(
126        stack.to_weak(), 
127        chunk, 
128        context.clone_as_context()
129    );
130    let path = stack.ndn().root_task().download().add_task(group.unwrap_or_default(), &task)?;
131    Ok((path, reader))
132}
133
134pub async fn download_chunk_list(
135    stack: &Stack, 
136    name: String, 
137    chunks: &Vec<ChunkId>, 
138    group: Option<String>, 
139    context: impl DownloadContext, 
140) -> BuckyResult<(String, ChunkListTaskReader)> {
141    let chunk_list = ChunkListDesc::from_chunks(chunks);
142   
143    let (task, reader) = ChunkListTask::reader(
144        stack.to_weak(), 
145        name, 
146        chunk_list, 
147        context.clone_as_context(), 
148    );
149    let path = stack.ndn().root_task().download().add_task(group.unwrap_or_default(), &task)?;
150    
151    Ok((path, reader))
152}
153
154
155pub async fn download_file(
156    stack: &Stack, 
157    file: File, 
158    group: Option<String>, 
159    context: impl DownloadContext
160) -> BuckyResult<(String, ChunkListTaskReader)> {
161    let chunk_list = ChunkListDesc::from_file(&file)?;
162    let (task, reader) = ChunkListTask::reader(
163        stack.to_weak(), 
164        file.desc().file_id().to_string(), 
165        chunk_list, 
166        context.clone_as_context()
167    );
168    let path = stack.ndn().root_task().download().add_task(group.unwrap_or_default(), &task)?;
169    Ok((path, reader))
170}
171