1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::{
    sync::RwLock, 
    collections::LinkedList
};
use async_std::{
    sync::Arc
};
use cyfs_base::*;
use super::super::{
    scheduler::*, 
    channel::UploadSession, 
    channel::PieceSessionType,
};
use super::{
    encode::*,
    view::{ChunkView}
};

struct UploaderImpl {
    view: ChunkView,  
    resource: ResourceManager,
    // 所有channel应当共享raptor encoder 
    // raptor_encoder: RaptorEncoder, 
    sessions: RwLock<LinkedList<UploadSession>>, 
}


// Chunk粒度的所有上传任务;不同channel上的所有session;channel上同chunk应当只有唯一session
// TODO: 这里应当有子 scheduler实现, 在session粒度调度上传
#[derive(Clone)]
pub struct ChunkUploader(Arc<UploaderImpl>);

impl std::fmt::Display for ChunkUploader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ChunkUploader:{{chunk:{}}}", self.chunk())
    }
}

impl ChunkUploader {
    pub fn new(
        view: ChunkView,  
        owner: ResourceManager
    ) -> Self {
        Self(Arc::new(UploaderImpl {
            view, 
            resource: ResourceManager::new(Some(owner)), 
            sessions: RwLock::new(LinkedList::new()), 
            // raptor_encoder
        }))
    }

    pub fn chunk(&self) -> &ChunkId {
        &self.0.view.chunk()
    }

    pub fn resource(&self) -> &ResourceManager {
        &self.0.resource
    }

    pub fn schedule_state(&self) -> TaskState {
        let sessions = self.0.sessions.read().unwrap();
        Self::collect_state(&*sessions)
    }

    fn collect_state(sessions: &LinkedList<UploadSession>) -> TaskState {
        for session in sessions {
            let state = session.schedule_state();
            match state {
                TaskState::Finished => continue, 
                TaskState::Canceled(_) => continue, 
                _ => {
                    return TaskState::Running(0);
                }
            }
        }
        TaskState::Finished
    }

    pub fn add_session(&self, session: UploadSession) -> BuckyResult<()> {
        info!("{} try add new session {}", self, session);
        {
            let mut sessions = self.0.sessions.write().unwrap();
            if sessions.iter().find(|s| 
                session.channel().remote().eq(s.channel().remote())
                && session.chunk().eq(s.chunk())
                && session.session_id().eq(s.session_id())).is_some() {
                info!("session {} exists in {}, ignore it", session, self);
                return Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "session exists"));
            }
            sessions.push_front(session.clone());
            // TODO: 根据资源管理器调度是不是开始
            // TODO: 根据session type创建 encoder
            
        }
        let encoder = match *session.piece_type() {
            PieceSessionType::RaptorA(_) | PieceSessionType::RaptorB(_) => TypedChunkEncoder::Raptor(self.0.view.raptor_encoder()),
            PieceSessionType::Stream(_) => TypedChunkEncoder::Range(RangeEncoder::from_reader(self.0.view.reader().unwrap(), self.chunk())), 
            _ => unreachable!()
        };
        session.start(encoder);

        Ok(())
    }
}

impl Scheduler for ChunkUploader {
    fn collect_resource_usage(&self) {
        let mut sessions = self.0.sessions.write().unwrap();
        let mut remain = LinkedList::new();
        loop {
            if let Some(session) = sessions.pop_front() {
                let state = session.schedule_state();
                match state {
                    TaskState::Finished => {
                        let _ = self.resource().remove_child(session.resource());
                        info!("{} remove session {} for finished", self, session);
                    },  
                    TaskState::Canceled(_) => {
                        let _ = self.resource().remove_child(session.resource());
                        info!("{} remove session {} for canceled", self, session);
                    }, 
                    _ => {
                        remain.push_back(session);
                    }
                }
            } else {
                break;
            }
        }
        *sessions = remain;
    }

    fn schedule_resource(&self) {

    }

    fn apply_scheduled_resource(&self) {
        
    }
}