1use std::{
2 sync::{RwLock},
3 io::SeekFrom,
4 ops::Range
5};
6use async_std::{
7 sync::Arc,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12use cyfs_base::*;
13use crate::{
14 types::*,
15 stack::{WeakStack, Stack},
16};
17use super::super::{
18 chunk::*,
19 types::*
20};
21use super::{
22 common::*,
23};
24
25
26struct DownloadingState {
27 downloaded: u64,
28 cur_speed: ProgressCounter,
29 cur_chunk: (ChunkDownloader, usize),
30 history_speed: HistorySpeed,
31}
32
33enum ControlStateImpl {
34 Normal(StateWaiter),
35 Canceled,
36}
37
38enum TaskStateImpl {
39 Pending,
40 Downloading(DownloadingState),
41 Error(BuckyError),
42 Finished(u64)
43}
44
45struct StateImpl {
46 abs_path: Option<String>,
47 control_state: ControlStateImpl,
48 task_state: TaskStateImpl,
49}
50
51struct TaskImpl {
52 stack: WeakStack,
53 name: String,
54 chunk_list: ChunkListDesc,
55 context: Box<dyn DownloadContext>,
56 state: RwLock<StateImpl>,
57}
58
59#[derive(Clone)]
60pub struct ChunkListTask(Arc<TaskImpl>);
61
62impl std::fmt::Display for ChunkListTask {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 write!(f, "ChunkListTask::{{name:{}}}", self.name())
65 }
66}
67
68impl ChunkListTask {
69 pub fn new(
70 stack: WeakStack,
71 name: String,
72 chunk_list: ChunkListDesc,
73 context: Box<dyn DownloadContext>,
74 ) -> Self {
75 Self(Arc::new(TaskImpl {
76 stack,
77 name,
78 context,
79 state: RwLock::new(StateImpl {
80 abs_path: None,
81 task_state: if chunk_list.total_len() > 0 {
82 TaskStateImpl::Pending
83 } else {
84 TaskStateImpl::Finished(0)
85 },
86 control_state: ControlStateImpl::Normal(StateWaiter::new()),
87 }),
88 chunk_list,
89 }))
90 }
91
92 pub fn name(&self) -> &str {
93 self.0.name.as_str()
94 }
95
96 pub fn chunk_list(&self) -> &ChunkListDesc {
97 &self.0.chunk_list
98 }
99
100 fn create_cache(&self, index: usize) -> BuckyResult<ChunkCache> {
101 let stack = Stack::from(&self.0.stack);
102 let chunk = &self.chunk_list().chunks()[index];
103
104 let mut state = self.0.state.write().unwrap();
105 match &mut state.task_state {
106 TaskStateImpl::Pending => {
107 debug!("{} create cache from pending, index={}, chunk={}", self, index, chunk);
108 let downloader = stack.ndn().chunk_manager().create_downloader(chunk, self.clone_as_leaf_task());
109 state.task_state = TaskStateImpl::Downloading(DownloadingState {
110 downloaded: 0,
111 cur_speed: ProgressCounter::new(0),
112 cur_chunk: (downloader.clone(), index),
113 history_speed: HistorySpeed::new(0, stack.config().ndn.channel.history_speed.clone()),
114 });
115 Ok(downloader.cache().clone())
116 },
117 TaskStateImpl::Downloading(downloading) => {
118 let (downloader, cur_index) = &downloading.cur_chunk;
119 if *cur_index != index {
120 debug!("{} create new cache, old_index={}, old_chunk={}, index={}, chunk={}", self, *cur_index, downloader.cache().chunk(), index, chunk);
121 downloading.downloaded += downloader.cache().stream().len() as u64;
122 downloading.cur_chunk = (stack.ndn().chunk_manager().create_downloader(chunk, self.clone_as_leaf_task()), index);
123 }
124 Ok(downloading.cur_chunk.0.cache().clone())
125 },
126 TaskStateImpl::Finished(_) => unreachable!(),
127 TaskStateImpl::Error(err) => Err(err.clone())
128 }
129 }
130}
131
132#[async_trait::async_trait]
133impl LeafDownloadTask for ChunkListTask {
134 fn clone_as_leaf_task(&self) -> Box<dyn LeafDownloadTask> {
135 Box::new(self.clone())
136 }
137
138 fn abs_group_path(&self) -> Option<String> {
139 self.0.state.read().unwrap().abs_path.clone()
140 }
141
142 fn context(&self) -> &dyn DownloadContext {
143 self.0.context.as_ref()
144 }
145
146 fn finish(&self) {
147 let mut state = self.0.state.write().unwrap();
148
149 match &mut state.task_state {
150 TaskStateImpl::Downloading(downloading) => {
151 info!("{} mark finished", self);
152 downloading.downloaded += downloading.cur_chunk.0.cache().stream().len() as u64;
153 state.task_state = TaskStateImpl::Finished(downloading.downloaded);
154 },
155 _ => {}
156 };
157 }
158}
159
160impl NdnTask for ChunkListTask {
161 fn clone_as_task(&self) -> Box<dyn NdnTask> {
162 Box::new(self.clone())
163 }
164
165 fn state(&self) -> NdnTaskState {
166 match &self.0.state.read().unwrap().task_state {
167 TaskStateImpl::Pending => NdnTaskState::Running,
168 TaskStateImpl::Downloading(_) => NdnTaskState::Running,
169 TaskStateImpl::Finished(_) => NdnTaskState::Finished,
170 TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone()),
171 }
172 }
173
174 fn control_state(&self) -> NdnTaskControlState {
175 match &self.0.state.read().unwrap().control_state {
176 ControlStateImpl::Normal(_) => NdnTaskControlState::Normal,
177 ControlStateImpl::Canceled => NdnTaskControlState::Canceled
178 }
179 }
180
181 fn cur_speed(&self) -> u32 {
182 let state = self.0.state.read().unwrap();
183 match &state.task_state {
184 TaskStateImpl::Downloading(downloading) => downloading.history_speed.latest(),
185 _ => 0,
186 }
187 }
188
189 fn history_speed(&self) -> u32 {
190 let state = self.0.state.read().unwrap();
191 match &state.task_state {
192 TaskStateImpl::Downloading(downloading) => downloading.history_speed.average(),
193 _ => 0,
194 }
195 }
196
197 fn transfered(&self) -> u64 {
198 let state = self.0.state.read().unwrap();
199 match &state.task_state {
200 TaskStateImpl::Downloading(downloading) => downloading.downloaded + downloading.cur_chunk.0.cache().stream().len() as u64,
201 TaskStateImpl::Finished(downloaded) => *downloaded,
202 _ => 0,
203 }
204
205 }
206
207 fn cancel_by_error(&self, err: BuckyError) -> BuckyResult<NdnTaskControlState> {
208 let waiters = {
209 let mut state = self.0.state.write().unwrap();
210 let waiters = match &mut state.control_state {
211 ControlStateImpl::Normal(waiters) => {
212 let waiters = Some(waiters.transfer());
213 state.control_state = ControlStateImpl::Canceled;
214 waiters
215 },
216 _ => None
217 };
218
219 match &state.task_state {
220 TaskStateImpl::Downloading(_) => {
221 info!("{} cancel by err {}", self, err);
222 state.task_state = TaskStateImpl::Error(err);
223 },
224 _ => {}
225 };
226
227 waiters
228 };
229
230 if let Some(waiters) = waiters {
231 waiters.wake();
232 }
233
234 Ok(NdnTaskControlState::Canceled)
235 }
236}
237
238#[async_trait::async_trait]
239impl DownloadTask for ChunkListTask {
240 fn clone_as_download_task(&self) -> Box<dyn DownloadTask> {
241 Box::new(self.clone())
242 }
243
244 fn on_post_add_to_root(&self, abs_path: String) {
245 self.0.state.write().unwrap().abs_path = Some(abs_path);
246 }
247
248 fn calc_speed(&self, when: Timestamp) -> u32 {
249 let mut state = self.0.state.write().unwrap();
250 match &mut state.task_state {
251 TaskStateImpl::Downloading(downloading) => {
252 let downloaded = downloading.downloaded + downloading.cur_chunk.0.cache().stream().len() as u64;
253 let cur_speed = downloading.cur_speed.update(downloaded, when);
254 debug!("{} calc_speed update cur_speed {}", self, cur_speed);
255 downloading.history_speed.update(Some(cur_speed), when);
256 cur_speed
257 }
258 _ => 0,
259 }
260 }
261
262 async fn wait_user_canceled(&self) -> BuckyError {
263 let waiter = {
264 let mut state = self.0.state.write().unwrap();
265
266 match &mut state.control_state {
267 ControlStateImpl::Normal(waiters) => Some(waiters.new_waiter()),
268 _ => None
269 }
270 };
271
272 if let Some(waiter) = waiter {
273 let _ = StateWaiter::wait(waiter, || self.control_state()).await;
274 }
275
276 BuckyError::new(BuckyErrorCode::UserCanceled, "")
277 }
278}
279
280
281pub struct ChunkListTaskReader {
282 offset: u64,
283 task: ChunkListTask
284}
285
286impl ChunkListTaskReader {
287 fn new(task: ChunkListTask) -> Self {
288 Self {
289 offset: 0,
290 task
291 }
292 }
293
294 pub fn task(&self) -> &dyn LeafDownloadTask {
295 &self.task
296 }
297}
298
299impl Drop for ChunkListTaskReader {
300 fn drop(&mut self) {
301 if self.offset == self.task.chunk_list().total_len() {
302 info!("{} drop after finished", self.task());
303 self.task.finish();
304 } else {
305 info!("{} drop before finished", self.task());
306 let _ = self.task.cancel();
307 }
308 }
309}
310
311impl std::io::Seek for ChunkListTaskReader {
312 fn seek(
313 self: &mut Self,
314 pos: SeekFrom,
315 ) -> std::io::Result<u64> {
316 let len = self.task.chunk_list().total_len();
317 let new_offset = match pos {
318 SeekFrom::Start(offset) => len.min(offset),
319 SeekFrom::Current(offset) => {
320 let offset = (self.offset as i64) + offset;
321 let offset = offset.max(0) as u64;
322 len.min(offset)
323 },
324 SeekFrom::End(offset) => {
325 let offset = (len as i64) + offset;
326 let offset = offset.max(0) as u64;
327 len.min(offset)
328 }
329 };
330 if new_offset < self.offset {
331 Err(std::io::Error::new(std::io::ErrorKind::Unsupported, "single directed stream"))
332 } else {
333 self.offset = new_offset;
334
335 Ok(new_offset)
336 }
337 }
338}
339
340
341impl DownloadTaskSplitRead for ChunkListTaskReader {
342 fn poll_split_read(
343 self: Pin<&mut Self>,
344 cx: &mut Context<'_>,
345 buffer: &mut [u8],
346 ) -> Poll<std::io::Result<Option<(ChunkCache, Range<usize>)>>> {
347 let pined = self.get_mut();
348 debug!("{} poll split read, buffer={}, offset={}", pined.task(), buffer.len(), pined.offset);
349 let ranges = pined.task.chunk_list().range_of(pined.offset..pined.offset + buffer.len() as u64);
350 if ranges.is_empty() {
351 debug!("{} poll split read break, buffer={}, offset={}", pined.task(), buffer.len(), pined.offset);
352 return Poll::Ready(Ok(None));
353 }
354 if let NdnTaskState::Error(err) = pined.task.state() {
355 debug!("{} poll split read break, buffer={}, offset={}", pined.task(), buffer.len(), pined.offset);
356 return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, BuckyError::new(err, ""))));
357 }
358 let (index, range) = ranges[0].clone();
359 debug!("{} poll split on chunk index, buffer={}, offset={}, index={}", pined.task(), buffer.len(), pined.offset, index);
360 let result = match pined.task.create_cache(index) {
361 Ok(cache) => {
362 let mut reader = DownloadTaskReader::new(cache, pined.task.clone_as_leaf_task());
363 use std::{io::{Seek}};
364 match reader.seek(SeekFrom::Start(range.start)) {
365 Ok(_) => {
366 let result = DownloadTaskSplitRead::poll_split_read(Pin::new(&mut reader), cx, &mut buffer[0..(range.end - range.start) as usize]);
367 if let Poll::Ready(result) = &result {
368 if let Some((_, r)) = result.as_ref().ok().and_then(|r| r.as_ref()) {
369 let old_offset = pined.offset;
370 pined.offset += (r.end - r.start) as u64;
371 debug!("{} poll split offset changed, buffer={}, offset={}, new_offset={}", pined.task(), buffer.len(), old_offset, pined.offset);
372 }
373 }
374 result
375 },
376 Err(err) => {
377 return Poll::Ready(Err(err));
378 }
379 }
380 }
381 Err(err) => {
382 return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err)));
383 }
384 };
385 result
386 }
387}
388
389
390impl async_std::io::Read for ChunkListTaskReader {
391 fn poll_read(
392 self: Pin<&mut Self>,
393 cx: &mut Context<'_>,
394 buffer: &mut [u8],
395 ) -> Poll<std::io::Result<usize>> {
396 self.poll_split_read(cx, buffer).map(|result| result.map(|r| if let Some((_, r)) = r {
397 r.end - r.start
398 } else {
399 0
400 }))
401 }
402}
403
404
405impl ChunkListTask {
406 pub fn reader(
407 stack: WeakStack,
408 name: String,
409 chunk_list: ChunkListDesc,
410 context: Box<dyn DownloadContext>
411 ) -> (Self, ChunkListTaskReader) {
412 let task = Self::new(stack, name, chunk_list, context);
413 let reader = ChunkListTaskReader::new(task.clone());
414
415 (task, reader)
416 }
417}