cyfs_bdt/pn/client/
manager.rs1use log::*;
2use std::{
3 sync::RwLock,
4 collections::BTreeSet
5};
6use cyfs_base::*;
7use crate::{
8 protocol::{*, v0::*},
9 interface::udp::*,
10 stack::{WeakStack, Stack}
11};
12
13struct Proxies {
14 active_proxies: BTreeSet<DeviceId>,
15 passive_proxies: BTreeSet<DeviceId>,
16 dump_proxies: BTreeSet<DeviceId>,
17}
18
19impl Proxies {
20 fn new() -> Self {
21 Self {
22 active_proxies: Default::default(),
23 passive_proxies: Default::default(),
24 dump_proxies: Default::default(),
25 }
26 }
27}
28
29pub struct ProxyManager {
30 stack: WeakStack,
31 proxies: RwLock<Proxies>
32}
33
34
35impl std::fmt::Display for ProxyManager {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 write!(f, "ProxyManager")
38 }
39}
40
41impl ProxyManager {
42 pub(crate) fn new(stack: WeakStack) -> Self {
43 Self {
44 stack,
45 proxies: RwLock::new(Proxies::new()),
46 }
47 }
48
49 pub fn add_active_proxy(&self, proxy: &Device) {
50 let stack = Stack::from(&self.stack);
51 let proxy_id = proxy.desc().device_id();
52 info!("{} add active proxy {}", self, proxy_id);
53 stack.device_cache().add_static(&proxy_id, proxy);
54 let _ = self.proxies.write().unwrap().active_proxies.insert(proxy_id);
55 }
56
57 pub fn remove_active_proxy(&self, proxy: &DeviceId) -> bool {
58 self.proxies.write().unwrap().active_proxies.remove(proxy)
59 }
60
61 pub fn active_proxies(&self) -> Vec<DeviceId> {
62 self.proxies.read().unwrap().active_proxies.iter().cloned().collect()
63 }
64
65 pub fn add_passive_proxy(&self, proxy: &Device) {
66 let stack = Stack::from(&self.stack);
67 let proxy_id = proxy.desc().device_id();
68 info!("{} add passive proxy {}", self, proxy_id);
69 stack.device_cache().add_static(&proxy_id, proxy);
70 let mut proxies = self.proxies.write().unwrap();
71 let _ = proxies.passive_proxies.insert(proxy_id.clone());
72 let _ = proxies.active_proxies.insert(proxy_id);
73 }
74
75 pub fn remove_passive_proxy(&self, proxy: &DeviceId) -> bool {
76 let mut proxies = self.proxies.write().unwrap();
77 let _ = proxies.active_proxies.remove(proxy);
78 proxies.passive_proxies.remove(proxy)
79 }
80
81 pub fn passive_proxies(&self) -> Vec<DeviceId> {
82 self.proxies.read().unwrap().passive_proxies.iter().cloned().collect()
83 }
84
85 pub fn add_dump_proxy(&self, proxy: &Device) {
86 let stack = Stack::from(&self.stack);
87 let proxy_id = proxy.desc().device_id();
88 info!("{} add dump proxy {}", self, proxy_id);
89 stack.device_cache().add_static(&proxy_id, proxy);
90 let _ = self.proxies.write().unwrap().dump_proxies.insert(proxy_id);
91 }
92
93 pub fn remove_dump_proxy(&self, proxy: &DeviceId) -> bool {
94 self.proxies.write().unwrap().dump_proxies.remove(proxy)
95 }
96
97 pub fn dump_proxies(&self) -> Vec<DeviceId> {
98 self.proxies.read().unwrap().dump_proxies.iter().cloned().collect()
99 }
100}
101
102impl OnUdpPackageBox for ProxyManager {
103 fn on_udp_package_box(&self, package_box: UdpPackageBox) -> Result<(), BuckyError> {
104 if let Some(first_package) = package_box.as_ref().packages_no_exchange().get(0) {
105 if first_package.cmd_code() == PackageCmdCode::AckProxy {
106 let ack_proxy: &AckProxy = first_package.as_ref();
107 trace!("{} got {:?} from {}", self, ack_proxy, package_box.as_ref().remote());
108 let stack = Stack::from(&self.stack);
109 if let Some(tunnel) = stack.tunnel_manager().container_of(&ack_proxy.to_peer_id) {
110 let _ = tunnel.on_package(ack_proxy, package_box.as_ref().remote())?;
111 Ok(())
112 } else {
113 let err = BuckyError::new(BuckyErrorCode::NotFound, "tunnel not exists");
114 debug!("{} ignore {:?} from {} for {}", self, ack_proxy, package_box.as_ref().remote(), err);
115 Err(err)
116 }
117 } else {
118 let err = BuckyError::new(BuckyErrorCode::InvalidInput, format!("package box with first package {:?}", first_package.cmd_code()));
119 debug!("{} ignore package from {} for {}", self, package_box.as_ref().remote(), err);
120 Err(err)
121 }
122 } else {
123 let err = BuckyError::new(BuckyErrorCode::InvalidInput, "package box without package");
124 debug!("{} ignore package from {} for {}", self, package_box.as_ref().remote(), err);
125 Err(err)
126 }
127
128 }
129}