1use log::*;
2use std::{
3 ops::Deref,
4 time::Duration,
5 path::PathBuf
6 };
8use async_std::{
9 sync::{Arc, Weak},
10 task,
11 future,
12};
13use cyfs_base::*;
14
15use crate::{
16 types::*,
17 cc::{self},
18 datagram::{self, DatagramManager},
19 finder::*,
20 history::keystore,
21 interface::{
22 self,
23 NetManager,
24 tcp::{self, OnTcpInterface},
25 udp::{self, OnUdpPackageBox, OnUdpRawData, UdpPackageBox},
26 },
27 protocol::{*, v0::*},
28 sn::{
29 self,
30 client::{PingClientCalledEvent, PingClients},
31 },
32 stream::{self, StreamManager},
33 tunnel::{self, TunnelManager},
34 pn::client::ProxyManager,
35 ndn::{self, HistorySpeedConfig, NdnStack, ChunkReader, NdnEventHandler, RawCacheConfig },
36 debug::{self, DebugStub, PingStub}
37};
38
39struct StackLazyComponents {
40 sn_client: sn::client::ClientManager,
41 tunnel_manager: TunnelManager,
42 stream_manager: StreamManager,
43 datagram_manager: DatagramManager,
44 proxy_manager: ProxyManager,
45 debug_stub: Option<DebugStub>,
46 ping_stub: PingStub,
47}
48
49#[derive(Clone)]
50pub struct StackConfig {
51 pub statistic_interval: Duration,
52 pub device_cache: DeviceCacheConfig,
53 pub keystore: keystore::Config,
54 pub interface: interface::Config,
55 pub sn_client: sn::client::Config,
56 pub tunnel: tunnel::Config,
57 pub stream: stream::Config,
58 pub datagram: datagram::Config,
59 pub ndn: ndn::Config,
60 pub debug: Option<debug::Config>
61}
62
63impl StackConfig {
64 pub fn new(_isolate: &str) -> Self {
65 Self {
66 statistic_interval: Duration::from_secs(60),
67 keystore: keystore::Config {
68 active_time: Duration::from_secs(300),
69 capacity: 10000,
70 },
71 device_cache: DeviceCacheConfig {
72 expire: Duration::from_secs(5 * 60),
73 capacity: 1024 * 1024
74 },
75 interface: interface::Config {
76 udp: interface::udp::Config {
77 sn_only: false,
78 sim_loss_rate: 0,
79 recv_buffer: 52428800
80 }
81 },
82 sn_client: sn::client::Config {
83 atomic_interval: Duration::from_millis(100),
84 ping: sn::client::ping::PingConfig {
85 interval: Duration::from_secs(25),
86 udp: sn::client::ping::udp::Config {
87 resend_interval: Duration::from_millis(500),
88 resend_timeout: Duration::from_secs(5),
89 }
90 },
91 call: sn::client::call::CallConfig {
92 timeout: Duration::from_secs(5),
93 first_try_timeout: Duration::from_secs(2),
94 udp: sn::client::call::udp::Config {
95 resend_interval: Duration::from_millis(500),
96 }
97 }
98 },
99 tunnel: tunnel::Config {
100 retain_timeout: Duration::from_secs(60),
101 retry_sn_timeout: Duration::from_secs(2),
102 connect_timeout: Duration::from_secs(5),
103 tcp: tunnel::tcp::Config {
104 connect_timeout: Duration::from_secs(5),
105 confirm_timeout: Duration::from_secs(5),
106 accept_timeout: Duration::from_secs(5),
107 retain_connect_delay: Duration::from_secs(5),
108 ping_interval: Duration::from_secs(30),
109 ping_timeout: Duration::from_secs(60),
110 package_buffer: 100,
111 piece_buffer: 1000,
112 piece_interval: Duration::from_millis(10),
113 },
114 udp: tunnel::udp::Config {
115 holepunch_interval: Duration::from_millis(200),
116 connect_timeout: Duration::from_secs(5),
117 ping_interval: Duration::from_secs(30),
118 ping_timeout: Duration::from_secs(60 * 3),
119 },
120 },
121 stream: stream::Config {
122 listener: stream::listener::Config { backlog: 100 },
123 stream: stream::container::Config {
124 nagle: Duration::from_millis(0),
125 recv_buffer: 1024 * 1024,
126 recv_timeout: Duration::from_millis(200),
127 drain: 0.5,
128 send_buffer: 1024 * 512, retry_sn_timeout: Duration::from_secs(2),
130 connect_timeout: Duration::from_secs(5),
131 tcp: stream::tcp::Config {
132 min_record: 1024,
133 max_record: 2048,
134 },
135 package: stream::package::Config {
136 connect_resend_interval: Duration::from_millis(100),
137 atomic_interval: Duration::from_millis(10),
138 break_overtime: Duration::from_secs(60),
139 msl: Duration::from_secs(60),
140 cc: cc::Config {
141 init_rto: Duration::from_secs(1),
142 min_rto: Duration::from_millis(200),
143 cc_impl: cc::ImplConfig::BBR(Default::default()),
144 },
145 },
146 },
147 },
148 datagram: datagram::Config {
149 min_random_vport: 32767,
150 max_random_vport: 65535,
151 max_try_random_vport_times: 5,
152 piece_cache_duration: Duration::from_millis(1000),
153 recv_cache_count: 16,
154 expired_tick_sec: 10,
155 fragment_cache_size: 100 *1024*1024,
156 fragment_expired_us: 30 *1000*1000,
157 },
158 ndn: ndn::Config {
159 atomic_interval: Duration::from_millis(10),
160 schedule_interval: Duration::from_secs(1),
161 channel: ndn::channel::Config {
162 reserve_timeout: Duration::from_secs(60),
163 resend_interval: Duration::from_millis(500),
164 resend_timeout: Duration::from_secs(5),
165 block_interval: Duration::from_secs(2),
166 msl: Duration::from_secs(60),
167 udp: ndn::channel::tunnel::udp::Config {
168 no_resp_loss_count: 3,
169 break_loss_count: 10,
170 cc: cc::Config {
171 init_rto: Duration::from_secs(1),
172 min_rto: Duration::from_millis(200),
173 cc_impl: cc::ImplConfig::Ledbat(Default::default()),
174 }
175 },
176 history_speed: HistorySpeedConfig {
177 attenuation: 0.5,
178 expire: Duration::from_secs(20),
179 atomic: Duration::from_secs(1)
180 }
181 },
182 chunk: ndn::chunk::Config{
183 raw_caches: RawCacheConfig {
184 mem_capacity: 1024 * 1024 * 1024,
185 tmp_dir: PathBuf::new()
186 }
187 }
188 },
189 debug: None
190 }
191 }
192}
193
194pub struct StackImpl {
195 config: StackConfig,
196 local_device_id: DeviceId,
197 local_const: DeviceDesc,
198 id_generator: IncreaseIdGenerator,
199 keystore: keystore::Keystore,
200 device_cache: DeviceCache,
201 net_manager: NetManager,
202 lazy_components: Option<StackLazyComponents>,
203 ndn: Option<NdnStack>,
204}
205
206pub struct StackOpenParams {
207 pub config: StackConfig,
208 pub tcp_port_mapping: Option<Vec<(Endpoint, u16)>>,
209 pub known_sn: Option<Vec<Device>>,
210 pub known_device: Option<Vec<Device>>,
211 pub active_pn: Option<Vec<Device>>,
212 pub passive_pn: Option<Vec<Device>>,
213
214 pub outer_cache: Option<Box<dyn OuterDeviceCache>>,
215 pub chunk_store: Option<Box<dyn ChunkReader>>,
216
217 pub ndn_event: Option<Box<dyn NdnEventHandler>>,
218}
219
220impl StackOpenParams {
221 pub fn new(isolate: &str) -> Self {
222 Self {
223 config: StackConfig::new(isolate),
224 tcp_port_mapping: None,
225 known_sn: None,
226 known_device: None,
227 active_pn: None,
228 passive_pn: None,
229 outer_cache: None,
230 chunk_store: None,
231 ndn_event: None,
232 }
233 }
234}
235
236#[derive(Clone)]
237pub struct Stack(Arc<StackImpl>);
238pub type WeakStack = Weak<StackImpl>;
239
240impl Stack {
241 pub async fn open(
242 local_device: Device,
243 local_secret: PrivateKey,
244 params: StackOpenParams
245 ) -> Result<StackGuard, BuckyError> {
246 let local_device_id = local_device.desc().device_id();
247
248 let mut params = params;
249 let mut tcp_port_mapping = None;
250 std::mem::swap(&mut tcp_port_mapping, &mut params.tcp_port_mapping);
251
252 let net_manager =
253 NetManager::open(
254 local_device_id.clone(),
255 ¶ms.config.interface,
256 &local_device.connect_info().endpoints(),
257 tcp_port_mapping)?;
258 let net_listener = net_manager.listener();
259
260 let signer = RsaCPUObjectSigner::new(
261 local_device.desc().public_key().clone(),
262 local_secret.clone(),
263 );
264
265 let mut passive_pn = vec![];
266 if params.passive_pn.is_some() {
267 std::mem::swap(&mut passive_pn, params.passive_pn.as_mut().unwrap());
268 }
269
270 let init_local_device = {
271 let mut device = local_device.clone();
272 let device_endpoints = device.mut_connect_info().mut_endpoints();
273 device_endpoints.clear();
274 let bound_endpoints = net_manager.listener().endpoints();
275 for ep in bound_endpoints {
276 device_endpoints.push(ep);
277 }
278
279 let passive_pn_list = device.mut_connect_info().mut_passive_pn_list();
280 for pn in passive_pn.iter().map(|d| d.desc().device_id()) {
281 passive_pn_list.push(pn);
282 }
283
284 device
285 .body_mut()
286 .as_mut()
287 .unwrap()
288 .increase_update_time(bucky_time_now());
289 sign_and_set_named_object_body(&signer, &mut device, &SignatureSource::RefIndex(SIGNATURE_SOURCE_REFINDEX_SELF))
290 .await
291 .map(|_| device)
292 }?;
293
294 let key_store = keystore::Keystore::new(
295 local_secret,
296 local_device.desc().clone(),
297 signer,
298 params.config.keystore.clone(),
299 );
300
301 let mut outer_cache = None;
302 std::mem::swap(&mut outer_cache, &mut params.outer_cache);
303
304 let stack = Self(Arc::new(StackImpl {
305 config: params.config.clone(),
306 local_device_id,
307 local_const: local_device.desc().clone(),
308 id_generator: IncreaseIdGenerator::new(),
309 keystore: key_store,
310 device_cache: DeviceCache::new(¶ms.config.device_cache, outer_cache),
311 net_manager,
312 lazy_components: None,
313 ndn: None
314 }));
315
316 let datagram_manager = DatagramManager::new(stack.to_weak());
317
318 let proxy_manager = ProxyManager::new(stack.to_weak());
319
320 let mut active_pn = vec![];
321 if params.active_pn.is_some() {
322 std::mem::swap(&mut active_pn, params.active_pn.as_mut().unwrap());
323 }
324 for pn in active_pn {
325 proxy_manager.add_active_proxy(&pn);
326 }
327
328 for pn in passive_pn {
329 proxy_manager.add_passive_proxy(&pn);
330 }
331
332 let debug_stub = if stack.config().debug.is_some() {
333 Some(DebugStub::open(stack.to_weak(), stack.config().debug.as_ref().unwrap().chunk_store.clone()).await?)
334 } else {
335 None
336 };
337
338 let ping_stub = PingStub::new(stack.to_weak());
339
340 {
341 let components = StackLazyComponents {
342 sn_client: sn::client::ClientManager::create(stack.to_weak(), net_listener, init_local_device.clone()),
343 tunnel_manager: TunnelManager::new(stack.to_weak()),
344 stream_manager: StreamManager::new(stack.to_weak()),
345 datagram_manager,
346 proxy_manager,
347 debug_stub: debug_stub.clone(),
348 ping_stub: ping_stub.clone(),
349 };
350
351 let stack_impl = unsafe { &mut *(Arc::as_ptr(&stack.0) as *mut StackImpl) };
352 stack_impl.lazy_components = Some(components);
353
354 let mut chunk_store = None;
355 std::mem::swap(&mut chunk_store, &mut params.chunk_store);
356
357 let mut ndn_event = None;
358 std::mem::swap(&mut ndn_event, &mut params.ndn_event);
359
360 let ndn = NdnStack::open(stack.to_weak(), chunk_store, ndn_event);
361 let stack_impl = unsafe { &mut *(Arc::as_ptr(&stack.0) as *mut StackImpl) };
362 stack_impl.ndn = Some(ndn);
363
364 }
365
366
367 let mut known_device = vec![];
368 if params.known_device.is_some() {
369 std::mem::swap(&mut known_device, params.known_device.as_mut().unwrap());
370 }
371 for device in known_device {
372 stack
373 .device_cache()
374 .add_static(&device.desc().device_id(), &device);
375 }
376
377 let net_listener = stack.net_manager().listener();
378 net_listener.start(stack.to_weak());
379
380 let mut known_sn = vec![];
381 if params.known_sn.is_some() {
382 std::mem::swap(&mut known_sn, params.known_sn.as_mut().unwrap());
383 }
384 stack.reset_known_sn(known_sn.clone());
385 stack.ndn().start();
386
387 if let Some(debug_stub) = debug_stub {
388 debug_stub.listen();
389 }
390
391 ping_stub.listen();
392
393 let arc_stack = stack.clone();
394 task::spawn(async move {
395 loop {
396 info!("{} statistic: {}, {}, {}, {}",
397 arc_stack,
398 arc_stack.tunnel_manager().on_statistic(),
399 arc_stack.stream_manager().on_statistic(),
400 arc_stack.ndn().channel_manager().on_statistic(),
401 arc_stack.ndn().chunk_manager().on_statistic()
402 );
403 let _ = future::timeout(arc_stack.config().statistic_interval, future::pending::<()>()).await;
404 }
405 });
406
407 info!("{}: opened, version 0.5.4", stack);
408 Ok(StackGuard::from(stack))
409 }
410
411 pub fn to_weak(&self) -> WeakStack {
412 Arc::downgrade(&self.0)
413 }
414
415 pub fn id_generator(&self) -> &IncreaseIdGenerator {
416 &self.0.id_generator
417 }
418
419 pub fn keystore(&self) -> &keystore::Keystore {
420 &self.0.keystore
421 }
422
423 pub fn net_manager(&self) -> &NetManager {
424 &self.0.net_manager
425 }
426
427 pub fn device_cache(&self) -> &DeviceCache {
428 &self.0.device_cache
429 }
430
431 pub fn config(&self) -> &StackConfig {
432 &self.0.config
433 }
434 pub fn tunnel_manager(&self) -> &TunnelManager {
435 &self.0.lazy_components.as_ref().unwrap().tunnel_manager
436 }
437 pub fn stream_manager(&self) -> &StreamManager {
438 &self.0.lazy_components.as_ref().unwrap().stream_manager
439 }
440
441 pub fn datagram_manager(&self) -> &DatagramManager {
442 &self.0.lazy_components.as_ref().unwrap().datagram_manager
443 }
444
445 pub fn proxy_manager(&self) -> &ProxyManager {
446 &self.0.lazy_components.as_ref().unwrap().proxy_manager
447 }
448
449 pub fn local_device_id(&self) -> &DeviceId {
450 &self.0.local_device_id
451 }
452
453 pub fn local_const(&self) -> &DeviceDesc {
454 &self.0.local_const
455 }
456
457 pub fn sn_client(&self) -> &sn::client::ClientManager {
458 &self.0.lazy_components.as_ref().unwrap().sn_client
459 }
460
461 pub fn ndn(&self) -> &NdnStack {
462 &self.0.ndn.as_ref().unwrap()
463 }
464
465 pub fn close(&self) {
466 }
468
469 pub fn reset_sn_list(&self, sn_list: Vec<Device>) -> PingClients {
470 let sn_id_list: Vec<DeviceId> = sn_list.iter().map(|sn| sn.desc().device_id()).collect();
471 info!("{} reset_sn_list {:?}", self, sn_id_list);
472 self.sn_client().reset_sn_list(sn_list)
473 }
474
475 pub fn reset_known_sn(&self, sn_list: Vec<Device>) {
476 let sn_id_list: Vec<DeviceId> = sn_list.iter().map(|sn| sn.desc().device_id()).collect();
477 info!("{} reset_known_sn_list {:?}", self, sn_id_list);
478 for (id, sn) in sn_id_list.iter().zip(sn_list.iter()) {
479 self.device_cache().add_static(id, sn);
480 }
481 self.sn_client().cache().reset_known_sn(&sn_id_list);
482 }
483
484 pub async fn reset_endpoints(&self, endpoints: &Vec<Endpoint>) -> PingClients {
485 info!("{} reset {:?}", self, endpoints);
486 let listener = self.net_manager().reset(endpoints.as_slice());
487
488 let mut local = self.sn_client().ping().default_local();
489 let device_endpoints = local.mut_connect_info().mut_endpoints();
490 device_endpoints.clear();
491 let bound_endpoints = listener.endpoints();
492 for ep in bound_endpoints {
493 device_endpoints.push(ep);
494 }
495 local
496 .body_mut()
497 .as_mut()
498 .unwrap()
499 .increase_update_time(bucky_time_now());
500 let _ = sign_and_set_named_object_body(
501 self.keystore().signer(),
502 &mut local,
503 &SignatureSource::RefIndex(0),
504 )
505 .await;
506 self.tunnel_manager().reset();
507
508 self.sn_client().reset_endpoints(listener.clone(), local)
509 }
510}
511
512impl std::fmt::Display for Stack {
513 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514 write!(f, "BdtStack{{local:{}}}", self.local_device_id())
515 }
516}
517
518impl From<&WeakStack> for Stack {
519 fn from(w: &WeakStack) -> Self {
520 Self(w.upgrade().unwrap())
521 }
522}
523
524impl OnUdpPackageBox for Stack {
525 fn on_udp_package_box(&self, package_box: UdpPackageBox) -> Result<(), BuckyError> {
526 trace!("{} on_udp_package_box", self.local_device_id().as_ref());
527 if package_box.as_ref().has_exchange() {
529 self.keystore().add_key(
531 package_box.as_ref().key(),
532 package_box.as_ref().remote()
533 );
534 }
535 if package_box.as_ref().is_tunnel() {
536 self.tunnel_manager().on_udp_package_box(package_box)
537 } else if package_box.as_ref().is_sn() {
538 self.sn_client().on_udp_package_box(package_box)
539 } else if package_box.as_ref().is_tcp_stream() {
540 self.tunnel_manager().on_udp_package_box(package_box)
541 } else if package_box.as_ref().is_proxy() {
542 self.proxy_manager().on_udp_package_box(package_box)
543 } else {
544 unreachable!()
545 }
546 }
547}
548
549impl OnUdpRawData<(udp::Interface, DeviceId, MixAesKey, Endpoint)> for Stack {
550 fn on_udp_raw_data(
551 &self,
552 data: &[u8],
553 context: (udp::Interface, DeviceId, MixAesKey, Endpoint),
554 ) -> Result<(), BuckyError> {
555 self.tunnel_manager().on_udp_raw_data(data, context)
556 }
557}
558
559impl OnTcpInterface for Stack {
560 fn on_tcp_interface(
561 &self,
562 interface: tcp::AcceptInterface,
563 first_box: PackageBox,
564 ) -> Result<OnPackageResult, BuckyError> {
565 if first_box.has_exchange() {
567 self.keystore()
569 .add_key(first_box.key(), first_box.remote());
570 }
571 if first_box.is_tunnel() {
572 self.tunnel_manager().on_tcp_interface(interface, first_box)
573 } else if first_box.is_sn() {
574 unreachable!()
575 } else if first_box.is_tcp_stream() {
576 self.tunnel_manager().on_tcp_interface(interface, first_box)
577 } else {
578 unreachable!()
579 }
580 }
581}
582
583
584impl PingClientCalledEvent for Stack {
585 fn on_called(&self, called: &SnCalled, _: ()) -> Result<(), BuckyError> {
586 if called.payload.len() == 0 {
587 warn!("{} ignore called for no payload.", self.local_device_id());
588 return Ok(());
589 }
590 use udp::*;
591 let mut crypto_buf = vec![0u8; called.payload.as_ref().len()];
592 let ctx = PackageBoxDecodeContext::new_copy(crypto_buf.as_mut(), self.keystore());
593 let caller_box = PackageBox::raw_decode_with_context(
594 called.payload.as_ref(),
595 (ctx, Some(called.into())),
596 ).map(|(package_box, _)| package_box)
597 .map_err(|err| {
598 error!("{} ignore decode payload failed, err={}.", self.local_device_id(), err);
599 err
600 })?;
601 if caller_box.has_exchange() {
602 self.keystore().add_key(caller_box.key(), caller_box.remote());
604 }
605 self.tunnel_manager().on_called(called, caller_box)
606 }
607}
608
609struct StackGuardImpl(Stack);
610
611impl Drop for StackGuardImpl {
612 fn drop(&mut self) {
613 self.0.close();
614 }
615}
616
617#[derive(Clone)]
618pub struct StackGuard(Arc<StackGuardImpl>);
619
620impl From<Stack> for StackGuard {
621 fn from(stack: Stack) -> Self {
622 Self(Arc::new(StackGuardImpl(stack)))
623 }
624}
625
626impl Deref for StackGuard {
627 type Target = Stack;
628 fn deref(&self) -> &Stack {
629 &(*self.0).0
630 }
631}