1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use log::*;
use std::{
sync::RwLock,
collections::BTreeSet
};
use cyfs_base::*;
use crate::{
protocol::*,
interface::udp::*,
stack::{WeakStack, Stack}
};
/// The two sets of proxy device ids kept behind `ProxyManager`'s lock.
struct Proxies {
// Ids inserted by `add_active_proxy`, plus every id inserted by
// `add_passive_proxy` (which writes to both sets).
active_proxies: BTreeSet<DeviceId>,
// Ids inserted by `add_passive_proxy` only.
passive_proxies: BTreeSet<DeviceId>
}
impl Proxies {
fn new() -> Self {
Self {
active_proxies: Default::default(),
passive_proxies: Default::default(),
}
}
}
/// Keeps track of the device ids registered as active and passive proxies.
pub struct ProxyManager {
// Weak stack handle; methods that need the stack convert it with
// `Stack::from(&self.stack)` at call time.
stack: WeakStack,
// Both proxy sets, guarded by a single read/write lock.
proxies: RwLock<Proxies>
}
impl std::fmt::Display for ProxyManager {
    /// Always renders as the fixed tag `ProxyManager`; used as the prefix of this
    /// type's log lines.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ProxyManager")
    }
}
impl ProxyManager {
pub(crate) fn new(stack: WeakStack) -> Self {
Self {
stack,
proxies: RwLock::new(Proxies::new()),
}
}
pub fn add_active_proxy(&self, proxy: &Device) {
let stack = Stack::from(&self.stack);
let proxy_id = proxy.desc().device_id();
info!("{} add active proxy {}", self, proxy_id);
stack.device_cache().add(&proxy_id, proxy);
let _ = self.proxies.write().unwrap().active_proxies.insert(proxy_id);
}
pub fn remove_active_proxy(&self, proxy: &DeviceId) -> bool {
self.proxies.write().unwrap().active_proxies.remove(proxy)
}
pub fn active_proxies(&self) -> Vec<DeviceId> {
self.proxies.read().unwrap().active_proxies.iter().cloned().collect()
}
pub fn add_passive_proxy(&self, proxy: &Device) {
let stack = Stack::from(&self.stack);
let proxy_id = proxy.desc().device_id();
info!("{} add passive proxy {}", self, proxy_id);
stack.device_cache().add(&proxy_id, proxy);
let mut proxies = self.proxies.write().unwrap();
let _ = proxies.passive_proxies.insert(proxy_id.clone());
let _ = proxies.active_proxies.insert(proxy_id);
}
pub fn remove_passive_proxy(&self, proxy: &DeviceId) -> bool {
let mut proxies = self.proxies.write().unwrap();
let _ = proxies.active_proxies.remove(proxy);
proxies.passive_proxies.remove(proxy)
}
pub fn passive_proxies(&self) -> Vec<DeviceId> {
self.proxies.read().unwrap().passive_proxies.iter().cloned().collect()
}
pub async fn sync_passive_proxies(&self) {
let stack = Stack::from(&self.stack);
stack.reset_local().await;
stack.sn_client().resend_ping();
}
}
impl OnUdpPackageBox for ProxyManager {
    /// Dispatches a UDP package box whose first package (as returned by
    /// `packages_no_exchange()`) is an `AckProxy` to the tunnel container
    /// registered for `ack_proxy.to_peer_id`.
    ///
    /// # Errors
    /// * `InvalidInput` — the box contains no package, or its first package is
    ///   not an `AckProxy`.
    /// * `NotFound` — no tunnel container exists for `to_peer_id`.
    /// * Any error returned by the tunnel's `on_package` is propagated.
    fn on_udp_package_box(&self, package_box: UdpPackageBox) -> Result<(), BuckyError> {
        // Guard: the box must carry at least one package.
        let first_package = match package_box.as_ref().packages_no_exchange().get(0) {
            Some(package) => package,
            None => {
                let err = BuckyError::new(BuckyErrorCode::InvalidInput, "package box without package");
                debug!("{} ignore package from {} for {}", self, package_box.as_ref().remote(), err);
                return Err(err);
            }
        };

        // Guard: only AckProxy is handled here.
        if first_package.cmd_code() != PackageCmdCode::AckProxy {
            let err = BuckyError::new(BuckyErrorCode::InvalidInput, format!("package box with first package {:?}", first_package.cmd_code()));
            debug!("{} ignore package from {} for {}", self, package_box.as_ref().remote(), err);
            return Err(err);
        }

        let ack_proxy: &AckProxy = first_package.as_ref();
        trace!("{} got {:?} from {}", self, ack_proxy, package_box.as_ref().remote());

        let stack = Stack::from(&self.stack);
        match stack.tunnel_manager().container_of(&ack_proxy.to_peer_id) {
            Some(tunnel) => {
                // The handler's success value is deliberately discarded; only failure matters.
                let _ = tunnel.on_package(ack_proxy, package_box.as_ref().remote())?;
                Ok(())
            }
            None => {
                let err = BuckyError::new(BuckyErrorCode::NotFound, "tunnel not exists");
                debug!("{} ignore {:?} from {} for {}", self, ack_proxy, package_box.as_ref().remote(), err);
                Err(err)
            }
        }
    }
}