cyfs_lib/events/
handler.rs

1use super::category::*;
2use super::request::*;
3use super::ws::*;
4use crate::stack::*;
5use cyfs_base::*;
6use cyfs_util::*;
7
8use async_trait::async_trait;
9use http_types::Url;
10use std::fmt;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14#[async_trait]
15pub(crate) trait RouterEventAnyRoutine: Send + Sync {
16    async fn emit(&self, param: String) -> BuckyResult<String>;
17}
18
19pub(crate) struct RouterEventRoutineT<REQ, RESP>(
20    pub Box<dyn EventListenerAsyncRoutine<RouterEventRequest<REQ>, RouterEventResponse<RESP>>>,
21)
22where
23    REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
24    RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display;
25
26#[async_trait]
27impl<REQ, RESP> RouterEventAnyRoutine for RouterEventRoutineT<REQ, RESP>
28where
29    REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
30    RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display,
31{
32    async fn emit(&self, param: String) -> BuckyResult<String> {
33        let param = RouterEventRequest::<REQ>::decode_string(&param)?;
34        self.0
35            .call(&param)
36            .await
37            .map(|resp| JsonCodec::encode_string(&resp))
38    }
39}
40
41#[derive(Clone)]
42pub struct RouterEventManager {
43    dec_id: Option<SharedObjectStackDecID>,
44
45    inner: RouterWSEventManager,
46    started: Arc<AtomicBool>,
47}
48
49impl RouterEventManager {
50    pub fn new(dec_id: Option<SharedObjectStackDecID>, ws_url: Url) -> Self {
51        let inner = RouterWSEventManager::new(ws_url);
52
53        Self {
54            dec_id,
55            inner,
56            started: Arc::new(AtomicBool::new(false)),
57        }
58    }
59
60    pub fn clone_processor(&self) -> RouterEventManagerProcessorRef {
61        Arc::new(Box::new(self.clone()))
62    }
63
64    fn get_dec_id(&self) -> Option<ObjectId> {
65        self.dec_id.as_ref().map(|v| v.get().cloned()).flatten()
66    }
67
68    fn try_start(&self) {
69        match self
70            .started
71            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
72        {
73            Ok(_) => {
74                info!("will start event manager!");
75                self.inner.start()
76            }
77            Err(_) => {}
78        }
79    }
80
81    pub fn add_event<REQ, RESP>(
82        &self,
83        id: &str,
84        index: i32,
85        routine: Box<
86            dyn EventListenerAsyncRoutine<RouterEventRequest<REQ>, RouterEventResponse<RESP>>,
87        >,
88    ) -> BuckyResult<()>
89    where
90        REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
91        RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display,
92        RouterEventRequest<REQ>: RouterEventCategoryInfo,
93    {
94        info!(
95            "will add event: category={}, id={}, index={}",
96            extract_router_event_category::<RouterEventRequest<REQ>>(),
97            id,
98            index
99        );
100
101        self.try_start();
102
103        self.inner.add_event(id, self.get_dec_id(), index, routine)
104    }
105
106    pub async fn remove_event(&self, category: RouterEventCategory, id: &str) -> BuckyResult<bool> {
107        info!("will remove event: category={}, id={}", category, id,);
108
109        self.try_start();
110
111        self.inner
112            .remove_event(category, id, self.get_dec_id())
113            .await
114    }
115
116    pub async fn stop(&self) {
117        self.inner.stop().await
118    }
119}
120
121use super::processor::*;
122
123#[async_trait::async_trait]
124impl<REQ, RESP> RouterEventProcessor<REQ, RESP> for RouterEventManager
125where
126    REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
127    RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display,
128    RouterEventRequest<REQ>: RouterEventCategoryInfo,
129{
130    async fn add_event(
131        &self,
132        id: &str,
133        index: i32,
134        routine: Box<
135            dyn EventListenerAsyncRoutine<RouterEventRequest<REQ>, RouterEventResponse<RESP>>,
136        >,
137    ) -> BuckyResult<()> {
138        Self::add_event(&self, id, index, routine)
139    }
140
141    async fn remove_event(&self, id: &str) -> BuckyResult<bool> {
142        let category = extract_router_event_category::<RouterEventRequest<REQ>>();
143        Self::remove_event(&self, category, id).await
144    }
145}
146
147impl RouterEventManagerProcessor for RouterEventManager {
148    fn test_event(&self) -> &dyn RouterEventProcessor<TestEventRequest, TestEventResponse> {
149        self
150    }
151
152    fn zone_role_changed_event(
153        &self,
154    ) -> &dyn RouterEventProcessor<ZoneRoleChangedEventRequest, ZoneRoleChangedEventResponse> {
155        self
156    }
157}