// cyfs_bdt/stack.rs

1use log::*;
2use std::{
3    ops::Deref, 
4    time::Duration,
5    path::PathBuf
6    // sync::{atomic::{AtomicU64, Ordering}}
7};
8use async_std::{
9    sync::{Arc, Weak}, 
10    task, 
11    future, 
12};
13use cyfs_base::*;
14
15use crate::{
16    types::*,
17    cc::{self},
18    datagram::{self, DatagramManager},
19    finder::*,
20    history::keystore,
21    interface::{
22        self, 
23        NetManager, 
24        tcp::{self, OnTcpInterface},
25        udp::{self, OnUdpPackageBox, OnUdpRawData, UdpPackageBox},
26    },
27    protocol::{*, v0::*},
28    sn::{
29        self,
30        client::{PingClientCalledEvent, PingClients},
31    },
32    stream::{self, StreamManager},
33    tunnel::{self, TunnelManager},
34    pn::client::ProxyManager,
35    ndn::{self, HistorySpeedConfig, NdnStack, ChunkReader, NdnEventHandler, RawCacheConfig }, 
36    debug::{self, DebugStub, PingStub}
37};
38
39struct StackLazyComponents {
40    sn_client: sn::client::ClientManager,
41    tunnel_manager: TunnelManager,
42    stream_manager: StreamManager,
43    datagram_manager: DatagramManager,
44    proxy_manager: ProxyManager, 
45    debug_stub: Option<DebugStub>,
46    ping_stub: PingStub,
47}
48
49#[derive(Clone)]
50pub struct StackConfig {
51    pub statistic_interval: Duration, 
52    pub device_cache: DeviceCacheConfig, 
53    pub keystore: keystore::Config,
54    pub interface: interface::Config, 
55    pub sn_client: sn::client::Config,
56    pub tunnel: tunnel::Config,
57    pub stream: stream::Config,
58    pub datagram: datagram::Config,
59    pub ndn: ndn::Config, 
60    pub debug: Option<debug::Config>
61}
62
63impl StackConfig {
64    pub fn new(_isolate: &str) -> Self {
65        Self {
66            statistic_interval: Duration::from_secs(60),
67            keystore: keystore::Config {
68                active_time: Duration::from_secs(300),
69                capacity: 10000,
70            }, 
71            device_cache: DeviceCacheConfig {
72                expire: Duration::from_secs(5 * 60),
73                capacity: 1024 * 1024
74            }, 
75            interface: interface::Config {
76                udp: interface::udp::Config {
77                    sn_only: false, 
78                    sim_loss_rate: 0, 
79                    recv_buffer: 52428800
80                }
81            },
82            sn_client: sn::client::Config {
83                atomic_interval: Duration::from_millis(100),
84                ping: sn::client::ping::PingConfig {
85                    interval: Duration::from_secs(25), 
86                    udp: sn::client::ping::udp::Config {
87                        resend_interval: Duration::from_millis(500),
88                        resend_timeout: Duration::from_secs(5),
89                    }
90                }, 
91                call: sn::client::call::CallConfig {
92                    timeout: Duration::from_secs(5), 
93                    first_try_timeout: Duration::from_secs(2), 
94                    udp: sn::client::call::udp::Config {
95                        resend_interval: Duration::from_millis(500),
96                    }
97                }
98            },
99            tunnel: tunnel::Config {
100                retain_timeout: Duration::from_secs(60),
101                retry_sn_timeout: Duration::from_secs(2), 
102                connect_timeout: Duration::from_secs(5),
103                tcp: tunnel::tcp::Config {
104                    connect_timeout: Duration::from_secs(5), 
105                    confirm_timeout: Duration::from_secs(5), 
106                    accept_timeout: Duration::from_secs(5), 
107                    retain_connect_delay: Duration::from_secs(5), 
108                    ping_interval: Duration::from_secs(30), 
109                    ping_timeout: Duration::from_secs(60), 
110                    package_buffer: 100, 
111                    piece_buffer: 1000, 
112                    piece_interval: Duration::from_millis(10), 
113                }, 
114                udp: tunnel::udp::Config {
115                    holepunch_interval: Duration::from_millis(200),
116                    connect_timeout: Duration::from_secs(5),
117                    ping_interval: Duration::from_secs(30),
118                    ping_timeout: Duration::from_secs(60 * 3),
119                },
120            },
121            stream: stream::Config {
122                listener: stream::listener::Config { backlog: 100 },
123                stream: stream::container::Config {
124                    nagle: Duration::from_millis(0),
125                    recv_buffer: 1024 * 1024,
126                    recv_timeout: Duration::from_millis(200),
127                    drain: 0.5,
128                    send_buffer: 1024 * 512, // 这个值不能小于下边的max_record
129                    retry_sn_timeout: Duration::from_secs(2), 
130                    connect_timeout: Duration::from_secs(5),
131                    tcp: stream::tcp::Config {
132                        min_record: 1024,
133                        max_record: 2048,
134                    },
135                    package: stream::package::Config {
136                        connect_resend_interval: Duration::from_millis(100),
137                        atomic_interval: Duration::from_millis(10),
138                        break_overtime: Duration::from_secs(60),
139                        msl: Duration::from_secs(60), 
140                        cc: cc::Config {
141                            init_rto: Duration::from_secs(1),
142                            min_rto: Duration::from_millis(200),
143                            cc_impl: cc::ImplConfig::BBR(Default::default()),
144                        },
145                    },
146                },
147            },
148            datagram: datagram::Config {
149                min_random_vport: 32767,
150                max_random_vport: 65535,
151                max_try_random_vport_times: 5,
152                piece_cache_duration: Duration::from_millis(1000),
153                recv_cache_count: 16,
154                expired_tick_sec: 10,
155                fragment_cache_size: 100 *1024*1024,
156                fragment_expired_us: 30 *1000*1000,
157            },
158            ndn: ndn::Config {
159                atomic_interval: Duration::from_millis(10), 
160                schedule_interval: Duration::from_secs(1), 
161                channel: ndn::channel::Config {
162                    reserve_timeout: Duration::from_secs(60), 
163                    resend_interval: Duration::from_millis(500), 
164                    resend_timeout: Duration::from_secs(5), 
165                    block_interval: Duration::from_secs(2), 
166                    msl: Duration::from_secs(60), 
167                    udp: ndn::channel::tunnel::udp::Config {
168                        no_resp_loss_count: 3, 
169                        break_loss_count: 10, 
170                        cc: cc::Config {
171                            init_rto: Duration::from_secs(1),
172                            min_rto: Duration::from_millis(200),
173                            cc_impl: cc::ImplConfig::Ledbat(Default::default()),
174                        }
175                    }, 
176                    history_speed: HistorySpeedConfig {
177                        attenuation: 0.5, 
178                        expire: Duration::from_secs(20),  
179                        atomic: Duration::from_secs(1)
180                    }
181                }, 
182                chunk: ndn::chunk::Config{
183                    raw_caches: RawCacheConfig {
184                        mem_capacity: 1024 * 1024 * 1024, 
185                        tmp_dir: PathBuf::new()
186                    }
187                }
188            }, 
189            debug: None
190        }
191    }
192}
193
194pub struct StackImpl {
195    config: StackConfig,
196    local_device_id: DeviceId,
197    local_const: DeviceDesc,
198    id_generator: IncreaseIdGenerator,
199    keystore: keystore::Keystore,
200    device_cache: DeviceCache,
201    net_manager: NetManager,
202    lazy_components: Option<StackLazyComponents>, 
203    ndn: Option<NdnStack>, 
204}
205
206pub struct StackOpenParams {
207    pub config: StackConfig, 
208    pub tcp_port_mapping: Option<Vec<(Endpoint, u16)>>, 
209    pub known_sn: Option<Vec<Device>>,
210    pub known_device: Option<Vec<Device>>, 
211    pub active_pn: Option<Vec<Device>>, 
212    pub passive_pn: Option<Vec<Device>>, 
213
214    pub outer_cache: Option<Box<dyn OuterDeviceCache>>,
215    pub chunk_store: Option<Box<dyn ChunkReader>>, 
216
217    pub ndn_event: Option<Box<dyn NdnEventHandler>>,
218}
219
220impl StackOpenParams {
221    pub fn new(isolate: &str) -> Self {
222        Self {
223            config: StackConfig::new(isolate), 
224            tcp_port_mapping: None, 
225            known_sn: None, 
226            known_device: None, 
227            active_pn: None, 
228            passive_pn: None,
229            outer_cache: None,
230            chunk_store: None, 
231            ndn_event: None,
232        }
233    }
234}
235
236#[derive(Clone)]
237pub struct Stack(Arc<StackImpl>);
238pub type WeakStack = Weak<StackImpl>;
239
240impl Stack {
241    pub async fn open(
242        local_device: Device,
243        local_secret: PrivateKey,
244        params: StackOpenParams
245    ) -> Result<StackGuard, BuckyError> {
246        let local_device_id = local_device.desc().device_id();
247        
248        let mut params = params;
249        let mut tcp_port_mapping = None;
250        std::mem::swap(&mut tcp_port_mapping, &mut params.tcp_port_mapping);
251        
252        let net_manager =
253            NetManager::open(
254                local_device_id.clone(), 
255                &params.config.interface, 
256                &local_device.connect_info().endpoints(), 
257                tcp_port_mapping)?;
258        let net_listener = net_manager.listener();
259
260        let signer = RsaCPUObjectSigner::new(
261            local_device.desc().public_key().clone(),
262            local_secret.clone(),
263        );
264
265        let mut passive_pn = vec![];
266        if params.passive_pn.is_some() {
267            std::mem::swap(&mut passive_pn, params.passive_pn.as_mut().unwrap());
268        }
269
270        let init_local_device = {
271            let mut device = local_device.clone();
272            let device_endpoints = device.mut_connect_info().mut_endpoints();
273            device_endpoints.clear();
274            let bound_endpoints = net_manager.listener().endpoints();
275            for ep in bound_endpoints {
276                device_endpoints.push(ep);
277            }
278            
279            let passive_pn_list = device.mut_connect_info().mut_passive_pn_list();
280            for pn in passive_pn.iter().map(|d| d.desc().device_id()) {
281                passive_pn_list.push(pn);
282            }
283
284            device
285                .body_mut()
286                .as_mut()
287                .unwrap()
288                .increase_update_time(bucky_time_now());
289            sign_and_set_named_object_body(&signer, &mut device, &SignatureSource::RefIndex(SIGNATURE_SOURCE_REFINDEX_SELF))
290                .await
291                .map(|_| device)
292        }?;
293
294        let key_store = keystore::Keystore::new(
295            local_secret,
296            local_device.desc().clone(),
297            signer,
298            params.config.keystore.clone(),
299        );
300
301        let mut outer_cache = None;
302        std::mem::swap(&mut outer_cache, &mut params.outer_cache);
303
304        let stack = Self(Arc::new(StackImpl {
305            config: params.config.clone(),
306            local_device_id,
307            local_const: local_device.desc().clone(),
308            id_generator: IncreaseIdGenerator::new(),
309            keystore: key_store,
310            device_cache: DeviceCache::new(&params.config.device_cache, outer_cache),
311            net_manager,
312            lazy_components: None, 
313            ndn: None
314        }));
315
316        let datagram_manager = DatagramManager::new(stack.to_weak());
317
318        let proxy_manager = ProxyManager::new(stack.to_weak());
319
320        let mut active_pn = vec![];
321        if params.active_pn.is_some() {
322            std::mem::swap(&mut active_pn, params.active_pn.as_mut().unwrap());
323        }
324        for pn in active_pn {
325            proxy_manager.add_active_proxy(&pn);
326        }
327
328        for pn in passive_pn {
329            proxy_manager.add_passive_proxy(&pn);
330        }
331
332        let debug_stub = if stack.config().debug.is_some() {
333            Some(DebugStub::open(stack.to_weak(), stack.config().debug.as_ref().unwrap().chunk_store.clone()).await?)
334        } else {
335            None
336        };
337
338        let ping_stub = PingStub::new(stack.to_weak());
339
340        {
341            let components = StackLazyComponents {
342                sn_client: sn::client::ClientManager::create(stack.to_weak(), net_listener, init_local_device.clone()),
343                tunnel_manager: TunnelManager::new(stack.to_weak()),
344                stream_manager: StreamManager::new(stack.to_weak()),
345                datagram_manager, 
346                proxy_manager, 
347                debug_stub: debug_stub.clone(),
348                ping_stub: ping_stub.clone(),
349            };
350            
351            let stack_impl = unsafe { &mut *(Arc::as_ptr(&stack.0) as *mut StackImpl) };
352            stack_impl.lazy_components = Some(components);
353
354            let mut chunk_store = None;
355            std::mem::swap(&mut chunk_store, &mut params.chunk_store);
356
357            let mut ndn_event = None;
358            std::mem::swap(&mut ndn_event, &mut params.ndn_event);
359    
360            let ndn = NdnStack::open(stack.to_weak(), chunk_store, ndn_event);
361            let stack_impl = unsafe { &mut *(Arc::as_ptr(&stack.0) as *mut StackImpl) };
362            stack_impl.ndn = Some(ndn);
363
364        }
365        
366
367        let mut known_device = vec![];
368        if params.known_device.is_some() {
369            std::mem::swap(&mut known_device, params.known_device.as_mut().unwrap());
370        }
371        for device in known_device {
372            stack
373                .device_cache()
374                .add_static(&device.desc().device_id(), &device);
375        }
376
377        let net_listener = stack.net_manager().listener();
378        net_listener.start(stack.to_weak());
379        
380        let mut known_sn = vec![];
381        if params.known_sn.is_some() {
382            std::mem::swap(&mut known_sn, params.known_sn.as_mut().unwrap());
383        }
384        stack.reset_known_sn(known_sn.clone());
385        stack.ndn().start();
386
387        if let Some(debug_stub) = debug_stub {
388            debug_stub.listen();
389        }
390
391        ping_stub.listen();
392        
393        let arc_stack = stack.clone();
394        task::spawn(async move {
395            loop {
396                info!("{} statistic: {}, {}, {}, {}", 
397                    arc_stack, 
398                    arc_stack.tunnel_manager().on_statistic(), 
399                    arc_stack.stream_manager().on_statistic(),
400                    arc_stack.ndn().channel_manager().on_statistic(), 
401                    arc_stack.ndn().chunk_manager().on_statistic()
402                );
403                let _ = future::timeout(arc_stack.config().statistic_interval, future::pending::<()>()).await;
404            }
405        });
406
407        info!("{}: opened, version 0.5.4", stack); 
408        Ok(StackGuard::from(stack))
409    }
410
411    pub fn to_weak(&self) -> WeakStack {
412        Arc::downgrade(&self.0)
413    }
414
415    pub fn id_generator(&self) -> &IncreaseIdGenerator {
416        &self.0.id_generator
417    }
418
419    pub fn keystore(&self) -> &keystore::Keystore {
420        &self.0.keystore
421    }
422
423    pub fn net_manager(&self) -> &NetManager {
424        &self.0.net_manager
425    }
426
427    pub fn device_cache(&self) -> &DeviceCache {
428        &self.0.device_cache
429    }
430
431    pub fn config(&self) -> &StackConfig {
432        &self.0.config
433    }
434    pub fn tunnel_manager(&self) -> &TunnelManager {
435        &self.0.lazy_components.as_ref().unwrap().tunnel_manager
436    }
437    pub fn stream_manager(&self) -> &StreamManager {
438        &self.0.lazy_components.as_ref().unwrap().stream_manager
439    }
440
441    pub fn datagram_manager(&self) -> &DatagramManager {
442        &self.0.lazy_components.as_ref().unwrap().datagram_manager
443    }
444
445    pub fn proxy_manager(&self) -> &ProxyManager {
446        &self.0.lazy_components.as_ref().unwrap().proxy_manager
447    }
448
449    pub fn local_device_id(&self) -> &DeviceId {
450        &self.0.local_device_id
451    }
452
453    pub fn local_const(&self) -> &DeviceDesc {
454        &self.0.local_const
455    }
456
457    pub fn sn_client(&self) -> &sn::client::ClientManager {
458        &self.0.lazy_components.as_ref().unwrap().sn_client
459    }
460
461    pub fn ndn(&self) -> &NdnStack {
462        &self.0.ndn.as_ref().unwrap()
463    }
464
465    pub fn close(&self) {
466        //unimplemented!()
467    }
468
469    pub fn reset_sn_list(&self, sn_list: Vec<Device>) -> PingClients {
470        let sn_id_list: Vec<DeviceId> = sn_list.iter().map(|sn| sn.desc().device_id()).collect();
471        info!("{} reset_sn_list {:?}", self, sn_id_list);
472        self.sn_client().reset_sn_list(sn_list)
473    }
474
475    pub fn reset_known_sn(&self, sn_list: Vec<Device>) {
476        let sn_id_list: Vec<DeviceId> = sn_list.iter().map(|sn| sn.desc().device_id()).collect();
477        info!("{} reset_known_sn_list {:?}", self, sn_id_list);
478        for (id, sn) in sn_id_list.iter().zip(sn_list.iter()) {
479            self.device_cache().add_static(id, sn);
480        }
481        self.sn_client().cache().reset_known_sn(&sn_id_list);
482    }
483
484    pub async fn reset_endpoints(&self, endpoints: &Vec<Endpoint>) -> PingClients {
485        info!("{} reset {:?}", self, endpoints);
486        let listener = self.net_manager().reset(endpoints.as_slice());
487        
488        let mut local = self.sn_client().ping().default_local();
489        let device_endpoints = local.mut_connect_info().mut_endpoints();
490        device_endpoints.clear();
491        let bound_endpoints = listener.endpoints();
492        for ep in bound_endpoints {
493            device_endpoints.push(ep);
494        }
495        local
496            .body_mut()
497            .as_mut()
498            .unwrap()
499            .increase_update_time(bucky_time_now());
500        let _ = sign_and_set_named_object_body(
501            self.keystore().signer(),
502            &mut local,
503            &SignatureSource::RefIndex(0),
504        )
505        .await;
506        self.tunnel_manager().reset();
507
508        self.sn_client().reset_endpoints(listener.clone(), local)
509    }
510}
511
512impl std::fmt::Display for Stack {
513    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514        write!(f, "BdtStack{{local:{}}}", self.local_device_id())
515    }
516}
517
518impl From<&WeakStack> for Stack {
519    fn from(w: &WeakStack) -> Self {
520        Self(w.upgrade().unwrap())
521    }
522}
523
524impl OnUdpPackageBox for Stack {
525    fn on_udp_package_box(&self, package_box: UdpPackageBox) -> Result<(), BuckyError> {
526        trace!("{} on_udp_package_box", self.local_device_id().as_ref());
527        //FIXME: 用sequence 和 send time 过滤
528        if package_box.as_ref().has_exchange() {
529            // let exchange: &Exchange = package_box.as_ref().packages()[0].as_ref();
530            self.keystore().add_key(
531                package_box.as_ref().key(),
532                package_box.as_ref().remote()
533            );
534        }
535        if package_box.as_ref().is_tunnel() {
536            self.tunnel_manager().on_udp_package_box(package_box)
537        } else if package_box.as_ref().is_sn() {
538            self.sn_client().on_udp_package_box(package_box)
539        } else if package_box.as_ref().is_tcp_stream() {
540            self.tunnel_manager().on_udp_package_box(package_box)
541        } else if package_box.as_ref().is_proxy() {
542            self.proxy_manager().on_udp_package_box(package_box)
543        } else {
544            unreachable!()
545        }
546    }
547}
548
549impl OnUdpRawData<(udp::Interface, DeviceId, MixAesKey, Endpoint)> for Stack {
550    fn on_udp_raw_data(
551        &self,
552        data: &[u8],
553        context: (udp::Interface, DeviceId, MixAesKey, Endpoint),
554    ) -> Result<(), BuckyError> {
555        self.tunnel_manager().on_udp_raw_data(data, context)
556    }
557}
558
559impl OnTcpInterface for Stack {
560    fn on_tcp_interface(
561        &self,
562        interface: tcp::AcceptInterface,
563        first_box: PackageBox,
564    ) -> Result<OnPackageResult, BuckyError> {
565        //FIXME: 用sequence 和 send time 过滤
566        if first_box.has_exchange() {
567            // let exchange: &Exchange = first_box.packages()[0].as_ref();
568            self.keystore()
569                .add_key(first_box.key(), first_box.remote());
570        }
571        if first_box.is_tunnel() {
572            self.tunnel_manager().on_tcp_interface(interface, first_box)
573        } else if first_box.is_sn() {
574            unreachable!()
575        } else if first_box.is_tcp_stream() {
576            self.tunnel_manager().on_tcp_interface(interface, first_box)
577        } else {
578            unreachable!()
579        }
580    }
581}
582
583
584impl PingClientCalledEvent for Stack {
585    fn on_called(&self, called: &SnCalled, _: ()) -> Result<(), BuckyError> {
586        if called.payload.len() == 0 {
587            warn!("{} ignore called for no payload.", self.local_device_id());
588            return Ok(());
589        }
590        use udp::*;
591        let mut crypto_buf = vec![0u8; called.payload.as_ref().len()];
592        let ctx = PackageBoxDecodeContext::new_copy(crypto_buf.as_mut(), self.keystore());
593        let caller_box = PackageBox::raw_decode_with_context(
594            called.payload.as_ref(),
595            (ctx, Some(called.into())),
596        ).map(|(package_box, _)| package_box)
597        .map_err(|err| {
598            error!("{} ignore decode payload failed, err={}.", self.local_device_id(), err);
599            err
600        })?;
601        if caller_box.has_exchange() {
602            // let exchange: &Exchange = caller_box.packages()[0].as_ref();
603            self.keystore().add_key(caller_box.key(), caller_box.remote());
604        }
605        self.tunnel_manager().on_called(called, caller_box)
606    }
607}
608
609struct StackGuardImpl(Stack);
610
611impl Drop for StackGuardImpl {
612    fn drop(&mut self) {
613        self.0.close();
614    }
615}
616
617#[derive(Clone)]
618pub struct StackGuard(Arc<StackGuardImpl>);
619
620impl From<Stack> for StackGuard {
621    fn from(stack: Stack) -> Self {
622        Self(Arc::new(StackGuardImpl(stack)))
623    }
624}
625
626impl Deref for StackGuard {
627    type Target = Stack;
628    fn deref(&self) -> &Stack {
629        &(*self.0).0
630    }
631}