cyfs_lib/events/
handler.rs1use super::category::*;
2use super::request::*;
3use super::ws::*;
4use crate::stack::*;
5use cyfs_base::*;
6use cyfs_util::*;
7
8use async_trait::async_trait;
9use http_types::Url;
10use std::fmt;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14#[async_trait]
15pub(crate) trait RouterEventAnyRoutine: Send + Sync {
16 async fn emit(&self, param: String) -> BuckyResult<String>;
17}
18
19pub(crate) struct RouterEventRoutineT<REQ, RESP>(
20 pub Box<dyn EventListenerAsyncRoutine<RouterEventRequest<REQ>, RouterEventResponse<RESP>>>,
21)
22where
23 REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
24 RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display;
25
26#[async_trait]
27impl<REQ, RESP> RouterEventAnyRoutine for RouterEventRoutineT<REQ, RESP>
28where
29 REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
30 RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display,
31{
32 async fn emit(&self, param: String) -> BuckyResult<String> {
33 let param = RouterEventRequest::<REQ>::decode_string(¶m)?;
34 self.0
35 .call(¶m)
36 .await
37 .map(|resp| JsonCodec::encode_string(&resp))
38 }
39}
40
41#[derive(Clone)]
42pub struct RouterEventManager {
43 dec_id: Option<SharedObjectStackDecID>,
44
45 inner: RouterWSEventManager,
46 started: Arc<AtomicBool>,
47}
48
49impl RouterEventManager {
50 pub fn new(dec_id: Option<SharedObjectStackDecID>, ws_url: Url) -> Self {
51 let inner = RouterWSEventManager::new(ws_url);
52
53 Self {
54 dec_id,
55 inner,
56 started: Arc::new(AtomicBool::new(false)),
57 }
58 }
59
60 pub fn clone_processor(&self) -> RouterEventManagerProcessorRef {
61 Arc::new(Box::new(self.clone()))
62 }
63
64 fn get_dec_id(&self) -> Option<ObjectId> {
65 self.dec_id.as_ref().map(|v| v.get().cloned()).flatten()
66 }
67
68 fn try_start(&self) {
69 match self
70 .started
71 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
72 {
73 Ok(_) => {
74 info!("will start event manager!");
75 self.inner.start()
76 }
77 Err(_) => {}
78 }
79 }
80
81 pub fn add_event<REQ, RESP>(
82 &self,
83 id: &str,
84 index: i32,
85 routine: Box<
86 dyn EventListenerAsyncRoutine<RouterEventRequest<REQ>, RouterEventResponse<RESP>>,
87 >,
88 ) -> BuckyResult<()>
89 where
90 REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
91 RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display,
92 RouterEventRequest<REQ>: RouterEventCategoryInfo,
93 {
94 info!(
95 "will add event: category={}, id={}, index={}",
96 extract_router_event_category::<RouterEventRequest<REQ>>(),
97 id,
98 index
99 );
100
101 self.try_start();
102
103 self.inner.add_event(id, self.get_dec_id(), index, routine)
104 }
105
106 pub async fn remove_event(&self, category: RouterEventCategory, id: &str) -> BuckyResult<bool> {
107 info!("will remove event: category={}, id={}", category, id,);
108
109 self.try_start();
110
111 self.inner
112 .remove_event(category, id, self.get_dec_id())
113 .await
114 }
115
116 pub async fn stop(&self) {
117 self.inner.stop().await
118 }
119}
120
121use super::processor::*;
122
123#[async_trait::async_trait]
124impl<REQ, RESP> RouterEventProcessor<REQ, RESP> for RouterEventManager
125where
126 REQ: Send + Sync + 'static + JsonCodec<REQ> + fmt::Display,
127 RESP: Send + Sync + 'static + JsonCodec<RESP> + fmt::Display,
128 RouterEventRequest<REQ>: RouterEventCategoryInfo,
129{
130 async fn add_event(
131 &self,
132 id: &str,
133 index: i32,
134 routine: Box<
135 dyn EventListenerAsyncRoutine<RouterEventRequest<REQ>, RouterEventResponse<RESP>>,
136 >,
137 ) -> BuckyResult<()> {
138 Self::add_event(&self, id, index, routine)
139 }
140
141 async fn remove_event(&self, id: &str) -> BuckyResult<bool> {
142 let category = extract_router_event_category::<RouterEventRequest<REQ>>();
143 Self::remove_event(&self, category, id).await
144 }
145}
146
147impl RouterEventManagerProcessor for RouterEventManager {
148 fn test_event(&self) -> &dyn RouterEventProcessor<TestEventRequest, TestEventResponse> {
149 self
150 }
151
152 fn zone_role_changed_event(
153 &self,
154 ) -> &dyn RouterEventProcessor<ZoneRoleChangedEventRequest, ZoneRoleChangedEventResponse> {
155 self
156 }
157}