1use std::{
2 sync::{RwLock, Arc, Weak}, collections::LinkedList,
3};
4use async_std::{
5 task,
6};
7use cyfs_base::*;
8use crate::{
9 types::*,
10 stack::{WeakStack, Stack}
11};
12use super::super::{
13 types::*,
14 channel::*,
15 download::*,
16};
17use super::{
18 cache::*,
19};
20
21#[derive(Debug)]
22struct QueryContextOp {
23 op_id: IncreaseId,
24 filter: DownloadSourceFilter,
25 limit: usize
26}
27
28#[derive(Debug)]
29struct StartSessionOp {
30 op_id: IncreaseId,
31 update_at: Timestamp,
32 source: DownloadSource<DeviceDesc>
33}
34
35#[derive(Debug)]
36enum SessionOp {
37 None,
38 TrigerDrain(Timestamp),
39 StartSession(StartSessionOp),
40 QueryContext(QueryContextOp)
41}
42
43#[derive(Clone)]
44enum TryingSession {
45 None,
46 Starting(IncreaseId),
47 Running(DownloadSession)
48}
49
50impl TryingSession {
51 fn as_session(&self) -> Option<&DownloadSession> {
52 match self {
53 Self::Running(session) => Some(session),
54 _ => None
55 }
56 }
57}
58
59struct SingleStreamSession {
60 gen_id: IncreaseIdGenerator,
61 update_at: Timestamp,
62 tried: LinkedList<DownloadSession>,
63 trying: TryingSession,
64 querying: Option<IncreaseId>
65}
66
67impl SingleStreamSession {
68 fn new(update_at: Timestamp) -> (Self, QueryContextOp) {
69 let gen_id = IncreaseIdGenerator::new();
70 let op_id = gen_id.generate();
71 let session = Self {
72 gen_id,
73 update_at,
74 tried: LinkedList::new(),
75 trying: TryingSession::None,
76 querying: Some(op_id)
77 };
78 let op = QueryContextOp {
79 op_id,
80 filter: session.next_filter(),
81 limit: 1
82 };
83 (session, op)
84 }
85
86 fn check_context(&mut self, update_at: Timestamp) -> SessionOp {
87 if self.querying.is_some() {
88 return SessionOp::None;
89 }
90
91 match self.trying.clone() {
92 TryingSession::Starting(_) => SessionOp::None,
93 TryingSession::None => {
94 if update_at != self.update_at {
95 let op_id = self.gen_id.generate();
96 let op = QueryContextOp {
97 op_id,
98 filter: self.next_filter(),
99 limit: 1
100 };
101 self.querying = Some(op_id);
102 SessionOp::QueryContext(op)
103 } else {
104 SessionOp::None
105 }
106 },
107 TryingSession::Running(session) => {
108 match session.state() {
109 DownloadSessionState::Downloading => {
110 if update_at != self.update_at {
111 let op_id = self.gen_id.generate();
112 let op = QueryContextOp {
113 op_id,
114 filter: self.check_filter(),
115 limit: 1
116 };
117 self.querying = Some(op_id);
118 SessionOp::QueryContext(op)
119 } else {
120 SessionOp::None
121 }
122 },
123 DownloadSessionState::Canceled(_) => {
124 self.trying = TryingSession::None;
125 self.tried.push_back(session);
126 let op_id = self.gen_id.generate();
127 let op = QueryContextOp {
128 op_id,
129 filter: self.next_filter(),
130 limit: 1
131 };
132 self.querying = Some(op_id);
133 SessionOp::QueryContext(op)
134 },
135 DownloadSessionState::Finished => SessionOp::None
136 }
137 }
138 }
139 }
140
141 fn trying(&self) -> Option<&DownloadSession> {
142 self.trying.as_session()
143 }
144
145 fn next_filter(&self) -> DownloadSourceFilter {
146 DownloadSourceFilter {
147 exclude_target: Some(self.tried.iter().map(|session| session.source().target.clone()).collect()),
148 include_target: None,
149 include_codec: Some(vec![ChunkCodecDesc::Stream(None, None, None)]),
150 }
151 }
152
153 fn check_filter(&self) -> DownloadSourceFilter {
154 DownloadSourceFilter {
155 exclude_target: Some(self.tried.iter().map(|session| session.source().target.clone()).collect()),
156 include_target: self.trying.as_session().map(|session| vec![session.source().target.clone()]),
157 include_codec: self.trying.as_session().map(|session| vec![session.source().codec_desc.clone()]),
158 }
159 }
160
161 fn on_session_created(&mut self, op_id: IncreaseId, session: DownloadSession) -> bool {
162 let start = match &self.trying {
163 TryingSession::Starting(stub_id) => *stub_id == op_id,
164 _ => false
165 };
166 if !start {
167 return false;
168 }
169
170 self.trying = TryingSession::Running(session);
171 true
172 }
173
174 fn on_query_finished(&mut self, owner: ChunkDownloader, op: &QueryContextOp, result: (LinkedList<DownloadSource<DeviceDesc>>, Timestamp)) -> SessionOp {
175 let (mut sources, update_at) = result;
176
177 if !self.querying.map(|stub_id| stub_id == op.op_id).unwrap_or(false) {
178 info!("{} ignore queried sources for another query posted, op_id={}", owner, op.op_id);
179 return SessionOp::None;
180 }
181 self.querying = None;
182
183 let trying = self.trying.clone();
184 match trying {
185 TryingSession::None => {
186 if update_at != self.update_at {
187 self.update_at = update_at;
188 }
189 if sources.len() == 0 {
190 SessionOp::TrigerDrain(update_at)
191 } else {
192 let op_id = self.gen_id.generate();
193 self.trying = TryingSession::Starting(op_id);
194 SessionOp::StartSession(StartSessionOp {
195 op_id,
196 update_at,
197 source: sources.pop_front().unwrap()
198 })
199 }
200 },
201 TryingSession::Starting(_) => {
202 info!("{} ignore queried sources for another session starting, op_id={}", owner, op.op_id);
203 SessionOp::None
204 },
205 TryingSession::Running(session) => {
206 if update_at != self.update_at {
207 self.update_at = update_at;
208 if sources.len() == 0 {
209 info!("{} cancel current session for context updated, op_id={}, session={}", owner, op.op_id, session);
210 session.cancel_by_error(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"));
211 self.tried.push_back(session);
212 self.trying = TryingSession::None;
213
214 let op_id = self.gen_id.generate();
215 self.querying = Some(op_id);
216 SessionOp::QueryContext(QueryContextOp {
217 op_id,
218 filter: self.next_filter(),
219 limit: 1
220 })
221 } else {
222 SessionOp::None
223 }
224 } else {
225 unreachable!()
226 }
227 }
228 }
229 }
230}
231
232
233
234enum StateImpl {
235 Loading,
236 Downloading(SingleStreamSession),
237 Finished
238}
239
240struct ChunkDowloaderImpl {
241 stack: WeakStack,
242 task: Box<dyn LeafDownloadTask>,
243 cache: ChunkCache,
244 state: RwLock<StateImpl>,
245}
246
247impl std::fmt::Display for ChunkDowloaderImpl {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "ChunkDownloader{{chunk:{}}}", self.cache.chunk())
250 }
251}
252
253#[derive(Clone)]
254pub struct ChunkDownloader(Arc<ChunkDowloaderImpl>);
255
256impl Drop for ChunkDowloaderImpl {
257 fn drop(&mut self) {
258 let session = {
259 let state = &mut *self.state.write().unwrap();
260 match state {
261 StateImpl::Downloading(downloading) => downloading.trying.as_session().cloned(),
262 _ => None
263 }
264 };
265
266 if let Some(session) = session {
267 info!("{} canceled for drop", self);
268 session.cancel_by_error(BuckyError::new(BuckyErrorCode::UserCanceled, "user canceled"));
269 }
270 }
271}
272
273#[derive(Clone)]
274pub struct WeakChunkDownloader(Weak<ChunkDowloaderImpl>);
275
276impl WeakChunkDownloader {
277 pub fn to_strong(&self) -> Option<ChunkDownloader> {
278 Weak::upgrade(&self.0).map(|arc| ChunkDownloader(arc))
279 }
280}
281
282impl ChunkDownloader {
283 pub fn to_weak(&self) -> WeakChunkDownloader {
284 WeakChunkDownloader(Arc::downgrade(&self.0))
285 }
286}
287
288impl std::fmt::Display for ChunkDownloader {
289 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
290 write!(f, "{}", self.0)
291 }
292}
293
294
295impl ChunkDownloader {
296 pub fn new(
297 stack: WeakStack,
298 cache: ChunkCache,
299 task: Box<dyn LeafDownloadTask>
300 ) -> Self {
301 let downloader = Self(Arc::new(ChunkDowloaderImpl {
302 stack,
303 cache,
304 task,
305 state: RwLock::new(StateImpl::Loading),
306 }));
307
308 {
309 let downloader = downloader.clone();
310
311 task::spawn(async move {
312 info!("{} begin load cache", downloader);
313 let finished = downloader.cache().wait_loaded().await;
314 let update_at = task::block_on(downloader.owner().context().update_at());
315 let op = {
316 let state = &mut *downloader.0.state.write().unwrap();
317 if let StateImpl::Loading = state {
318 if finished {
319 *state = StateImpl::Finished;
320 info!("{} finished for cache exists", downloader);
321 None
322 } else {
323 info!("{} enter downloading", downloader);
324 let (downloading, op) = SingleStreamSession::new(update_at);
325 *state = StateImpl::Downloading(downloading);
326 Some(op)
327 }
328 } else {
329 unreachable!()
330 }
331 };
332
333 if let Some(op) = op {
334 {
335 let downloader = downloader.clone();
336 task::spawn(async move { downloader.sync_finished().await; });
337 }
338 downloader.query_context(op).await;
339 }
340
341 });
342 }
343
344 downloader
345 }
346
347 fn on_session_op(&self, op: SessionOp) {
348 match op {
349 SessionOp::None => {},
350 SessionOp::QueryContext(op) => {
351 let downloader = self.clone();
352 task::spawn(async move { downloader.query_context(op).await; });
353 },
354 SessionOp::TrigerDrain(update_at) => self.owner().context().on_drain(self.owner(), update_at),
355 SessionOp::StartSession(op) => {
356 let downloader = self.clone();
357 task::spawn(async move { downloader.start_session(op).await; });
358 }
359 }
360 }
361
362 async fn query_context(&self, mut op: QueryContextOp) {
363 op.filter.fill_values(self.chunk());
364 let result = self.owner().context().sources_of(&op.filter, op.limit).await;
365 info!("{} return sources from context, op_id={}, sources={:?}, update_at={}", self, op.op_id, result.0, result.1);
366 let next_op = {
367 let mut state = self.0.state.write().unwrap();
368 match &mut *state {
369 StateImpl::Downloading(downloading) => downloading.on_query_finished(self.clone(), &op, result),
370 _ => SessionOp::None
371 }
372 };
373 info!("{} will exec op after queried source, query_id={}, next_op={:?}", self, op.op_id, next_op);
374 self.on_session_op(next_op)
375 }
376
377 async fn start_session(&self, op: StartSessionOp) {
378 info!("{} will start session, op_id={}", self, op.op_id);
379
380 let stack = Stack::from(&self.0.stack);
381 let channel = stack.ndn().channel_manager().create_channel(&op.source.target).unwrap();
382
383 let mut source: DownloadSource<DeviceId> = op.source.into();
384 source.codec_desc = match &source.codec_desc {
385 ChunkCodecDesc::Unknown => ChunkCodecDesc::Stream(None, None, None).fill_values(self.chunk()),
386 ChunkCodecDesc::Stream(..) => source.codec_desc.fill_values(self.chunk()),
387 _ => unimplemented!()
388 };
389
390 let session = channel.download(
391 self.chunk().clone(),
392 source.clone(),
393 self.cache().stream().clone(),
394 Some(self.owner().context().referer().to_owned()),
395 self.owner().abs_group_path().clone()).or_else(|err| {
396 Ok::<DownloadSession, ()>(DownloadSession::error(self.chunk().clone(), None, source, None, None, err))
397 }).unwrap();
398
399 let start = {
400 let mut state = self.0.state.write().unwrap();
401 match &mut *state {
402 StateImpl::Downloading(downloading) => downloading.on_session_created(op.op_id, session.clone()),
403 _ => false
404 }
405 };
406
407 if start {
408 info!("{} will start session, op_id={}, session={}", self, op.op_id, session);
409 session.start();
410 } else {
411 session.cancel_by_error(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"));
412 }
413 self.owner().context().on_new_session(self.owner(), &session, op.update_at);
414 }
415
416 async fn sync_finished(&self) {
417 if self.cache().wait_exists(0..self.cache().chunk().len(), || self.owner().wait_user_canceled()).await.is_ok() {
418 info!("{} finished", self);
419 let state = &mut *self.0.state.write().unwrap();
420 *state = StateImpl::Finished;
421 }
422 }
423
424 async fn finished(&self) -> bool {
425 if let StateImpl::Finished = &*self.0.state.read().unwrap() {
426 true
427 } else {
428 false
429 }
430 }
431
432 pub fn owner(&self) -> &dyn LeafDownloadTask {
433 self.0.task.as_ref()
434 }
435
436 pub fn cache(&self) -> &ChunkCache {
437 &self.0.cache
438 }
439
440 pub fn chunk(&self) -> &ChunkId {
441 self.cache().chunk()
442 }
443
444 pub fn calc_speed(&self, when: Timestamp) -> u32 {
445 match &*self.0.state.read().unwrap() {
446 StateImpl::Downloading(downloading) => downloading.trying().map(|s| s.calc_speed(when)).unwrap_or_default(),
447 _ => 0
448 }
449 }
450
451 pub fn cur_speed(&self) -> u32 {
452 match &*self.0.state.read().unwrap() {
453 StateImpl::Downloading(downloading) => downloading.trying().map(|s| s.cur_speed()).unwrap_or_default(),
454 _ => 0
455 }
456 }
457
458 pub fn history_speed(&self) -> u32 {
459 match &*self.0.state.read().unwrap() {
460 StateImpl::Downloading(downloading) => downloading.trying().map(|s| s.history_speed()).unwrap_or_default(),
461 _ => 0
462 }
463 }
464
465 pub fn on_drain(&self, _: u32) -> u32 {
466 let update_at = task::block_on(self.owner().context().update_at());
467 let (speed, op) = {
468 let mut state = self.0.state.write().unwrap();
469
470 match &mut *state{
471 StateImpl::Downloading(downloading) => {
472 let speed = downloading.trying().map(|s| s.cur_speed()).unwrap_or_default();
473 let op = downloading.check_context(update_at);
474 (speed, op)
475 }
476 _ => (0, SessionOp::None)
477 }
478 };
479 self.on_session_op(op);
480 speed
481 }
482}