1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::{
sync::RwLock,
collections::LinkedList
};
use async_std::{
sync::Arc
};
use cyfs_base::*;
use super::super::{
scheduler::*,
channel::UploadSession,
channel::PieceSessionType,
};
use super::{
encode::*,
view::{ChunkView}
};
struct UploaderImpl {
view: ChunkView,
resource: ResourceManager,
sessions: RwLock<LinkedList<UploadSession>>,
}
#[derive(Clone)]
pub struct ChunkUploader(Arc<UploaderImpl>);
impl std::fmt::Display for ChunkUploader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ChunkUploader:{{chunk:{}}}", self.chunk())
}
}
impl ChunkUploader {
pub fn new(
view: ChunkView,
owner: ResourceManager
) -> Self {
Self(Arc::new(UploaderImpl {
view,
resource: ResourceManager::new(Some(owner)),
sessions: RwLock::new(LinkedList::new()),
}))
}
pub fn chunk(&self) -> &ChunkId {
&self.0.view.chunk()
}
pub fn resource(&self) -> &ResourceManager {
&self.0.resource
}
pub fn schedule_state(&self) -> TaskState {
let sessions = self.0.sessions.read().unwrap();
Self::collect_state(&*sessions)
}
fn collect_state(sessions: &LinkedList<UploadSession>) -> TaskState {
for session in sessions {
let state = session.schedule_state();
match state {
TaskState::Finished => continue,
TaskState::Canceled(_) => continue,
_ => {
return TaskState::Running(0);
}
}
}
TaskState::Finished
}
pub fn add_session(&self, session: UploadSession) -> BuckyResult<()> {
info!("{} try add new session {}", self, session);
{
let mut sessions = self.0.sessions.write().unwrap();
if sessions.iter().find(|s|
session.channel().remote().eq(s.channel().remote())
&& session.chunk().eq(s.chunk())
&& session.session_id().eq(s.session_id())).is_some() {
info!("session {} exists in {}, ignore it", session, self);
return Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "session exists"));
}
sessions.push_front(session.clone());
}
let encoder = match *session.piece_type() {
PieceSessionType::RaptorA(_) | PieceSessionType::RaptorB(_) => TypedChunkEncoder::Raptor(self.0.view.raptor_encoder()),
PieceSessionType::Stream(_) => TypedChunkEncoder::Range(RangeEncoder::from_reader(self.0.view.reader().unwrap(), self.chunk())),
_ => unreachable!()
};
session.start(encoder);
Ok(())
}
}
impl Scheduler for ChunkUploader {
fn collect_resource_usage(&self) {
let mut sessions = self.0.sessions.write().unwrap();
let mut remain = LinkedList::new();
loop {
if let Some(session) = sessions.pop_front() {
let state = session.schedule_state();
match state {
TaskState::Finished => {
let _ = self.resource().remove_child(session.resource());
info!("{} remove session {} for finished", self, session);
},
TaskState::Canceled(_) => {
let _ = self.resource().remove_child(session.resource());
info!("{} remove session {} for canceled", self, session);
},
_ => {
remain.push_back(session);
}
}
} else {
break;
}
}
*sessions = remain;
}
fn schedule_resource(&self) {
}
fn apply_scheduled_resource(&self) {
}
}