cyfs_bdt/ndn/channel/
manager.rs

1use std::{
2    sync::RwLock, 
3    collections::{LinkedList, BTreeMap}, 
4};
5use async_std::{
6    sync::Arc, 
7    task, 
8};
9use cyfs_base::*;
10use crate::{
11    types::*, 
12    tunnel::*, 
13    datagram::{self, DatagramTunnelGuard},
14    stack::{WeakStack, Stack}
15};
16use super::super::{
17    types::*
18};
19use super::{
20    channel::{Channel},
21};
22
23struct ChannelGuard {
24    reserving: Option<Timestamp>, 
25    channel: Channel
26}
27
28impl ChannelGuard {
29    fn get(&self) -> Channel {
30        self.channel.clone()
31    }
32
33    fn check(&mut self, when: Timestamp) -> bool {
34        if self.channel.ref_count() > 1 {
35            self.reserving = None;
36            true
37        } else if let Some(expire_at) = self.reserving {
38            if when > expire_at {
39                info!("{} expired at {}", self.channel, expire_at);
40                false
41            } else {
42                true
43            }
44        } else {
45            self.reserving = Some(when + self.channel.config().reserve_timeout.as_micros() as u64);
46            true
47        }
48    }
49}
50
51struct Channels {
52    download_history_speed: HistorySpeed, 
53    download_cur_speed: u32, 
54    upload_history_speed: HistorySpeed, 
55    upload_cur_speed: u32, 
56    entries: BTreeMap<DeviceId, ChannelGuard>, 
57}
58
59
60struct ManagerImpl {
61    stack: WeakStack, 
62    command_tunnel: DatagramTunnelGuard, 
63    channels: RwLock<Channels>
64}
65
66#[derive(Clone)]
67pub struct ChannelManager(Arc<ManagerImpl>);
68
69impl std::fmt::Display for ChannelManager {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        let stack = Stack::from(&self.0.stack);
72        write!(f, "ChannelManager:{{local:{}}}", stack.local_device_id())
73    }
74}
75
76impl ChannelManager {
77    pub fn new(weak_stack: WeakStack) -> Self {
78        let stack = Stack::from(&weak_stack);
79        let command_tunnel = stack.datagram_manager().bind_reserved(datagram::ReservedVPort::Channel).unwrap();
80        let manager = Self(Arc::new(ManagerImpl {
81            stack: weak_stack.clone(), 
82            command_tunnel, 
83            channels: RwLock::new(Channels {
84                download_history_speed: HistorySpeed::new(0, stack.config().ndn.channel.history_speed.clone()), 
85                download_cur_speed: 0, 
86                upload_history_speed: HistorySpeed::new(0, stack.config().ndn.channel.history_speed.clone()), 
87                upload_cur_speed: 0, 
88                entries: BTreeMap::new()
89            }), 
90        }));
91        
92        {
93            let manager = manager.clone();
94            task::spawn(async move {
95                manager.recv_command().await;
96            });
97        }
98
99        manager
100    }
101
102
103    pub(crate) fn on_statistic(&self) -> String {
104        let channels = self.0.channels.read().unwrap();
105        let mut download_session_count = 0;
106        let mut upload_session_count = 0;
107        let mut channel_count = 0;
108        for (_, guard) in &channels.entries {
109            channel_count += 1;
110            download_session_count += guard.channel.download_session_count();
111            upload_session_count += guard.channel.upload_session_count();
112        }
113
114        format!("ChannelCount: {}, UploadSessionCount:{}, DownloadSessionCount:{}", channel_count, upload_session_count, download_session_count)
115    }
116
117    pub fn channel_of(&self, remote: &DeviceId) -> Option<Channel> {
118        self.0.channels.read().unwrap().entries.get(remote).map(|guard| guard.get())
119    }
120
121    pub fn create_channel(&self, remote_const: &DeviceDesc) -> BuckyResult<Channel> {
122        let stack = Stack::from(&self.0.stack);
123        let remote = remote_const.device_id();
124        let tunnel = stack.tunnel_manager().create_container(remote_const)?;
125        let mut channels = self.0.channels.write().unwrap();
126
127        Ok(channels.entries.get_mut(&remote).map(|guard| guard.get())
128        .map_or_else(|| {
129            info!("{} create channel on {}", self, remote);
130
131            let channel = Channel::new(
132                self.0.stack.clone(), 
133                tunnel, 
134                self.0.command_tunnel.clone());
135            channels.entries.insert(remote, ChannelGuard { reserving: None, channel: channel.clone() });
136
137            channel
138        }, |c| c))
139    } 
140
141    pub fn on_schedule(&self, when: Timestamp) {
142        let mut channels = self.0.channels.write().unwrap();
143        let mut download_cur_speed = 0;
144        let mut download_session_count = 0;
145        let mut upload_cur_speed = 0;
146        let mut upload_session_count = 0;
147
148        let mut remove = LinkedList::new();
149        for (remote, guard) in &mut channels.entries {
150            let (d, u) = guard.channel.calc_speed(when);
151            download_cur_speed += d;
152            upload_cur_speed += u;
153
154            download_session_count += guard.channel.download_session_count();
155            upload_session_count += guard.channel.upload_session_count();
156
157            if !guard.check(when) {
158                remove.push_back(remote.clone());
159            }
160        }
161        for remote in remove {
162            info!("{} will remove channel for not used, channel={}", self, remote);
163            channels.entries.remove(&remote);
164        }
165
166        channels.download_cur_speed = download_cur_speed;
167        channels.upload_cur_speed = upload_cur_speed;
168
169        if download_session_count > 0 {
170            channels.download_history_speed.update(Some(download_cur_speed), when);
171        } else {
172            channels.download_history_speed.update(None, when);
173        }
174
175        if upload_session_count > 0 {
176            channels.upload_history_speed.update(Some(upload_cur_speed), when);
177        } else {
178            channels.upload_history_speed.update(None, when);
179        }
180
181    }
182
183    fn download_cur_speed(&self) -> u32 {
184        self.0.channels.read().unwrap().download_cur_speed
185    }
186
187    fn download_history_speed(&self) -> u32 {
188        self.0.channels.read().unwrap().download_history_speed.average()
189    }
190
191    fn upload_cur_speed(&self) -> u32 {
192        self.0.channels.read().unwrap().upload_cur_speed
193    }
194
195    fn upload_history_speed(&self) -> u32 {
196        self.0.channels.read().unwrap().upload_history_speed.average()
197    }
198
199    pub(crate) fn on_time_escape(&self, now: Timestamp) {
200        let channels: Vec<Channel> = self.0.channels.read().unwrap().entries.values().map(|guard| guard.get()).collect();
201        for channel in channels {
202            channel.on_time_escape(now);
203        }
204    }
205
206    async fn recv_command(&self) {
207        let stack = Stack::from(&self.0.stack);
208        loop {
209            match self.0.command_tunnel.recv_v().await {
210                Ok(datagrams) => {
211                    for datagram in datagrams {
212                        let channel = if let Some(channel) = self.channel_of(&datagram.source.remote) {
213                            channel
214                        } else {
215                            let tunnel = stack.tunnel_manager().container_of(&datagram.source.remote).unwrap();
216                            self.create_channel(tunnel.remote_const()).unwrap()
217                        };
218                        let _ = channel.on_datagram(datagram);
219                    }
220                }, 
221                Err(_err) => {
222                    
223                }
224            }
225        }
226    }
227
228    pub fn on_raw_data(&self, data: &[u8], context: (&TunnelContainer, DynamicTunnel)) -> Result<(), BuckyError> {
229        let (container, tunnel) = context;
230        let channel = self.channel_of(container.remote()).ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "channel not exists"))?;
231        channel.on_raw_data(data, tunnel)
232    }
233}