// Transport management for `Swarm`: the `TransportManager` trait and its implementation.
use std::sync::Arc;
use async_trait::async_trait;
use crate::dht::Did;
use crate::err::Error;
use crate::err::Result;
use crate::swarm::Swarm;
use crate::transports::Transport;
use crate::types::channel::Channel as ChannelTrait;
use crate::types::ice_transport::IceTransportInterface;
/// Bookkeeping interface for a set of transports keyed by [`Did`].
///
/// The trait is declared through `async_trait`; under the `wasm` feature the
/// generated futures are not required to be `Send`.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait TransportManager {
    /// Handle type under which a single transport is stored and handed out.
    type Transport;

    // ---- creation / registration -------------------------------------

    /// Create a fresh transport, or report why it could not be created.
    async fn new_transport(&self) -> Result<Self::Transport>;

    /// Associate `trans` with `did`.
    ///
    /// Returns an error when the implementor refuses the registration.
    async fn register(&self, did: Did, trans: Self::Transport) -> Result<()>;

    // ---- single-entry access -----------------------------------------

    /// Look up the transport registered for `did`, if any.
    fn get_transport(&self, did: Did) -> Option<Self::Transport>;

    /// Look up the transport registered for `did` and additionally check it.
    ///
    /// What "check" means is up to the implementor; `None` is returned both
    /// when nothing is registered and when the check rejects the transport.
    async fn get_and_check_transport(&self, did: Did) -> Option<Self::Transport>;

    /// Remove the entry for `did`, returning the removed pair if one existed.
    fn remove_transport(&self, did: Did) -> Option<(Did, Self::Transport)>;

    // ---- enumeration -------------------------------------------------

    /// Number of transports currently held.
    fn get_transport_numbers(&self) -> usize;

    /// All `Did`s that currently have an entry.
    fn get_dids(&self) -> Vec<Did>;

    /// All `(Did, transport)` entries currently held.
    fn get_transports(&self) -> Vec<(Did, Self::Transport)>;
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl TransportManager for Swarm {
type Transport = Arc<Transport>;
async fn new_transport(&self) -> Result<Self::Transport> {
let event_sender = self.transport_event_channel.sender();
let mut ice_transport = Transport::new(event_sender);
ice_transport
.start(self.ice_servers.clone(), self.external_address.clone())
.await?
.apply_callback()
.await?;
Ok(Arc::new(ice_transport))
}
async fn register(&self, did: Did, trans: Self::Transport) -> Result<()> {
if trans.is_disconnected().await {
return Err(Error::InvalidTransport);
}
tracing::info!("register transport {:?}", trans.id.clone());
#[cfg(test)]
{
println!("register transport {:?}", trans.id.clone());
}
let id = trans.id;
if let Some(t) = self.transports.get(&did) {
if t.is_connected().await && !trans.is_connected().await {
return Err(Error::InvalidTransport);
}
if t.id != id {
self.transports.set(&did, trans);
if let Err(e) = t.close().await {
tracing::error!("failed to close previous while registering {:?}", e);
return Err(Error::SwarmToClosePrevTransport(format!("{:?}", e)));
}
tracing::debug!("replace and closed previous connection! {:?}", t.id);
}
} else {
self.transports.set(&did, trans);
}
Ok(())
}
async fn get_and_check_transport(&self, did: Did) -> Option<Self::Transport> {
match self.get_transport(did) {
Some(t) => {
if t.is_disconnected().await {
tracing::debug!(
"[get_and_check_transport] transport {:?} is not connected will be drop",
t.id
);
if t.close().await.is_err() {
tracing::error!("Failed on close transport");
};
None
} else {
Some(t)
}
}
None => None,
}
}
fn get_transport(&self, did: Did) -> Option<Self::Transport> {
self.transports.get(&did)
}
fn remove_transport(&self, did: Did) -> Option<(Did, Self::Transport)> {
self.transports.remove(&did)
}
fn get_transport_numbers(&self) -> usize {
self.transports.len()
}
fn get_dids(&self) -> Vec<Did> {
self.transports.keys()
}
fn get_transports(&self) -> Vec<(Did, Self::Transport)> {
self.transports.items()
}
}