1use log::*;
2use std::{
3 ops::Range,
4 sync::{RwLock}
5};
6use async_std::{
7 sync::Arc,
8};
9use cyfs_base::*;
10use crate::{
11 types::*
12};
13use super::super::{
14 chunk::*,
15 upload::*,
16 types::*
17};
18use super::{
19 protocol::v0::*,
20 channel::Channel,
21};
22
23struct UploadingState {
24 channel: Channel,
25 waiters: StateWaiter,
26 speed_counter: SpeedCounter,
27 uploaded: u64,
28 history_speed: HistorySpeed,
29 encoder: Box<dyn ChunkEncoder>
30}
31
32struct StateImpl {
33 task_state: TaskStateImpl,
34 control_state: NdnTaskControlState,
35}
36
37enum TaskStateImpl {
38 Uploading(UploadingState),
39 Finished(u64),
40 Error(BuckyError),
41}
42
43struct SessionImpl {
44 remote: DeviceId,
45 chunk: ChunkId,
46 session_id: TempSeq,
47 piece_type: ChunkCodecDesc,
48 state: RwLock<StateImpl>,
49}
50
51#[derive(Clone)]
52pub struct UploadSession(Arc<SessionImpl>);
53
54impl std::fmt::Display for UploadSession {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "UploadSession{{session_id:{:?}, chunk:{}, remote:{}}}", self.session_id(), self.chunk(), self.remote())
57 }
58}
59
60impl UploadSession {
61 pub fn new(
62 chunk: ChunkId,
63 session_id: TempSeq,
64 piece_type: ChunkCodecDesc,
65 encoder: Box<dyn ChunkEncoder>,
66 channel: Channel
67 ) -> Self {
68 Self(Arc::new(SessionImpl {
69 remote: channel.tunnel().remote().clone(),
70 chunk,
71 session_id,
72 piece_type,
73 state: RwLock::new(StateImpl{
74 task_state: TaskStateImpl::Uploading(UploadingState {
75 waiters: StateWaiter::new(),
76 history_speed: HistorySpeed::new(0, channel.config().history_speed.clone()),
77 speed_counter: SpeedCounter::new(0),
78 uploaded: 0,
79 encoder,
80 channel
81 }),
82 control_state: NdnTaskControlState::Normal
83 }),
84 }))
85 }
86
87 pub fn remote(&self) -> &DeviceId {
88 &self.0.remote
89 }
90
91 pub fn chunk(&self) -> &ChunkId {
92 &self.0.chunk
93 }
94
95 pub fn piece_type(&self) -> &ChunkCodecDesc {
96 &self.0.piece_type
97 }
98
99 pub fn session_id(&self) -> &TempSeq {
100 &self.0.session_id
101 }
102
103
104 pub(super) fn next_piece(&self, buf: &mut [u8]) -> BuckyResult<usize> {
105 let encoder = {
106 let state = self.0.state.read().unwrap();
107 match &state.task_state {
108 TaskStateImpl::Uploading(uploading) => {
109 Some(uploading.encoder.clone_as_encoder())
110 },
111 _ => None
112 }
113 };
114 if let Some(encoder) = encoder {
115 match encoder.next_piece(self.session_id(), buf) {
116 Ok(len) => {
117 let mut state = self.0.state.write().unwrap();
118 match &mut state.task_state {
119 TaskStateImpl::Uploading(uploading) => {
120 if len > 0 {
121 uploading.speed_counter.on_recv(len);
122 uploading.uploaded += len as u64;
123 }
124 Ok(len)
125 },
126 _ => {
127 Err(BuckyError::new(BuckyErrorCode::ErrorState, "not uploading"))
128 }
129 }
130
131 },
132 Err(err) => {
133 self.cancel_by_error(BuckyError::new(err.code(), "encoder failed"));
134 Err(err)
135 }
136 }
137 } else {
138 Ok(0)
139 }
140 }
141
142 pub(super) fn cancel_by_error(&self, err: BuckyError) {
143 let send = {
144 let mut state = self.0.state.write().unwrap();
145 match &mut state.task_state {
146 TaskStateImpl::Error(_) => None,
147 TaskStateImpl::Finished(_) => None,
148 TaskStateImpl::Uploading(uploading) => {
149 let mut waiters = StateWaiter::new();
150 uploading.waiters.transfer_into(&mut waiters);
151 let channel = uploading.channel.clone();
152 info!("{} canceled by err:{}", self, err);
153 state.task_state = TaskStateImpl::Error(err.clone());
154 Some((waiters, channel))
155 }
156 }
157 };
158
159 if let Some((waiters, channel)) = send {
160 let resp_interest = RespInterest {
161 session_id: self.session_id().clone(),
162 chunk: self.chunk().clone(),
163 err: err.code(),
164 redirect: None,
165 redirect_referer: None,
166 to: None,
167 };
168 channel.resp_interest(resp_interest);
169
170 waiters.wake();
171 }
172 }
173
174 pub fn on_interest(&self, channel: &Channel, _interest: &Interest) -> BuckyResult<()> {
176 enum NextStep {
177 ResetEncoder(Box<dyn ChunkEncoder>),
178 RespInterest(BuckyErrorCode),
179 None
180 }
181 let next_step = {
182 let state = self.0.state.read().unwrap();
183 match &state.task_state {
184 TaskStateImpl::Uploading(uploading) => {
185 NextStep::ResetEncoder(uploading.encoder.clone_as_encoder())
186 },
187 TaskStateImpl::Error(err) => {
188 NextStep::RespInterest(err.code())
189 },
190 _ => {
191 NextStep::None
192 }
193 }
194 };
195
196 match next_step {
197 NextStep::ResetEncoder(encoder) => {
198 debug!("{} will reset index", self);
199 if !encoder.reset() {
200 let resp_interest = RespInterest {
201 session_id: self.session_id().clone(),
202 chunk: self.chunk().clone(),
203 err: BuckyErrorCode::WouldBlock,
204 redirect: None,
205 redirect_referer: None,
206 to: None,
207 };
208 channel.resp_interest(resp_interest);
209 }
210 Ok(())
211 },
212 NextStep::RespInterest(err) => {
213 let resp_interest = RespInterest {
214 session_id: self.session_id().clone(),
215 chunk: self.chunk().clone(),
216 err,
217 redirect: None,
218 redirect_referer: None,
219 to: None,
220 };
221 channel.resp_interest(resp_interest);
222 Ok(())
223 },
224 NextStep::None => Ok(())
225 }
226 }
227
228 pub(super) fn on_piece_control(&self, channel: &Channel, ctrl: &PieceControl) -> BuckyResult<()> {
229 enum NextStep {
230 MergeIndex(Box<dyn ChunkEncoder>, u32, Vec<Range<u32>>),
231 RespInterest(BuckyErrorCode),
232 Notify(StateWaiter),
233 None
234 }
235
236 let next_step = match ctrl.command {
237 PieceControlCommand::Finish => {
238 let mut state = self.0.state.write().unwrap();
239 match &mut state.task_state {
240 TaskStateImpl::Uploading(uploading) => {
241 info!("{} finished", self);
242 let mut waiters = StateWaiter::new();
243 uploading.waiters.transfer_into(&mut waiters);
244 state.task_state = TaskStateImpl::Finished(uploading.uploaded);
245 NextStep::Notify(waiters)
246 },
247 _ => {
248 NextStep::None
249 }
250 }
251 },
252 PieceControlCommand::Cancel => {
253 info!("{} canceled by remote", self);
254 let mut state = self.0.state.write().unwrap();
255 match &mut state.task_state {
256 TaskStateImpl::Uploading(uploading) => {
257 info!("{} finished", self);
258 let mut waiters = StateWaiter::new();
259 uploading.waiters.transfer_into(&mut waiters);
260 state.task_state = TaskStateImpl::Error(BuckyError::new(BuckyErrorCode::Interrupted, "cancel by remote"));
261 NextStep::Notify(waiters)
262 },
263 _ => {
264 NextStep::None
265 }
266 }
267 },
268 PieceControlCommand::Continue => {
269 let state = self.0.state.read().unwrap();
270 match &state.task_state {
271 TaskStateImpl::Uploading(uploading) => {
272 if let Some(max_index) = ctrl.max_index {
273 NextStep::MergeIndex(uploading.encoder.clone_as_encoder(), max_index, ctrl.lost_index.clone().unwrap_or_default())
274 } else {
275 NextStep::None
276 }
277 },
278 TaskStateImpl::Error(err) => NextStep::RespInterest(err.code()),
279 _ => NextStep::None
280 }
281 },
282 _ => unimplemented!()
283 };
284
285 match next_step {
286 NextStep::MergeIndex(encoder, max_index, lost_index) => {
287 if !encoder.merge(max_index, lost_index) {
288 let resp_interest = RespInterest {
289 session_id: self.session_id().clone(),
290 chunk: self.chunk().clone(),
291 err: BuckyErrorCode::WouldBlock,
292 redirect: None,
293 redirect_referer: None,
294 to: None,
295 };
296 channel.resp_interest(resp_interest);
297 }
298 },
299 NextStep::RespInterest(err) => {
300 let resp_interest = RespInterest {
301 session_id: self.session_id().clone(),
302 chunk: self.chunk().clone(),
303 err: err,
304 redirect: None,
305 redirect_referer: None,
306 to: None,
307 };
308 channel.resp_interest(resp_interest);
309 },
310 NextStep::Notify(waiters) => {
311 waiters.wake();
312 },
313 NextStep::None => {
314 }
315 }
316 Ok(())
317 }
318
319 pub async fn wait_finish(&self) -> NdnTaskState {
320 let waiter = match &mut self.0.state.write().unwrap().task_state {
321 TaskStateImpl::Uploading(uploading) => Some(uploading.waiters.new_waiter()),
322 _ => None,
323 };
324
325 if let Some(waiter) = waiter {
326 StateWaiter::wait(waiter, || self.state()).await
327 } else {
328 self.state()
329 }
330 }
331}
332
333
334impl NdnTask for UploadSession {
335 fn clone_as_task(&self) -> Box<dyn NdnTask> {
336 Box::new(self.clone())
337 }
338
339 fn state(&self) -> NdnTaskState {
340 match &self.0.state.read().unwrap().task_state {
341 TaskStateImpl::Uploading(_) => NdnTaskState::Running,
342 TaskStateImpl::Finished(_) => NdnTaskState::Finished,
343 TaskStateImpl::Error(err) => NdnTaskState::Error(err.clone()),
344 }
345 }
346
347 fn control_state(&self) -> NdnTaskControlState {
348 self.0.state.read().unwrap().control_state.clone()
349 }
350
351 fn transfered(&self) -> u64 {
352 match &self.0.state.read().unwrap().task_state {
353 TaskStateImpl::Uploading(uploading) => uploading.uploaded,
354 TaskStateImpl::Finished(uploaded) => *uploaded,
355 TaskStateImpl::Error(_) => 0,
356 }
357 }
358
359 fn cur_speed(&self) -> u32 {
360 match &self.0.state.read().unwrap().task_state {
361 TaskStateImpl::Uploading(uploading) => {
362 uploading.history_speed.latest()
363 },
364 _ => 0
365 }
366 }
367
368 fn history_speed(&self) -> u32 {
369 match &self.0.state.read().unwrap().task_state {
370 TaskStateImpl::Uploading(uploading) => {
371 uploading.history_speed.average()
372 },
373 _ => 0
374 }
375 }
376}
377
378#[async_trait::async_trait]
379impl UploadTask for UploadSession {
380 fn clone_as_upload_task(&self) -> Box<dyn UploadTask> {
381 Box::new(self.clone())
382 }
383
384 fn calc_speed(&self, when: Timestamp) -> u32 {
385 match &mut self.0.state.write().unwrap().task_state {
386 TaskStateImpl::Uploading(uploading) => {
387 let cur_speed = uploading.speed_counter.update(when);
388 uploading.history_speed.update(Some(cur_speed), when);
389 cur_speed
390 },
391 _ => 0
392 }
393 }
394}
395