1use log::*;
2use std::{
3 sync::{RwLock},
4};
5use async_std::{
6 sync::Arc,
7};
8use futures::future::AbortRegistration;
9use cyfs_base::*;
10use crate::{
11 types::*
12};
13use super::super::{
14 chunk::*,
15 types::*,
16 download::*
17};
18use super::{
19 protocol::v0::*,
20 channel::{Channel, WeakChannel},
21 tunnel::{
22 DynamicChannelTunnel,
23 TunnelDownloadState
24 }
25};
26
27
28struct InterestingState {
29 waiters: StateWaiter,
30 last_send_time: Option<Timestamp>,
31 next_send_time: Option<Timestamp>,
32 history_speed: HistorySpeed,
33 cache: ChunkStreamCache,
34 channel: Channel
35}
36
37struct DownloadingState {
38 waiters: StateWaiter,
39 tunnel_state: Box<dyn TunnelDownloadState>,
40 decoder: Box<dyn ChunkDecoder>,
41 speed_counter: SpeedCounter,
42 history_speed: HistorySpeed,
43 channel: Channel
44}
45
46
47struct FinishedState {
48 send_ctrl_time: Option<(WeakChannel, Timestamp)>,
49}
50
51struct CanceledState {
52 send_ctrl_time: Option<(WeakChannel, Timestamp)>,
53 err: BuckyError
54}
55
56#[derive(Debug, Clone)]
57pub enum DownloadSessionState {
58 Downloading,
59 Finished,
60 Canceled(BuckyError),
61}
62
63enum StateImpl {
64 Interesting(InterestingState),
65 Downloading(DownloadingState),
66 Finished(FinishedState),
67 Canceled(CanceledState),
68}
69
70impl StateImpl {
71 fn to_session_state(&self) -> DownloadSessionState {
72 match self {
73 Self::Interesting(_) => DownloadSessionState::Downloading,
74 Self::Downloading(_) => DownloadSessionState::Downloading,
75 Self::Finished(_) => DownloadSessionState::Finished,
76 Self::Canceled(canceled) => DownloadSessionState::Canceled(canceled.err.clone()),
77 }
78 }
79}
80
81
82struct SessionImpl {
83 chunk: ChunkId,
84 session_id: TempSeq,
85 source: DownloadSource<DeviceId>,
86 referer: Option<String>,
87 group_path: Option<String>,
88 state: RwLock<StateImpl>,
89}
90
91#[derive(Clone)]
92pub struct DownloadSession(Arc<SessionImpl>);
93
94impl std::fmt::Display for DownloadSession {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 write!(f, "DownloadSession{{session_id:{:?}, chunk:{}, source:{}}}", self.session_id(), self.chunk(), self.source().target)
97 }
98}
99
100
101impl DownloadSession {
102 pub fn error(
103 chunk: ChunkId,
104 session_id: Option<TempSeq>,
105 source: DownloadSource<DeviceId>,
106 referer: Option<String>,
107 group_path: Option<String>,
108 err: BuckyError
109 ) -> Self {
110 Self(Arc::new(SessionImpl {
111 chunk,
112 session_id: session_id.unwrap_or_default(),
113 source,
114 referer,
115 group_path,
116 state: RwLock::new(StateImpl::Canceled(CanceledState {
117 send_ctrl_time: None,
118 err
119 })),
120 }))
121 }
122
123 pub fn interest(
124 chunk: ChunkId,
125 session_id: TempSeq,
126 channel: Channel,
127 source: DownloadSource<DeviceId>,
128 cache: ChunkStreamCache,
129 referer: Option<String>,
130 group_path: Option<String>
131 ) -> Self {
132 Self(Arc::new(SessionImpl {
133 chunk,
134 session_id,
135 source,
136 referer,
137 group_path,
138 state: RwLock::new(StateImpl::Interesting(InterestingState {
139 history_speed: HistorySpeed::new(0, channel.config().history_speed.clone()),
140 waiters: StateWaiter::new(),
141 last_send_time: None,
142 next_send_time: None,
143 channel,
144 cache
145 })),
146 }))
147 }
148
149 pub fn source(&self) -> &DownloadSource<DeviceId> {
150 &self.0.source
151 }
152
153 pub fn referer(&self) -> &Option<String> {
154 &self.0.referer
155 }
156
157 pub fn group_path(&self) -> &Option<String> {
158 &self.0.group_path
159 }
160
161 pub fn start(&self) {
162 let send = {
163 let state = &mut *self.0.state.write().unwrap();
164 match state {
165 StateImpl::Interesting(interesting) => {
166 if interesting.last_send_time.is_none() {
167 let now = bucky_time_now();
168 interesting.last_send_time = Some(now);
169 interesting.next_send_time = Some(now + interesting.channel.config().resend_interval.as_micros() as u64);
170 Some(interesting.channel.clone())
171 } else {
172 None
173 }
174 },
175 _ => None
176 }
177 };
178
179 if let Some(channel) = send {
180 let interest = Interest {
181 session_id: self.session_id().clone(),
182 chunk: self.chunk().clone(),
183 prefer_type: self.source().codec_desc.clone(),
184 referer: self.referer().clone(),
185 group_path: self.group_path().clone(),
186 from: None,
187 };
188 info!("{} sent {:?}", self, interest);
189 channel.interest(interest);
190 }
191
192 }
193
194 pub fn chunk(&self) -> &ChunkId {
195 &self.0.chunk
196 }
197
198 pub fn state(&self) -> DownloadSessionState {
199 (&self.0.state.read().unwrap()).to_session_state()
200 }
201
202 pub fn session_id(&self) -> &TempSeq {
203 &self.0.session_id
204 }
205
206 pub fn ptr_eq(&self, other: &Self) -> bool {
207 Arc::ptr_eq(&self.0, &other.0)
208 }
209
210 pub async fn wait_finish(&self) -> DownloadSessionState {
211 enum NextStep {
212 Wait(AbortRegistration),
213 Return(DownloadSessionState)
214 }
215 let next_step = {
216 let state = &mut *self.0.state.write().unwrap();
217 match state {
218 StateImpl::Interesting(interesting) => NextStep::Wait(interesting.waiters.new_waiter()),
219 StateImpl::Downloading(downloading) => NextStep::Wait(downloading.waiters.new_waiter()),
220 StateImpl::Finished(_) => NextStep::Return(DownloadSessionState::Finished),
221 StateImpl::Canceled(canceled) => NextStep::Return(DownloadSessionState::Canceled(canceled.err.clone())),
222 }
223 };
224 match next_step {
225 NextStep::Wait(waker) => StateWaiter::wait(waker, || self.state()).await,
226 NextStep::Return(state) => state
227 }
228 }
229
230 pub(super) fn push_piece_data(&self, channel: &Channel, piece: &PieceData, tunnel: &DynamicChannelTunnel) {
231 enum NextStep {
232 EnterDownloading,
233 RespControl(PieceControlCommand),
234 Ignore,
235 Push(Box<dyn ChunkDecoder>)
236 }
237 use NextStep::*;
238 use StateImpl::*;
239 let next_step = {
240 let state = &mut *self.0.state.write().unwrap();
241 match state {
242 Interesting(_) => EnterDownloading,
243 Downloading(downloading) => {
244 downloading.speed_counter.on_recv(piece.data.len());
245 Push(downloading.decoder.clone_as_decoder())
246 },
247 Finished(finished) => {
248 let now = bucky_time_now();
249 if finished.send_ctrl_time.is_none() {
250 finished.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
251 } {
252 Ignore
253 }
254 },
255 Canceled(canceled) => {
256 let now = bucky_time_now();
257 if canceled.send_ctrl_time.is_none() {
258 canceled.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
259 } {
260 Ignore
261 }
262 },
263 }
264 };
265
266 let resp_control = |command: PieceControlCommand| {
267 channel.send_piece_control(PieceControl {
268 sequence: channel.gen_command_seq(),
269 session_id: self.session_id().clone(),
270 chunk: self.chunk().clone(),
271 command,
272 max_index: None,
273 lost_index: None
274 });
275 };
276
277 let push_to_decoder = |provider: Box<dyn ChunkDecoder>| {
278 let result = provider.push_piece_data(piece).unwrap();
279 if let Some(waiters) = {
280 let state = &mut *self.0.state.write().unwrap();
281 match state {
282 Downloading(downloading) => {
283 if result.valid {
284 downloading.tunnel_state.as_mut().on_piece_data();
285 }
286 if result.finished {
287 let mut waiters = StateWaiter::new();
288 std::mem::swap(&mut waiters, &mut downloading.waiters);
289 info!("{} finished", self);
290 *state = Finished(FinishedState {
291 send_ctrl_time: None,
292 });
293 Some(waiters)
294 } else {
295 None
296 }
297 },
298 _ => None
299 }
300 } {
301 waiters.wake();
302 resp_control(PieceControlCommand::Finish);
303 }
304 };
305
306 match next_step {
307 EnterDownloading => {
308 if let Some(decoder) = {
309 let state = &mut *self.0.state.write().unwrap();
310 match state {
311 Interesting(interesting) => {
312 let decoder = StreamDecoder::new(self.chunk(), &self.source().codec_desc, interesting.cache.clone());
313 let mut downloading = DownloadingState {
314 channel: channel.clone(),
315 tunnel_state: tunnel.as_ref().download_state(),
316 history_speed: interesting.history_speed.clone(),
317 speed_counter: SpeedCounter::new(piece.data.len()),
318 decoder: decoder.clone_as_decoder(),
319 waiters: StateWaiter::new(),
320 };
321 std::mem::swap(&mut downloading.waiters, &mut interesting.waiters);
322 *state = Downloading(downloading);
323 Some(decoder.clone_as_decoder())
324 },
325 Downloading(downloading) => {
326 Some(downloading.decoder.clone_as_decoder())
327 },
328 _ => None
329 }
330 } {
331 push_to_decoder(decoder)
332 }
333
334 },
335 Push(decoder) => {
336 push_to_decoder(decoder)
337 },
338 RespControl(cmd) => resp_control(cmd),
339 Ignore => {}
340 }
341 }
342
343 pub(super) fn on_resp_interest(&self, channel: &Channel, resp_interest: &RespInterest) -> BuckyResult<()> {
344 match resp_interest.err {
345 BuckyErrorCode::Ok => unimplemented!(),
346 BuckyErrorCode::WouldBlock => {
347 use StateImpl::*;
348 let state = &mut *self.0.state.write().unwrap();
349 match state {
350 Interesting(interesting) => {
351 interesting.next_send_time = Some(bucky_time_now() + channel.config().block_interval.as_micros() as u64);
352 },
353 Downloading(downloading) => {
354 downloading.tunnel_state.on_resp_interest();
355 },
356 Finished(finished) => {
357 let now = bucky_time_now();
358 if finished.send_ctrl_time.is_none() {
359 finished.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
360 }
361 },
362 Canceled(canceled) => {
363 let now = bucky_time_now();
364 if canceled.send_ctrl_time.is_none() {
365 canceled.send_ctrl_time = Some((channel.to_weak(), now + channel.config().resend_interval.as_micros() as u64))
366 }
367 }
368 }
369 },
370 _ => {
371 error!("{} cancel by err {}", self, resp_interest.err);
372
373 let mut waiters = StateWaiter::new();
374 {
375 let state = &mut *self.0.state.write().unwrap();
376 match state {
377 StateImpl::Interesting(interesting) => {
378 std::mem::swap(&mut waiters, &mut interesting.waiters);
379 *state = StateImpl::Canceled(CanceledState {
380 send_ctrl_time: None,
381 err: BuckyError::new(resp_interest.err, "cancel by remote")
382 });
383 },
384 StateImpl::Downloading(downloading) => {
385 std::mem::swap(&mut waiters, &mut downloading.waiters);
386 *state = StateImpl::Canceled(CanceledState {
387 send_ctrl_time: None,
388 err: BuckyError::new(resp_interest.err, "cancel by remote")
389 });
390 },
391 _ => {}
392 };
393 }
394
395 waiters.wake();
396 }
397 }
398 Ok(())
399 }
400
401 fn resend_interest(&self, channel: &Channel) -> BuckyResult<()> {
402 let interest = Interest {
403 session_id: self.session_id().clone(),
404 chunk: self.chunk().clone(),
405 prefer_type: self.source().codec_desc.clone(),
406 referer: self.referer().clone(),
407 from: None,
408 group_path: None
409 };
410 info!("{} sent {:?}", self, interest);
411 channel.interest(interest);
412 Ok(())
413 }
414
415
416 pub fn cancel_by_error(&self, err: BuckyError) {
417 error!("{} cancel by err {}", self, err);
418
419 let mut waiters = StateWaiter::new();
420 let send = {
421 let state = &mut *self.0.state.write().unwrap();
422 match state {
423 StateImpl::Interesting(interesting) => {
424 std::mem::swap(&mut waiters, &mut interesting.waiters);
425 let channel = interesting.channel.clone();
426 *state = StateImpl::Canceled(CanceledState {
427 send_ctrl_time: None,
428 err
429 });
430 Some(channel)
431 },
432 StateImpl::Downloading(downloading) => {
433 std::mem::swap(&mut waiters, &mut downloading.waiters);
434 let channel = downloading.channel.clone();
435 *state = StateImpl::Canceled(CanceledState {
436 send_ctrl_time: None,
437 err
438 });
439 Some(channel)
440 },
441 _ => None
442 }
443 };
444 waiters.wake();
445
446 if let Some(channel) = send {
447 channel.send_piece_control(PieceControl {
448 sequence: channel.gen_command_seq(),
449 session_id: self.session_id().clone(),
450 chunk: self.chunk().clone(),
451 command: PieceControlCommand::Cancel,
452 max_index: None,
453 lost_index: None
454 });
455 }
456 }
457
458 pub(super) fn on_time_escape(&self, now: Timestamp) -> BuckyResult<()> {
459 enum NextStep {
460 None,
461 SendInterest(Channel),
462 SendPieceControl(Channel, PieceControl),
463 }
464
465 let next_step = {
466 let state = &mut *self.0.state.write().unwrap();
467 match state {
468 StateImpl::Interesting(interesting) => {
469 if let Some(next_send_time) = interesting.next_send_time {
470 if now > next_send_time {
471 interesting.next_send_time = Some(now + 2 * (next_send_time - interesting.last_send_time.unwrap()));
472 interesting.last_send_time = Some(now);
473 NextStep::SendInterest(interesting.channel.clone())
474 } else {
475 NextStep::None
476 }
477 } else {
478 NextStep::None
479 }
480
481 },
482 StateImpl::Downloading(downloading) => {
483 if downloading.tunnel_state.as_mut().on_time_escape(now) {
484 if let Some((max_index, lost_index)) = downloading.decoder.require_index() {
485 debug!("{} dectect loss piece max_index:{:?} lost_index:{:?}", self, max_index, lost_index);
486 NextStep::SendPieceControl(downloading.channel.clone(), PieceControl {
487 sequence: downloading.channel.gen_command_seq(),
488 session_id: self.session_id().clone(),
489 chunk: self.chunk().clone(),
490 command: PieceControlCommand::Continue,
491 max_index,
492 lost_index
493 })
494 } else {
495 NextStep::None
496 }
497 } else {
498 NextStep::None
499 }
500 },
501 StateImpl::Finished(finished) => {
502 if let Some((channel, send_time)) = &finished.send_ctrl_time {
503 if now > *send_time {
504 let channel = channel.to_strong();
505 finished.send_ctrl_time = None;
506 if let Some(channel) = channel {
507 let ctrl = PieceControl {
508 sequence: channel.gen_command_seq(),
509 session_id: self.session_id().clone(),
510 chunk: self.chunk().clone(),
511 command: PieceControlCommand::Finish,
512 max_index: None,
513 lost_index: None
514 };
515
516 NextStep::SendPieceControl(channel, ctrl)
517 } else {
518 NextStep::None
519 }
520 } else {
521 NextStep::None
522 }
523 } else {
524 NextStep::None
525 }
526 }
527 StateImpl::Canceled(canceled) => {
528 if let Some((channel, send_time)) = &canceled.send_ctrl_time {
529 if now > *send_time {
530 let channel = channel.to_strong();
531 canceled.send_ctrl_time = None;
532 if let Some(channel) = channel {
533 let ctrl = PieceControl {
534 sequence: channel.gen_command_seq(),
535 session_id: self.session_id().clone(),
536 chunk: self.chunk().clone(),
537 command: PieceControlCommand::Cancel,
538 max_index: None,
539 lost_index: None
540 };
541 NextStep::SendPieceControl(channel, ctrl)
542 } else {
543 NextStep::None
544 }
545 } else {
546 NextStep::None
547 }
548 } else {
549 NextStep::None
550 }
551 }
552 }
553 };
554
555 match next_step {
556 NextStep::None => Ok(()),
557 NextStep::SendInterest(channel) => {
558 let _ = self.resend_interest(&channel);
559 Ok(())
560 },
561 NextStep::SendPieceControl(channel, ctrl) => {
562 channel.send_piece_control(ctrl);
563 Ok(())
564 }
565 }
566 }
567
568 pub fn calc_speed(&self, when: Timestamp) -> u32 {
569 let state = &mut *self.0.state.write().unwrap();
570 match state {
571 StateImpl::Interesting(interesting) => {
572 interesting.history_speed.update(Some(0), when);
573 0
574 },
575 StateImpl::Downloading(downloading) => {
576 let cur_speed = downloading.speed_counter.update(when);
577 downloading.history_speed.update(Some(cur_speed), when);
578 cur_speed
579 },
580 _ => 0
581 }
582 }
583
584 pub fn cur_speed(&self) -> u32 {
585 let state = &*self.0.state.read().unwrap();
586 match state {
587 StateImpl::Downloading(downloading) => downloading.history_speed.latest(),
588 _ => 0
589 }
590 }
591
592 pub fn history_speed(&self) -> u32 {
593 let state = &*self.0.state.read().unwrap();
594 match state {
595 StateImpl::Interesting(interesting) => interesting.history_speed.average(),
596 StateImpl::Downloading(downloading) => downloading.history_speed.average(),
597 _ => 0
598 }
599 }
600}
601
602
603
604