cyfs_lib/requestor/
bdt.rs1use super::requestor::*;
2use cyfs_base::*;
3use cyfs_bdt::*;
4
5use http_types::{Request, Response};
6use std::sync::Mutex;
7use std::sync::{RwLock, Arc};
8
9struct DeviceConnectWithSNCache {
10 devices: Mutex<lru_time_cache::LruCache<DeviceId, ()>>,
11}
12
13impl DeviceConnectWithSNCache {
14 pub fn new() -> Self {
15 Self {
16 devices: Mutex::new(lru_time_cache::LruCache::with_expiry_duration_and_capacity(
17 std::time::Duration::from_secs(60 * 10),
18 256,
19 )),
20 }
21 }
22
23 pub fn try_connect_via_sn(device_id: &DeviceId) -> bool {
24 static D: once_cell::sync::OnceCell<DeviceConnectWithSNCache> = once_cell::sync::OnceCell::new();
25 D.get_or_init(|| {
26 DeviceConnectWithSNCache::new()
27 }).try_connect_via_sn_impl(device_id)
28 }
29
30 fn try_connect_via_sn_impl(&self, device_id: &DeviceId) -> bool {
31 let mut list = self.devices.lock().unwrap();
32
33 list.iter();
35
36 if let Some(_) = list.peek(device_id) {
37 return false;
38 }
39
40 list.insert(device_id.to_owned(), ());
41
42 true
43 }
44}
45
46
47#[derive(Clone)]
48pub struct BdtHttpRequestor {
49 bdt_stack: StackGuard,
50 device_id: DeviceId,
51 device: Arc<RwLock<Device>>,
52 vport: u16,
53}
54
55impl BdtHttpRequestor {
56 pub fn new(bdt_stack: StackGuard, device: Device, vport: u16) -> Self {
57 Self {
58 bdt_stack,
59 device_id: device.desc().device_id(),
60 device: Arc::new(RwLock::new(device)),
61 vport,
62 }
63 }
64
65 pub fn device(&self) -> Device {
66 self.device.read().unwrap().clone()
67 }
68
69 pub fn update_device(&self, device: Device) {
70 assert_eq!(device.desc().device_id(), self.device_id);
71 *self.device.write().unwrap() = device;
72 }
73
74 pub fn has_wan_endpoint(&self) -> bool {
75 self.device.read().unwrap().has_wan_endpoint()
76 }
77
78 pub fn device_id(&self) -> &DeviceId {
79 &self.device_id
80 }
81
82 async fn connect(&self, with_remote_desc: bool) -> BuckyResult<StreamGuard> {
83 let begin = std::time::Instant::now();
84
85 let device = self.device();
86
87 let build_params = BuildTunnelParams {
88 remote_const: device.desc().clone(),
89 remote_sn: None,
90 remote_desc: if with_remote_desc {
91 Some(device)
92 } else {
93 None
94 },
95 };
96
97 let bdt_stream = self
98 .bdt_stack
99 .stream_manager()
100 .connect(self.vport, Vec::new(), build_params)
101 .await
102 .map_err(|e| {
103 let msg = format!(
104 "connect to {} failed! with_desc={}, during={}ms, {}",
105 self.remote_addr(),
106 with_remote_desc,
107 begin.elapsed().as_millis(),
108 e
109 );
110 warn!("{}", msg);
111 BuckyError::new(BuckyErrorCode::ConnectFailed, msg)
112 })?;
113
114 Ok(bdt_stream)
115 }
116}
117
118#[async_trait::async_trait]
119impl HttpRequestor for BdtHttpRequestor {
120 async fn request_ext(
121 &self,
122 req: &mut Option<Request>,
123 conn_info: Option<&mut HttpRequestConnectionInfo>,
124 ) -> BuckyResult<Response> {
125 debug!(
126 "will create bdt stream connection to {}",
127 self.remote_addr()
128 );
129
130 let begin = std::time::Instant::now();
131
132 let bdt_stream = match self.connect(true).await {
133 Ok(stream) => stream,
134 Err(e) => {
135 if !self.has_wan_endpoint() {
136 return Err(e);
137 }
138
139 if !DeviceConnectWithSNCache::try_connect_via_sn(&self.device_id) {
140 return Err(e);
141 }
142
143 info!("now will retry connect via sn: device={}", self.device_id);
144 self.connect(false).await?
145 }
146 };
147
148 let seq = bdt_stream.sequence();
149 if let Some(conn_info) = conn_info {
150 let local_addr = bdt_stream.local_ep().ok_or_else(|| {
151 let msg = format!("get local_ep from bdt stream but empty! seq={:?}", seq);
152 error!("{}", msg);
153 BuckyError::new(BuckyErrorCode::NotConnected, msg)
154 })?;
155
156 let remote_addr = bdt_stream.remote_ep().ok_or_else(|| {
157 let msg = format!("get remote_ep from bdt stream but empty! seq={:?}", seq);
158 error!("{}", msg);
159 BuckyError::new(BuckyErrorCode::NotConnected, msg)
160 })?;
161
162 *conn_info = HttpRequestConnectionInfo::Bdt((
163 local_addr,
164 remote_addr,
165 ));
166 }
167
168 debug!(
169 "bdt connect to {} success, seq={:?}, during={}ms",
170 self.remote_addr(),
171 seq,
172 begin.elapsed().as_millis(),
173 );
174 let req = req.take().unwrap();
177 let req = self.add_default_headers(req);
178
179 match async_h1::connect(bdt_stream, req).await {
180 Ok(resp) => {
181 info!(
182 "http-bdt request to {} success! during={}ms, seq={:?}",
183 self.remote_addr(),
184 begin.elapsed().as_millis(),
185 seq,
186 );
187 Ok(resp)
188 }
189 Err(e) => {
190 let msg = format!(
191 "http-bdt request to {} failed! during={}ms, seq={:?}, {}",
192 self.remote_addr(),
193 begin.elapsed().as_millis(),
194 seq,
195 e,
196 );
197 error!("{}", msg);
198 Err(BuckyError::from(msg))
199 }
200 }
201 }
202
203 fn remote_addr(&self) -> String {
204 format!("{}:{}", self.device_id, self.vport)
205 }
206
207 fn remote_device(&self) -> Option<DeviceId> {
208 Some(self.device_id.clone())
209 }
210
211 fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
212 Box::new(self.clone())
213 }
214
215 async fn stop(&self) {
216 }
218}