cyfs_lib/requestor/
bdt.rs

1use super::requestor::*;
2use cyfs_base::*;
3use cyfs_bdt::*;
4
5use http_types::{Request, Response};
6use std::sync::Mutex;
7use std::sync::{RwLock, Arc};
8
9struct DeviceConnectWithSNCache {
10    devices: Mutex<lru_time_cache::LruCache<DeviceId, ()>>,
11}
12
13impl DeviceConnectWithSNCache {
14    pub fn new() -> Self {
15        Self {
16            devices: Mutex::new(lru_time_cache::LruCache::with_expiry_duration_and_capacity(
17                std::time::Duration::from_secs(60 * 10),
18                256,
19            )),
20        }
21    }
22
23    pub fn try_connect_via_sn(device_id: &DeviceId) -> bool {
24        static D: once_cell::sync::OnceCell<DeviceConnectWithSNCache> = once_cell::sync::OnceCell::new();
25        D.get_or_init(|| {
26            DeviceConnectWithSNCache::new()
27        }).try_connect_via_sn_impl(device_id)
28    }
29    
30    fn try_connect_via_sn_impl(&self, device_id: &DeviceId) -> bool {
31        let mut list = self.devices.lock().unwrap();
32
33        // force remove expired items
34        list.iter();
35
36        if let Some(_) = list.peek(device_id) {
37            return false;
38        }
39
40        list.insert(device_id.to_owned(), ());
41
42        true
43    }
44}
45
46
47#[derive(Clone)]
48pub struct BdtHttpRequestor {
49    bdt_stack: StackGuard,
50    device_id: DeviceId,
51    device: Arc<RwLock<Device>>,
52    vport: u16,
53}
54
55impl BdtHttpRequestor {
56    pub fn new(bdt_stack: StackGuard, device: Device, vport: u16) -> Self {
57        Self {
58            bdt_stack,
59            device_id: device.desc().device_id(),
60            device: Arc::new(RwLock::new(device)),
61            vport,
62        }
63    }
64
65    pub fn device(&self) -> Device {
66        self.device.read().unwrap().clone()
67    }
68
69    pub fn update_device(&self, device: Device) {
70        assert_eq!(device.desc().device_id(), self.device_id);
71        *self.device.write().unwrap() = device;
72    }
73    
74    pub fn has_wan_endpoint(&self) -> bool {
75        self.device.read().unwrap().has_wan_endpoint()
76    }
77
78    pub fn device_id(&self) -> &DeviceId {
79        &self.device_id
80    }
81
82    async fn connect(&self, with_remote_desc: bool) -> BuckyResult<StreamGuard> {
83        let begin = std::time::Instant::now();
84
85        let device = self.device();
86
87        let build_params = BuildTunnelParams {
88            remote_const: device.desc().clone(),
89            remote_sn: None,
90            remote_desc: if with_remote_desc {
91                Some(device)
92            } else {
93                None
94            },
95        };
96
97        let bdt_stream = self
98            .bdt_stack
99            .stream_manager()
100            .connect(self.vport, Vec::new(), build_params)
101            .await
102            .map_err(|e| {
103                let msg = format!(
104                    "connect to {} failed! with_desc={}, during={}ms, {}",
105                    self.remote_addr(),
106                    with_remote_desc,
107                    begin.elapsed().as_millis(),
108                    e
109                );
110                warn!("{}", msg);
111                BuckyError::new(BuckyErrorCode::ConnectFailed, msg)
112            })?;
113
114        Ok(bdt_stream)
115    }
116}
117
118#[async_trait::async_trait]
119impl HttpRequestor for BdtHttpRequestor {
120    async fn request_ext(
121        &self,
122        req: &mut Option<Request>,
123        conn_info: Option<&mut HttpRequestConnectionInfo>,
124    ) -> BuckyResult<Response> {
125        debug!(
126            "will create bdt stream connection to {}",
127            self.remote_addr()
128        );
129
130        let begin = std::time::Instant::now();
131
132        let bdt_stream = match self.connect(true).await {
133            Ok(stream) => stream,
134            Err(e) => {
135                if !self.has_wan_endpoint() {
136                    return Err(e);
137                }
138                
139                if !DeviceConnectWithSNCache::try_connect_via_sn(&self.device_id) {
140                    return Err(e);
141                }
142
143                info!("now will retry connect via sn: device={}", self.device_id);
144                self.connect(false).await?
145            }
146        };
147
148        let seq = bdt_stream.sequence();
149        if let Some(conn_info) = conn_info {
150            let local_addr = bdt_stream.local_ep().ok_or_else(|| {
151                let msg = format!("get local_ep from bdt stream but empty! seq={:?}", seq);
152                error!("{}", msg);
153                BuckyError::new(BuckyErrorCode::NotConnected, msg)
154            })?;
155    
156            let remote_addr = bdt_stream.remote_ep().ok_or_else(|| {
157                let msg = format!("get remote_ep from bdt stream but empty! seq={:?}", seq);
158                error!("{}", msg);
159                BuckyError::new(BuckyErrorCode::NotConnected, msg)
160            })?;
161
162            *conn_info = HttpRequestConnectionInfo::Bdt((
163                local_addr,
164                remote_addr,
165            ));
166        }
167
168        debug!(
169            "bdt connect to {} success, seq={:?}, during={}ms",
170            self.remote_addr(),
171            seq,
172            begin.elapsed().as_millis(),
173        );
174        // bdt_stream.display_ref_count();
175
176        let req = req.take().unwrap();
177        let req = self.add_default_headers(req);
178
179        match async_h1::connect(bdt_stream, req).await {
180            Ok(resp) => {
181                info!(
182                    "http-bdt request to {} success! during={}ms, seq={:?}",
183                    self.remote_addr(),
184                    begin.elapsed().as_millis(),
185                    seq,
186                );
187                Ok(resp)
188            }
189            Err(e) => {
190                let msg = format!(
191                    "http-bdt request to {} failed! during={}ms, seq={:?}, {}",
192                    self.remote_addr(),
193                    begin.elapsed().as_millis(),
194                    seq,
195                    e,
196                );
197                error!("{}", msg);
198                Err(BuckyError::from(msg))
199            }
200        }
201    }
202
203    fn remote_addr(&self) -> String {
204        format!("{}:{}", self.device_id, self.vport)
205    }
206
207    fn remote_device(&self) -> Option<DeviceId> {
208        Some(self.device_id.clone())
209    }
210
211    fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
212        Box::new(self.clone())
213    }
214
215    async fn stop(&self) {
216        // to nothing
217    }
218}