cyfs_bdt/pn/client/
manager.rs

1use log::*;
2use std::{
3    sync::RwLock, 
4    collections::BTreeSet
5};
6use cyfs_base::*;
7use crate::{
8    protocol::{*, v0::*}, 
9    interface::udp::*, 
10    stack::{WeakStack, Stack}
11};
12
13struct Proxies {
14    active_proxies: BTreeSet<DeviceId>,
15    passive_proxies: BTreeSet<DeviceId>,
16    dump_proxies: BTreeSet<DeviceId>,
17}
18
19impl Proxies {
20    fn new() -> Self {
21        Self {
22            active_proxies: Default::default(), 
23            passive_proxies: Default::default(), 
24            dump_proxies: Default::default(),
25        }
26    }
27}
28
29pub struct ProxyManager {
30    stack: WeakStack,
31    proxies: RwLock<Proxies>
32} 
33
34
35impl std::fmt::Display for ProxyManager {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        write!(f, "ProxyManager")
38    }
39}
40
41impl ProxyManager {
42    pub(crate) fn new(stack: WeakStack) -> Self {
43        Self {
44            stack, 
45            proxies: RwLock::new(Proxies::new()), 
46        }
47    }
48
49    pub fn add_active_proxy(&self, proxy: &Device) {
50        let stack = Stack::from(&self.stack);
51        let proxy_id = proxy.desc().device_id();
52        info!("{} add active proxy {}", self, proxy_id);
53        stack.device_cache().add_static(&proxy_id, proxy);
54        let _ = self.proxies.write().unwrap().active_proxies.insert(proxy_id);
55    }
56
57    pub fn remove_active_proxy(&self, proxy: &DeviceId) -> bool {
58        self.proxies.write().unwrap().active_proxies.remove(proxy)
59    }
60
61    pub fn active_proxies(&self) -> Vec<DeviceId> {
62        self.proxies.read().unwrap().active_proxies.iter().cloned().collect()
63    }
64
65     pub fn add_passive_proxy(&self, proxy: &Device) {
66        let stack = Stack::from(&self.stack);
67        let proxy_id = proxy.desc().device_id();
68        info!("{} add passive proxy {}", self, proxy_id);
69        stack.device_cache().add_static(&proxy_id, proxy);
70        let mut proxies = self.proxies.write().unwrap(); 
71        let _ = proxies.passive_proxies.insert(proxy_id.clone());
72        let _ = proxies.active_proxies.insert(proxy_id);
73    }
74
75    pub fn remove_passive_proxy(&self, proxy: &DeviceId) -> bool {
76        let mut proxies = self.proxies.write().unwrap(); 
77        let _ = proxies.active_proxies.remove(proxy);
78        proxies.passive_proxies.remove(proxy)
79    }
80
81    pub fn passive_proxies(&self) -> Vec<DeviceId> {
82        self.proxies.read().unwrap().passive_proxies.iter().cloned().collect()
83    }
84
85    pub fn add_dump_proxy(&self, proxy: &Device) {
86        let stack = Stack::from(&self.stack);
87        let proxy_id = proxy.desc().device_id();
88        info!("{} add dump proxy {}", self, proxy_id);
89        stack.device_cache().add_static(&proxy_id, proxy);
90        let _ = self.proxies.write().unwrap().dump_proxies.insert(proxy_id);
91    }
92
93    pub fn remove_dump_proxy(&self, proxy: &DeviceId) -> bool {
94        self.proxies.write().unwrap().dump_proxies.remove(proxy)
95    }
96
97    pub fn dump_proxies(&self) -> Vec<DeviceId> {
98        self.proxies.read().unwrap().dump_proxies.iter().cloned().collect()
99    }
100}
101
102impl OnUdpPackageBox for ProxyManager {
103    fn on_udp_package_box(&self, package_box: UdpPackageBox) -> Result<(), BuckyError> {
104        if let Some(first_package) = package_box.as_ref().packages_no_exchange().get(0) {
105            if first_package.cmd_code() == PackageCmdCode::AckProxy {
106                let ack_proxy: &AckProxy = first_package.as_ref();
107                trace!("{} got {:?} from {}", self, ack_proxy, package_box.as_ref().remote());
108                let stack = Stack::from(&self.stack);
109                if let Some(tunnel) = stack.tunnel_manager().container_of(&ack_proxy.to_peer_id) {
110                    let _ = tunnel.on_package(ack_proxy, package_box.as_ref().remote())?;
111                    Ok(())
112                } else {
113                    let err = BuckyError::new(BuckyErrorCode::NotFound, "tunnel not exists");
114                    debug!("{} ignore {:?} from {} for {}", self, ack_proxy, package_box.as_ref().remote(), err);
115                    Err(err)
116                }
117            } else {
118                let err = BuckyError::new(BuckyErrorCode::InvalidInput, format!("package box with first package {:?}", first_package.cmd_code()));
119                debug!("{} ignore package from {} for {}", self, package_box.as_ref().remote(), err);
120                Err(err)
121            }
122        } else {
123            let err = BuckyError::new(BuckyErrorCode::InvalidInput, "package box without package");
124            debug!("{} ignore package from {} for {}", self, package_box.as_ref().remote(), err);
125            Err(err)
126        }
127
128    }
129}