cyfs_bdt/utils/ndn/
download.rs1use std::{
2 sync::{Arc, RwLock},
3 collections::LinkedList
4};
5use cyfs_base::*;
6use crate::{
7 types::*,
8 ndn::{*},
9 stack::{Stack},
10};
11
12struct SampleContextSources {
13 update_at: Timestamp,
14 sources: LinkedList<DownloadSource<DeviceDesc>>,
15}
16
17struct SampleContextImpl {
18 referer: String,
19 sources: RwLock<SampleContextSources>,
20}
21
22#[derive(Clone)]
23pub struct SampleDownloadContext(Arc<SampleContextImpl>);
24
25impl Default for SampleDownloadContext {
26 fn default() -> Self {
27 Self::new("".to_owned())
28 }
29}
30
31impl SampleDownloadContext {
32 pub fn ptr_eq(&self, other: &Self) -> bool {
33 Arc::ptr_eq(&self.0, &other.0)
34 }
35
36 pub fn new(referer: String) -> Self {
37 Self(Arc::new(SampleContextImpl {
38 referer,
39 sources: RwLock::new(SampleContextSources {
40 update_at: bucky_time_now(),
41 sources: Default::default()
42 })
43 }))
44 }
45
46 pub fn desc_streams(referer: String, remotes: Vec<DeviceDesc>) -> Self {
47 let mut sources = LinkedList::new();
48 for remote in remotes {
49 sources.push_back(DownloadSource {
50 target: remote,
51 codec_desc: ChunkCodecDesc::Stream(None, None, None),
52 });
53 }
54 Self(Arc::new(SampleContextImpl {
55 referer,
56 sources: RwLock::new(SampleContextSources { update_at: bucky_time_now(), sources})
57 }))
58 }
59
60 pub async fn id_streams(stack: &Stack, referer: String, remotes: &[DeviceId]) -> BuckyResult<Self> {
61 let mut sources = LinkedList::new();
62 for remote in remotes {
63 let device = stack.device_cache().get(&remote).await
64 .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "device desc not found"))?;
65 sources.push_back(DownloadSource {
66 target: device.desc().clone(),
67 codec_desc: ChunkCodecDesc::Stream(None, None, None),
68 });
69 }
70 Ok(Self(Arc::new(SampleContextImpl {
71 referer,
72 sources: RwLock::new(SampleContextSources{ update_at: bucky_time_now(), sources })
73 })))
74 }
75
76 pub fn add_source(&self, source: DownloadSource<DeviceDesc>) {
77 let mut sources = self.0.sources.write().unwrap();
78 sources.update_at = bucky_time_now();
79 sources.sources.push_back(source);
80 }
81}
82
83#[async_trait::async_trait]
84impl DownloadContext for SampleDownloadContext {
85 fn clone_as_context(&self) -> Box<dyn DownloadContext> {
86 Box::new(self.clone())
87 }
88
89 fn referer(&self) -> &str {
90 self.0.referer.as_str()
91 }
92
93 async fn update_at(&self) -> Timestamp {
94 self.0.sources.read().unwrap().update_at
95 }
96
97 async fn sources_of(&self, filter: &DownloadSourceFilter, limit: usize) -> (LinkedList<DownloadSource<DeviceDesc>>, Timestamp) {
98 let mut result = LinkedList::new();
99 let mut count = 0;
100 let sources = self.0.sources.read().unwrap();
101 for source in sources.sources.iter() {
102 if filter.check(source) {
103 result.push_back(DownloadSource {
104 target: source.target.clone(),
105 codec_desc: source.codec_desc.clone(),
106 });
107 count += 1;
108 if count >= limit {
109 return (result, sources.update_at);
110 }
111 }
112 }
113 return (result, sources.update_at);
114 }
115}
116
117
118
119pub async fn download_chunk(
120 stack: &Stack,
121 chunk: ChunkId,
122 group: Option<String>,
123 context: impl DownloadContext
124) -> BuckyResult<(String, ChunkTaskReader)> {
125 let (task, reader) = ChunkTask::reader(
126 stack.to_weak(),
127 chunk,
128 context.clone_as_context()
129 );
130 let path = stack.ndn().root_task().download().add_task(group.unwrap_or_default(), &task)?;
131 Ok((path, reader))
132}
133
134pub async fn download_chunk_list(
135 stack: &Stack,
136 name: String,
137 chunks: &Vec<ChunkId>,
138 group: Option<String>,
139 context: impl DownloadContext,
140) -> BuckyResult<(String, ChunkListTaskReader)> {
141 let chunk_list = ChunkListDesc::from_chunks(chunks);
142
143 let (task, reader) = ChunkListTask::reader(
144 stack.to_weak(),
145 name,
146 chunk_list,
147 context.clone_as_context(),
148 );
149 let path = stack.ndn().root_task().download().add_task(group.unwrap_or_default(), &task)?;
150
151 Ok((path, reader))
152}
153
154
155pub async fn download_file(
156 stack: &Stack,
157 file: File,
158 group: Option<String>,
159 context: impl DownloadContext
160) -> BuckyResult<(String, ChunkListTaskReader)> {
161 let chunk_list = ChunkListDesc::from_file(&file)?;
162 let (task, reader) = ChunkListTask::reader(
163 stack.to_weak(),
164 file.desc().file_id().to_string(),
165 chunk_list,
166 context.clone_as_context()
167 );
168 let path = stack.ndn().root_task().download().add_task(group.unwrap_or_default(), &task)?;
169 Ok((path, reader))
170}
171