cyfs_lib/stack/
stack.rs

1use super::uni_stack::*;
2use crate::crypto::*;
3use crate::events::*;
4use crate::ndn::*;
5use crate::non::*;
6use crate::requestor::*;
7use crate::rmeta::*;
8use crate::root_state::*;
9use crate::router_handler::*;
10use crate::storage::StateStorage;
11use crate::sync::*;
12use crate::trans::*;
13use crate::util::*;
14use cyfs_base::*;
15
16use http_types::Url;
17use once_cell::sync::OnceCell;
18use std::sync::Arc;
19use std::sync::RwLock;
20
21pub type SharedObjectStackDecID = Arc<OnceCell<ObjectId>>;
22
23struct CyfsStackProcessors {
24    pub non_service: NONOutputProcessorRef,
25    pub ndn_service: NDNOutputProcessorRef,
26    pub crypto_service: CryptoOutputProcessorRef,
27    pub util_service: UtilOutputProcessorRef,
28    pub trans_service: TransOutputProcessorRef,
29
30    pub router_handlers: RouterHandlerManagerProcessorRef,
31    pub router_events: RouterEventManagerProcessorRef,
32
33    pub root_state: GlobalStateOutputProcessorRef,
34    pub root_state_accessor: GlobalStateAccessorOutputProcessorRef,
35
36    pub local_cache: GlobalStateOutputProcessorRef,
37    pub local_cache_accessor: GlobalStateAccessorOutputProcessorRef,
38
39    pub root_state_meta: GlobalStateMetaOutputProcessorRef,
40    pub local_cache_meta: GlobalStateMetaOutputProcessorRef,
41}
42
43impl Drop for CyfsStackProcessors {
44    fn drop(&mut self) {
45        warn!("CyfsStackProcessors dropped!");
46    }
47}
48
49pub(crate) struct ObjectServices {
50    non_service: NONRequestor,
51    ndn_service: NDNRequestor,
52
53    crypto_service: CryptoRequestor,
54
55    util_service: UtilRequestor,
56    trans_service: TransRequestor,
57    sync_service: SyncRequestor,
58
59    root_state: GlobalStateRequestor,
60    root_state_accessor: GlobalStateAccessorRequestor,
61
62    local_cache: GlobalStateRequestor,
63    local_cache_accessor: GlobalStateAccessorRequestor,
64
65    root_state_meta: GlobalStateMetaRequestor,
66    local_cache_meta: GlobalStateMetaRequestor,
67}
68
69impl Drop for ObjectServices {
70    fn drop(&mut self) {
71        warn!("object services dropped!");
72    }
73}
74
75#[derive(Clone)]
76pub struct SharedCyfsStack {
77    param: SharedCyfsStackParam,
78
79    // The dec_id it belongs to
80    dec_id: SharedObjectStackDecID,
81
82    services: Arc<ObjectServices>,
83    processors: Arc<CyfsStackProcessors>,
84
85    // router handler
86    router_handlers: RouterHandlerManager,
87
88    // router events
89    router_events: RouterEventManager,
90
91    // Device of the current protocol stack
92    device_info: Arc<RwLock<Option<(DeviceId, Device)>>>,
93
94    // uni_stack
95    uni_stack: Arc<OnceCell<UniCyfsStackRef>>,
96
97    requestor_holder: RequestorHolder,
98}
99
100#[derive(Debug, Clone)]
101pub enum CyfsStackEventType {
102    WebSocket(Url),
103}
104
105#[derive(Debug, Clone)]
106pub enum CyfsStackRequestorType {
107    Http,
108    WebSocket,
109}
110
111#[derive(Debug, Clone)]
112pub struct CyfsStackRequestorConfig {
113    // default is None, use the traditional http-tcp requestor without connection pool;
114    // use Some(0) will use the default connection pool size 50
115    pub http_max_connections_per_host: Option<usize>,
116
117    pub non_service: CyfsStackRequestorType,
118    pub ndn_service: CyfsStackRequestorType,
119    pub util_service: CyfsStackRequestorType,
120    pub trans_service: CyfsStackRequestorType,
121    pub crypto_service: CyfsStackRequestorType,
122    pub root_state: CyfsStackRequestorType,
123    pub local_cache: CyfsStackRequestorType,
124}
125
126impl CyfsStackRequestorConfig {
127    pub fn default() -> Self {
128        Self::http()
129    }
130
131    pub fn http() -> Self {
132        Self {
133            http_max_connections_per_host: None,
134
135            non_service: CyfsStackRequestorType::Http,
136            ndn_service: CyfsStackRequestorType::Http,
137            util_service: CyfsStackRequestorType::Http,
138            trans_service: CyfsStackRequestorType::Http,
139            crypto_service: CyfsStackRequestorType::Http,
140            root_state: CyfsStackRequestorType::Http,
141            local_cache: CyfsStackRequestorType::Http,
142        }
143    }
144
145    pub fn ws() -> Self {
146        Self {
147            http_max_connections_per_host: None,
148
149            non_service: CyfsStackRequestorType::WebSocket,
150            ndn_service: CyfsStackRequestorType::WebSocket,
151            util_service: CyfsStackRequestorType::WebSocket,
152            trans_service: CyfsStackRequestorType::WebSocket,
153            crypto_service: CyfsStackRequestorType::WebSocket,
154            root_state: CyfsStackRequestorType::WebSocket,
155            local_cache: CyfsStackRequestorType::WebSocket,
156        }
157    }
158}
159
160#[derive(Debug, Clone)]
161pub struct SharedCyfsStackParam {
162    pub dec_id: Option<ObjectId>,
163
164    // Service address based on the HTTP protocol
165    pub service_url: Url,
166
167    // Service address based on the WebSocket protocol
168    pub ws_url: Url,
169
170    pub event_type: CyfsStackEventType,
171
172    pub requestor_config: CyfsStackRequestorConfig,
173}
174
175impl SharedCyfsStackParam {
176    fn gen_url(http_port: u16, ws_port: u16) -> (Url, Url) {
177        assert_ne!(http_port, ws_port);
178
179        let service_url = format!("http://127.0.0.1:{}", http_port).parse().unwrap();
180        let ws_url = format!("ws://127.0.0.1:{}", ws_port).parse().unwrap();
181
182        (service_url, ws_url)
183    }
184
185    pub fn default_with_http_event(dec_id: Option<ObjectId>) -> Self {
186        Self::default(dec_id)
187    }
188
189    // Switch to the websocket mode by default
190    pub fn default(dec_id: Option<ObjectId>) -> Self {
191        let (service_url, ws_url) =
192            Self::gen_url(cyfs_base::NON_STACK_HTTP_PORT, cyfs_base::NON_STACK_WS_PORT);
193
194        Self {
195            dec_id,
196            service_url,
197            event_type: CyfsStackEventType::WebSocket(ws_url.clone()),
198            ws_url,
199            requestor_config: CyfsStackRequestorConfig::default(),
200        }
201    }
202
203    // SharedCyfsStack provided to Cyfs-Runtime
204    pub fn default_runtime(dec_id: Option<ObjectId>) -> Self {
205        let (service_url, ws_url) = Self::gen_url(
206            cyfs_base::CYFS_RUNTIME_NON_STACK_HTTP_PORT,
207            cyfs_base::CYFS_RUNTIME_NON_STACK_WS_PORT,
208        );
209
210        Self {
211            dec_id,
212            service_url,
213            event_type: CyfsStackEventType::WebSocket(ws_url.clone()),
214            ws_url,
215            requestor_config: CyfsStackRequestorConfig::default(),
216        }
217    }
218
219    // Open shared stack of the specified port
220    pub fn gen(dec_id: Option<ObjectId>, http_port: u16, ws_port: u16) -> Self {
221        let (service_url, ws_url) = Self::gen_url(http_port, ws_port);
222
223        Self {
224            dec_id,
225            service_url,
226            event_type: CyfsStackEventType::WebSocket(ws_url.clone()),
227            ws_url,
228            requestor_config: CyfsStackRequestorConfig::default(),
229        }
230    }
231
232    pub fn new_with_ws_event(
233        dec_id: Option<ObjectId>,
234        service_url: &str,
235        ws_url: &str,
236    ) -> BuckyResult<Self> {
237        let service_url = Url::parse(service_url).map_err(|e| {
238            let msg = format!("invalid http service url! url={}, {}", service_url, e);
239            error!("{}", msg);
240
241            BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
242        })?;
243
244        let ws_url = Url::parse(ws_url).map_err(|e| {
245            let msg = format!("invalid ws service url! url={}, {}", ws_url, e);
246            error!("{}", msg);
247
248            BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
249        })?;
250
251        Ok(Self {
252            dec_id,
253            service_url,
254            event_type: CyfsStackEventType::WebSocket(ws_url.clone()),
255            ws_url,
256            requestor_config: CyfsStackRequestorConfig::default(),
257        })
258    }
259}
260
261#[derive(Clone)]
262struct RequestorHolder {
263    requestor_config: CyfsStackRequestorConfig,
264    http: Option<HttpRequestorRef>,
265    ws: Option<HttpRequestorRef>,
266    data: Option<HttpRequestorRef>,
267}
268
269impl RequestorHolder {
270    fn new(requestor_config: CyfsStackRequestorConfig) -> Self {
271        Self {
272            requestor_config,
273            http: None,
274            ws: None,
275            data: None,
276        }
277    }
278
279    fn select_requestor(
280        &mut self,
281        param: &SharedCyfsStackParam,
282        requestor_type: &CyfsStackRequestorType,
283    ) -> HttpRequestorRef {
284        match *requestor_type {
285            CyfsStackRequestorType::Http => {
286                self.http
287                    .get_or_insert_with(|| {
288                        // Requestor based on standard http
289                        let addr = format!(
290                            "{}:{}",
291                            param.service_url.host_str().unwrap(),
292                            param.service_url.port().unwrap()
293                        );
294
295                        if let Some(http_max_connections_per_host) =
296                            self.requestor_config.http_max_connections_per_host
297                        {
298                            Arc::new(Box::new(SurfHttpRequestor::new(
299                                &addr,
300                                http_max_connections_per_host,
301                            )))
302                        } else {
303                            Arc::new(Box::new(TcpHttpRequestor::new(&addr)))
304                        }
305                    })
306                    .clone()
307            }
308            CyfsStackRequestorType::WebSocket => {
309                self.ws
310                    .get_or_insert_with(|| {
311                        // Requestor based on the WebSocket protocol
312                        Arc::new(Box::new(WSHttpRequestor::new(param.ws_url.clone())))
313                    })
314                    .clone()
315            }
316        }
317    }
318
319    fn data_requestor(&mut self, param: &SharedCyfsStackParam) -> HttpRequestorRef {
320        self.data
321            .get_or_insert_with(|| {
322                // Requestor based on standard HTTP
323                let addr = format!(
324                    "{}:{}",
325                    param.service_url.host_str().unwrap(),
326                    param.service_url.port().unwrap()
327                );
328
329                Arc::new(Box::new(TcpHttpRequestor::new(&addr)))
330            })
331            .clone()
332    }
333
334    async fn stop(&self) {
335        if let Some(requestor) = &self.http {
336            requestor.stop().await;
337        }
338        if let Some(requestor) = &self.ws {
339            requestor.stop().await;
340        }
341    }
342}
343
344impl SharedCyfsStack {
345    pub async fn open_default(dec_id: Option<ObjectId>) -> BuckyResult<Self> {
346        Self::open(SharedCyfsStackParam::default(dec_id)).await
347    }
348
349    pub async fn open_default_with_ws_event(dec_id: Option<ObjectId>) -> BuckyResult<Self> {
350        Self::open(SharedCyfsStackParam::default(dec_id)).await
351    }
352
353    pub async fn open_runtime(dec_id: Option<ObjectId>) -> BuckyResult<Self> {
354        Self::open(SharedCyfsStackParam::default_runtime(dec_id)).await
355    }
356
357    pub async fn open_with_port(
358        dec_id: Option<ObjectId>,
359        http_port: u16,
360        ws_port: u16,
361    ) -> BuckyResult<Self> {
362        Self::open(SharedCyfsStackParam::gen(dec_id, http_port, ws_port)).await
363    }
364
365    pub async fn open(param: SharedCyfsStackParam) -> BuckyResult<Self> {
366        Self::sync_open(param)
367    }
368
369    pub fn sync_open(param: SharedCyfsStackParam) -> BuckyResult<Self> {
370        info!("will init shared object stack: {:?}", param);
371
372        let dec_id = Arc::new(OnceCell::new());
373        if let Some(id) = &param.dec_id {
374            dec_id.set(id.clone()).unwrap();
375        }
376
377        let mut requestor_holder = RequestorHolder::new(param.requestor_config.clone());
378
379        // trans service
380        let requestor =
381            requestor_holder.select_requestor(&param, &param.requestor_config.trans_service);
382        let trans_service = TransRequestor::new(Some(dec_id.clone()), requestor);
383
384        // util
385        let requestor =
386            requestor_holder.select_requestor(&param, &param.requestor_config.util_service);
387        let util_service = UtilRequestor::new(Some(dec_id.clone()), requestor);
388
389        // crypto
390        let requestor =
391            requestor_holder.select_requestor(&param, &param.requestor_config.crypto_service);
392        let crypto_service = CryptoRequestor::new(Some(dec_id.clone()), requestor);
393
394        // non
395        let requestor =
396            requestor_holder.select_requestor(&param, &param.requestor_config.non_service);
397        let non_service = NONRequestor::new(Some(dec_id.clone()), requestor);
398
399        // ndn
400        let requestor =
401            requestor_holder.select_requestor(&param, &param.requestor_config.ndn_service);
402        let data_requestor =
403            requestor_holder.data_requestor(&param);
404        let ndn_service = NDNRequestor::new(Some(dec_id.clone()), requestor, Some(data_requestor));
405
406        // sync
407        let requestor = requestor_holder.select_requestor(&param, &CyfsStackRequestorType::Http);
408        let sync_service = SyncRequestor::new(requestor);
409
410        // root_state
411        let requestor =
412            requestor_holder.select_requestor(&param, &param.requestor_config.root_state);
413        let root_state =
414            GlobalStateRequestor::new_root_state(Some(dec_id.clone()), requestor.clone());
415
416        let root_state_accessor =
417            GlobalStateAccessorRequestor::new_root_state(Some(dec_id.clone()), requestor.clone());
418
419        let root_state_meta =
420            GlobalStateMetaRequestor::new_root_state(Some(dec_id.clone()), requestor.clone());
421
422        // local_cache
423        let requestor =
424            requestor_holder.select_requestor(&param, &param.requestor_config.local_cache);
425        let local_cache =
426            GlobalStateRequestor::new_local_cache(Some(dec_id.clone()), requestor.clone());
427
428        let local_cache_accessor =
429            GlobalStateAccessorRequestor::new_local_cache(Some(dec_id.clone()), requestor.clone());
430
431        let local_cache_meta =
432            GlobalStateMetaRequestor::new_local_cache(Some(dec_id.clone()), requestor);
433
434        let services = Arc::new(ObjectServices {
435            non_service,
436            ndn_service,
437            crypto_service,
438
439            util_service,
440            trans_service,
441            sync_service,
442
443            root_state,
444            root_state_accessor,
445
446            local_cache,
447            local_cache_accessor,
448
449            root_state_meta,
450            local_cache_meta,
451        });
452
453        // Initialize the corresponding event processor, choose one in the two choices
454        let router_handlers = match &param.event_type {
455            CyfsStackEventType::WebSocket(ws_url) => {
456                RouterHandlerManager::new(Some(dec_id.clone()), ws_url.clone())
457            }
458        };
459
460        let router_events = match &param.event_type {
461            CyfsStackEventType::WebSocket(ws_url) => {
462                RouterEventManager::new(Some(dec_id.clone()), ws_url.clone())
463            }
464        };
465
466        // Caches all processors, use UNI_STACK to return to use directly
467        let processors = Arc::new(CyfsStackProcessors {
468            non_service: services.non_service.clone_processor(),
469            ndn_service: services.ndn_service.clone_processor(),
470            crypto_service: services.crypto_service.clone_processor(),
471            util_service: services.util_service.clone_processor(),
472            trans_service: services.trans_service.clone_processor(),
473            router_handlers: router_handlers.clone_processor(),
474            router_events: router_events.clone_processor(),
475            root_state: services.root_state.clone_processor(),
476            root_state_accessor: services.root_state_accessor.clone_processor(),
477            local_cache: services.local_cache.clone_processor(),
478            local_cache_accessor: services.local_cache_accessor.clone_processor(),
479            root_state_meta: services.root_state_meta.clone_processor(),
480            local_cache_meta: services.local_cache_meta.clone_processor(),
481        });
482
483        let ret = Self {
484            param,
485            dec_id,
486
487            services,
488            processors,
489
490            router_handlers,
491            router_events,
492
493            device_info: Arc::new(RwLock::new(None)),
494            uni_stack: Arc::new(OnceCell::new()),
495
496            requestor_holder,
497        };
498
499        Ok(ret)
500    }
501
502    pub async fn stop(&self) {
503        self.requestor_holder.stop().await;
504
505        self.router_handlers.stop().await;
506
507        self.router_events.stop().await;
508    }
509
510    pub fn param(&self) -> &SharedCyfsStackParam {
511        &self.param
512    }
513
514    pub async fn fork_with_new_dec(&self, dec_id: Option<ObjectId>) -> BuckyResult<Self> {
515        let mut param = self.param.clone();
516        param.dec_id = dec_id;
517
518        Self::open(param).await
519    }
520
521    // Waiting for the protocol stack online
522    pub async fn wait_online(&self, timeout: Option<std::time::Duration>) -> BuckyResult<()> {
523        let this = self.clone();
524        let ft = async move {
525            loop {
526                match this.online().await {
527                    Ok(_) => break,
528                    Err(e) => {
529                        match e.code() {
530                            BuckyErrorCode::ConnectFailed | BuckyErrorCode::Timeout => {
531                                // Need to retry
532                            }
533                            _ => {
534                                error!("stack online failed! {}", e);
535                                return Err(e);
536                            }
537                        }
538                    }
539                }
540                async_std::task::sleep(std::time::Duration::from_secs(5)).await;
541            }
542            Ok(())
543        };
544        if let Some(timeout) = timeout {
545            match async_std::future::timeout(timeout, ft).await {
546                Ok(ret) => ret,
547                Err(async_std::future::TimeoutError { .. }) => {
548                    let msg = format!("wait stack online timeout! dur={:?}", timeout);
549                    error!("{}", msg);
550                    Err(BuckyError::new(BuckyErrorCode::Timeout, msg))
551                }
552            }
553        } else {
554            ft.await
555        }
556    }
557
558    pub async fn online(&self) -> BuckyResult<()> {
559        // Get Device_id of the current protocol stack
560        let req = UtilGetDeviceOutputRequest::new();
561        let resp = self.services.util_service.get_device(req).await?;
562
563        info!("got local stack device: {}", resp);
564
565        *self.device_info.write().unwrap() = Some((resp.device_id, resp.device));
566
567        Ok(())
568    }
569
570    // If it is not specified during initialization, it can be delayed init once
571    pub fn bind_dec(&self, dec_id: ObjectId) {
572        self.dec_id.set(dec_id).unwrap();
573    }
574
575    pub fn dec_id(&self) -> Option<&ObjectId> {
576        self.dec_id.get()
577    }
578
579    // The following two interfaces must be called online successfully before you can call them
580    pub fn local_device_id(&self) -> DeviceId {
581        self.device_info.read().unwrap().as_ref().unwrap().0.clone()
582    }
583
584    pub fn local_device(&self) -> Device {
585        self.device_info.read().unwrap().as_ref().unwrap().1.clone()
586    }
587
588    pub fn non_service(&self) -> &NONRequestor {
589        &self.services.non_service
590    }
591
592    pub fn ndn_service(&self) -> &NDNRequestor {
593        &self.services.ndn_service
594    }
595
596    pub fn crypto(&self) -> &CryptoRequestor {
597        &self.services.crypto_service
598    }
599
600    pub fn util(&self) -> &UtilRequestor {
601        &self.services.util_service
602    }
603
604    pub fn trans(&self) -> &TransRequestor {
605        &self.services.trans_service
606    }
607
608    pub fn sync(&self) -> &SyncRequestor {
609        &self.services.sync_service
610    }
611
612    pub fn router_handlers(&self) -> &RouterHandlerManager {
613        &self.router_handlers
614    }
615
616    pub fn router_events(&self) -> &RouterEventManager {
617        &self.router_events
618    }
619
620    // root_state Root State Management Related Interface
621    pub fn root_state(&self) -> &GlobalStateRequestor {
622        &self.services.root_state
623    }
624
625    pub fn root_state_stub(
626        &self,
627        target: Option<ObjectId>,
628        target_dec_id: Option<ObjectId>,
629    ) -> GlobalStateStub {
630        GlobalStateStub::new(
631            self.services.root_state.clone_processor(),
632            target,
633            target_dec_id,
634        )
635    }
636
637    pub fn root_state_accessor_stub(
638        &self,
639        target: Option<ObjectId>,
640        target_dec_id: Option<ObjectId>,
641    ) -> GlobalStateAccessorStub {
642        GlobalStateAccessorStub::new(
643            self.services.root_state_accessor.clone_processor(),
644            target,
645            target_dec_id,
646        )
647    }
648
649    // root_state meta
650    pub fn root_state_meta(&self) -> &GlobalStateMetaRequestor {
651        &self.services.root_state_meta
652    }
653
654    pub fn root_state_meta_stub(
655        &self,
656        target: Option<ObjectId>,
657        target_dec_id: Option<ObjectId>,
658    ) -> GlobalStateMetaStub {
659        GlobalStateMetaStub::new(
660            self.services.root_state_meta.clone_processor(),
661            target,
662            target_dec_id,
663        )
664    }
665
666    // local_cache
667    pub fn local_cache(&self) -> &GlobalStateRequestor {
668        &self.services.local_cache
669    }
670
671    pub fn local_cache_stub(&self, target_dec_id: Option<ObjectId>) -> GlobalStateStub {
672        GlobalStateStub::new(
673            self.services.local_cache.clone_processor(),
674            None,
675            target_dec_id,
676        )
677    }
678
679    pub fn local_cache_accessor_stub(
680        &self,
681        target: Option<ObjectId>,
682        target_dec_id: Option<ObjectId>,
683    ) -> GlobalStateAccessorStub {
684        GlobalStateAccessorStub::new(
685            self.services.local_cache_accessor.clone_processor(),
686            target,
687            target_dec_id,
688        )
689    }
690
691    // local_cache meta
692    pub fn local_cache_meta(&self) -> &GlobalStateMetaRequestor {
693        &self.services.local_cache_meta
694    }
695
696    pub fn local_cache_meta_stub(&self, target_dec_id: Option<ObjectId>) -> GlobalStateMetaStub {
697        GlobalStateMetaStub::new(
698            self.services.local_cache_meta.clone_processor(),
699            None,
700            target_dec_id,
701        )
702    }
703
704    // state_storage
705    pub fn global_state_storage(
706        &self,
707        category: GlobalStateCategory,
708        path: impl Into<String>,
709        content_type: ObjectMapSimpleContentType,
710    ) -> StateStorage {
711        StateStorage::new_with_stack(
712            self.uni_stack().clone(),
713            category,
714            path,
715            content_type,
716            None,
717            self.dec_id().cloned(),
718        )
719    }
720
721    pub fn global_state_storage_ex(
722        &self,
723        category: GlobalStateCategory,
724        path: impl Into<String>,
725        content_type: ObjectMapSimpleContentType,
726        target: Option<ObjectId>,
727        dec_id: Option<ObjectId>,
728    ) -> StateStorage {
729        StateStorage::new_with_stack(
730            self.uni_stack().clone(),
731            category,
732            path,
733            content_type,
734            target,
735            dec_id,
736        )
737    }
738
739    // uni_stack related interface
740    fn create_uni_stack(&self) -> UniCyfsStackRef {
741        Arc::new(self.clone())
742    }
743
744    pub fn uni_stack(&self) -> &UniCyfsStackRef {
745        self.uni_stack.get_or_init(|| self.create_uni_stack())
746    }
747}
748
749impl UniCyfsStack for SharedCyfsStack {
750    fn non_service(&self) -> &NONOutputProcessorRef {
751        &self.processors.non_service
752    }
753
754    fn ndn_service(&self) -> &NDNOutputProcessorRef {
755        &self.processors.ndn_service
756    }
757
758    fn crypto_service(&self) -> &CryptoOutputProcessorRef {
759        &self.processors.crypto_service
760    }
761
762    fn util_service(&self) -> &UtilOutputProcessorRef {
763        &self.processors.util_service
764    }
765
766    fn trans_service(&self) -> &TransOutputProcessorRef {
767        &self.processors.trans_service
768    }
769
770    fn router_handlers(&self) -> &RouterHandlerManagerProcessorRef {
771        &self.processors.router_handlers
772    }
773
774    fn router_events(&self) -> &RouterEventManagerProcessorRef {
775        &self.processors.router_events
776    }
777
778    fn root_state(&self) -> &GlobalStateOutputProcessorRef {
779        &self.processors.root_state
780    }
781
782    fn root_state_accessor(&self) -> &GlobalStateAccessorOutputProcessorRef {
783        &self.processors.root_state_accessor
784    }
785
786    fn local_cache(&self) -> &GlobalStateOutputProcessorRef {
787        &self.processors.local_cache
788    }
789
790    fn local_cache_accessor(&self) -> &GlobalStateAccessorOutputProcessorRef {
791        &self.processors.local_cache_accessor
792    }
793
794    fn root_state_meta(&self) -> &GlobalStateMetaOutputProcessorRef {
795        &self.processors.root_state_meta
796    }
797
798    fn local_cache_meta(&self) -> &GlobalStateMetaOutputProcessorRef {
799        &self.processors.local_cache_meta
800    }
801}
802
803#[cfg(test)]
804mod test {
805    use crate::*;
806
807    async fn test_online() {
808        let mut param = SharedCyfsStackParam::default(None);
809        param.requestor_config = CyfsStackRequestorConfig::ws();
810
811        let stack = SharedCyfsStack::open(param).await.unwrap();
812        let _ = stack.wait_online(None).await;
813
814        async_std::task::sleep(std::time::Duration::from_secs(60 * 2)).await;
815    }
816
817    #[test]
818    fn test() {
819        cyfs_base::init_simple_log("test-shared-object-stack", Some("trace"));
820
821        async_std::task::block_on(async move {
822            // test_online().await;
823        })
824    }
825}