cyfs_bdt/ndn/channel/
manager.rs1use std::{
2 sync::RwLock,
3 collections::{LinkedList, BTreeMap},
4};
5use async_std::{
6 sync::Arc,
7 task,
8};
9use cyfs_base::*;
10use crate::{
11 types::*,
12 tunnel::*,
13 datagram::{self, DatagramTunnelGuard},
14 stack::{WeakStack, Stack}
15};
16use super::super::{
17 types::*
18};
19use super::{
20 channel::{Channel},
21};
22
23struct ChannelGuard {
24 reserving: Option<Timestamp>,
25 channel: Channel
26}
27
28impl ChannelGuard {
29 fn get(&self) -> Channel {
30 self.channel.clone()
31 }
32
33 fn check(&mut self, when: Timestamp) -> bool {
34 if self.channel.ref_count() > 1 {
35 self.reserving = None;
36 true
37 } else if let Some(expire_at) = self.reserving {
38 if when > expire_at {
39 info!("{} expired at {}", self.channel, expire_at);
40 false
41 } else {
42 true
43 }
44 } else {
45 self.reserving = Some(when + self.channel.config().reserve_timeout.as_micros() as u64);
46 true
47 }
48 }
49}
50
51struct Channels {
52 download_history_speed: HistorySpeed,
53 download_cur_speed: u32,
54 upload_history_speed: HistorySpeed,
55 upload_cur_speed: u32,
56 entries: BTreeMap<DeviceId, ChannelGuard>,
57}
58
59
60struct ManagerImpl {
61 stack: WeakStack,
62 command_tunnel: DatagramTunnelGuard,
63 channels: RwLock<Channels>
64}
65
66#[derive(Clone)]
67pub struct ChannelManager(Arc<ManagerImpl>);
68
69impl std::fmt::Display for ChannelManager {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let stack = Stack::from(&self.0.stack);
72 write!(f, "ChannelManager:{{local:{}}}", stack.local_device_id())
73 }
74}
75
76impl ChannelManager {
77 pub fn new(weak_stack: WeakStack) -> Self {
78 let stack = Stack::from(&weak_stack);
79 let command_tunnel = stack.datagram_manager().bind_reserved(datagram::ReservedVPort::Channel).unwrap();
80 let manager = Self(Arc::new(ManagerImpl {
81 stack: weak_stack.clone(),
82 command_tunnel,
83 channels: RwLock::new(Channels {
84 download_history_speed: HistorySpeed::new(0, stack.config().ndn.channel.history_speed.clone()),
85 download_cur_speed: 0,
86 upload_history_speed: HistorySpeed::new(0, stack.config().ndn.channel.history_speed.clone()),
87 upload_cur_speed: 0,
88 entries: BTreeMap::new()
89 }),
90 }));
91
92 {
93 let manager = manager.clone();
94 task::spawn(async move {
95 manager.recv_command().await;
96 });
97 }
98
99 manager
100 }
101
102
103 pub(crate) fn on_statistic(&self) -> String {
104 let channels = self.0.channels.read().unwrap();
105 let mut download_session_count = 0;
106 let mut upload_session_count = 0;
107 let mut channel_count = 0;
108 for (_, guard) in &channels.entries {
109 channel_count += 1;
110 download_session_count += guard.channel.download_session_count();
111 upload_session_count += guard.channel.upload_session_count();
112 }
113
114 format!("ChannelCount: {}, UploadSessionCount:{}, DownloadSessionCount:{}", channel_count, upload_session_count, download_session_count)
115 }
116
117 pub fn channel_of(&self, remote: &DeviceId) -> Option<Channel> {
118 self.0.channels.read().unwrap().entries.get(remote).map(|guard| guard.get())
119 }
120
121 pub fn create_channel(&self, remote_const: &DeviceDesc) -> BuckyResult<Channel> {
122 let stack = Stack::from(&self.0.stack);
123 let remote = remote_const.device_id();
124 let tunnel = stack.tunnel_manager().create_container(remote_const)?;
125 let mut channels = self.0.channels.write().unwrap();
126
127 Ok(channels.entries.get_mut(&remote).map(|guard| guard.get())
128 .map_or_else(|| {
129 info!("{} create channel on {}", self, remote);
130
131 let channel = Channel::new(
132 self.0.stack.clone(),
133 tunnel,
134 self.0.command_tunnel.clone());
135 channels.entries.insert(remote, ChannelGuard { reserving: None, channel: channel.clone() });
136
137 channel
138 }, |c| c))
139 }
140
141 pub fn on_schedule(&self, when: Timestamp) {
142 let mut channels = self.0.channels.write().unwrap();
143 let mut download_cur_speed = 0;
144 let mut download_session_count = 0;
145 let mut upload_cur_speed = 0;
146 let mut upload_session_count = 0;
147
148 let mut remove = LinkedList::new();
149 for (remote, guard) in &mut channels.entries {
150 let (d, u) = guard.channel.calc_speed(when);
151 download_cur_speed += d;
152 upload_cur_speed += u;
153
154 download_session_count += guard.channel.download_session_count();
155 upload_session_count += guard.channel.upload_session_count();
156
157 if !guard.check(when) {
158 remove.push_back(remote.clone());
159 }
160 }
161 for remote in remove {
162 info!("{} will remove channel for not used, channel={}", self, remote);
163 channels.entries.remove(&remote);
164 }
165
166 channels.download_cur_speed = download_cur_speed;
167 channels.upload_cur_speed = upload_cur_speed;
168
169 if download_session_count > 0 {
170 channels.download_history_speed.update(Some(download_cur_speed), when);
171 } else {
172 channels.download_history_speed.update(None, when);
173 }
174
175 if upload_session_count > 0 {
176 channels.upload_history_speed.update(Some(upload_cur_speed), when);
177 } else {
178 channels.upload_history_speed.update(None, when);
179 }
180
181 }
182
183 fn download_cur_speed(&self) -> u32 {
184 self.0.channels.read().unwrap().download_cur_speed
185 }
186
187 fn download_history_speed(&self) -> u32 {
188 self.0.channels.read().unwrap().download_history_speed.average()
189 }
190
191 fn upload_cur_speed(&self) -> u32 {
192 self.0.channels.read().unwrap().upload_cur_speed
193 }
194
195 fn upload_history_speed(&self) -> u32 {
196 self.0.channels.read().unwrap().upload_history_speed.average()
197 }
198
199 pub(crate) fn on_time_escape(&self, now: Timestamp) {
200 let channels: Vec<Channel> = self.0.channels.read().unwrap().entries.values().map(|guard| guard.get()).collect();
201 for channel in channels {
202 channel.on_time_escape(now);
203 }
204 }
205
206 async fn recv_command(&self) {
207 let stack = Stack::from(&self.0.stack);
208 loop {
209 match self.0.command_tunnel.recv_v().await {
210 Ok(datagrams) => {
211 for datagram in datagrams {
212 let channel = if let Some(channel) = self.channel_of(&datagram.source.remote) {
213 channel
214 } else {
215 let tunnel = stack.tunnel_manager().container_of(&datagram.source.remote).unwrap();
216 self.create_channel(tunnel.remote_const()).unwrap()
217 };
218 let _ = channel.on_datagram(datagram);
219 }
220 },
221 Err(_err) => {
222
223 }
224 }
225 }
226 }
227
228 pub fn on_raw_data(&self, data: &[u8], context: (&TunnelContainer, DynamicTunnel)) -> Result<(), BuckyError> {
229 let (container, tunnel) = context;
230 let channel = self.channel_of(container.remote()).ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "channel not exists"))?;
231 channel.on_raw_data(data, tunnel)
232 }
233}