1use std::any::Any;
2use std::sync::{
3 atomic::{AtomicU64, Ordering},
4 Arc,
5};
6
7use anyhow::Result;
8
9use crate::hooks::HookFut;
10use crate::{HookContext, HookResult, ServiceMethodKind};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
13pub struct ListenerId(u64);
14
15static LISTENER_ID: AtomicU64 = AtomicU64::new(1);
16
17fn next_listener_id() -> ListenerId {
18 ListenerId(LISTENER_ID.fetch_add(1, Ordering::Relaxed))
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub enum ServiceEventKind {
24 Created,
25 Updated,
26 Patched,
27 Removed,
28 Custom(String),
29}
30
31impl ServiceEventKind {
32 pub fn custom(name: impl Into<String>) -> Self {
33 ServiceEventKind::Custom(name.into())
34 }
35}
36
37pub enum ServiceEventData<'a, R> {
39 Standard(&'a HookResult<R>),
40 Custom(&'a Arc<dyn Any + Send + Sync>),
41}
42
43pub type EventListener<R, P> = Arc<
45 dyn for<'a> Fn(&'a ServiceEventData<'a, R>, &'a HookContext<R, P>) -> HookFut<'a>
46 + Send
47 + Sync,
48>;
49
50pub type PublishFn<R, P> = Arc<
52 dyn for<'a> Fn(
53 &'a str,
54 &'a ServiceEventKind,
55 &'a ServiceEventData<'a, R>,
56 &'a HookContext<R, P>,
57 ) -> bool
58 + Send
59 + Sync,
60>;
61
62#[derive(Debug, Clone, PartialEq, Eq, Hash)]
63pub enum ServiceNamePat {
64 Any,
65 Exact(String),
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash)]
69pub enum EventPat {
70 Any,
71 Exact(ServiceEventKind),
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Hash)]
75pub struct ServiceEventPattern {
76 pub service: ServiceNamePat,
77 pub event: EventPat,
78}
79
80impl ServiceEventPattern {
81 pub fn exact(service: impl Into<String>, event: ServiceEventKind) -> Self {
82 Self {
83 service: ServiceNamePat::Exact(service.into()),
84 event: EventPat::Exact(event),
85 }
86 }
87
88 pub fn matches(&self, path: &str, event: &ServiceEventKind) -> bool {
89 let service_ok = match &self.service {
90 ServiceNamePat::Any => true,
91 ServiceNamePat::Exact(s) => s == path,
92 };
93 let event_ok = match &self.event {
94 EventPat::Any => true,
95 EventPat::Exact(e) => e == event,
96 };
97 service_ok && event_ok
98 }
99}
100
101#[derive(Clone)]
102struct ListenerEntry<R, P>
103where
104 R: Send + 'static,
105 P: Send + Clone + 'static,
106{
107 id: ListenerId,
108 pattern: ServiceEventPattern,
109 listener: EventListener<R, P>,
110 once: bool,
111}
112
113pub struct DogEventHub<R, P>
125where
126 R: Send + 'static,
127 P: Send + Clone + 'static,
128{
129 listeners: Vec<ListenerEntry<R, P>>,
130 publish: Option<PublishFn<R, P>>,
131}
132
133impl<R, P> Default for DogEventHub<R, P>
134where
135 R: Send + 'static,
136 P: Send + Clone + 'static,
137{
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143impl<R, P> DogEventHub<R, P>
144where
145 R: Send + 'static,
146 P: Send + Clone + 'static,
147{
148 pub fn new() -> Self {
149 Self {
150 listeners: Vec::new(),
151 publish: None,
152 }
153 }
154
155 pub fn set_publish(&mut self, f: PublishFn<R, P>) {
156 self.publish = Some(f);
157 }
158
159 pub fn clear_publish(&mut self) {
160 self.publish = None;
161 }
162
163 pub fn on_exact(
165 &mut self,
166 path: impl Into<String>,
167 event: ServiceEventKind,
168 listener: EventListener<R, P>,
169 ) -> ListenerId {
170 self.on_pattern(ServiceEventPattern::exact(path, event), listener)
171 }
172
173 pub fn on_pattern(&mut self, pattern: ServiceEventPattern, listener: EventListener<R, P>) -> ListenerId {
175 let id = next_listener_id();
176 self.listeners.push(ListenerEntry {
177 id,
178 pattern,
179 listener,
180 once: false,
181 });
182 id
183 }
184
185 pub fn once_pattern(&mut self, pattern: ServiceEventPattern, listener: EventListener<R, P>) -> ListenerId {
187 let id = next_listener_id();
188 self.listeners.push(ListenerEntry {
189 id,
190 pattern,
191 listener,
192 once: true,
193 });
194 id
195 }
196
197 pub fn off(&mut self, id: ListenerId) -> bool {
199 let before = self.listeners.len();
200 self.listeners.retain(|e| e.id != id);
201 before != self.listeners.len()
202 }
203
204 pub fn remove_all(&mut self, pattern: Option<&ServiceEventPattern>) -> usize {
206 let before = self.listeners.len();
207 if let Some(p) = pattern {
208 self.listeners.retain(|e| &e.pattern != p);
209 } else {
210 self.listeners.clear();
211 }
212 before - self.listeners.len()
213 }
214
215 pub fn snapshot_emit<'a>(
219 &'a self,
220 path: &str,
221 event: &ServiceEventKind,
222 data: &ServiceEventData<'a, R>,
223 ctx: &HookContext<R, P>,
224 ) -> (Vec<EventListener<R, P>>, Vec<ListenerId>) {
225 if let Some(publish) = &self.publish {
226 if !(publish)(path, event, data, ctx) {
227 return (Vec::new(), Vec::new());
228 }
229 }
230
231 let mut to_call: Vec<EventListener<R, P>> = Vec::new();
232 let mut once_ids: Vec<ListenerId> = Vec::new();
233
234 for entry in &self.listeners {
235 if entry.pattern.matches(path, event) {
236 to_call.push(entry.listener.clone());
237 if entry.once {
238 once_ids.push(entry.id);
239 }
240 }
241 }
242
243 (to_call, once_ids)
244 }
245
246 pub fn finalize_once_removals(&mut self, once_ids: &[ListenerId]) {
250 if once_ids.is_empty() {
251 return;
252 }
253 self.listeners.retain(|e| !once_ids.contains(&e.id));
254 }
255
256 pub async fn emit_async(
259 &mut self,
260 path: &str,
261 event: &ServiceEventKind,
262 data: &ServiceEventData<'_, R>,
263 ctx: &HookContext<R, P>,
264 ) -> Result<()> {
265 let (listeners, once_ids) = {
267 let hub: &Self = &*self;
269 hub.snapshot_emit(path, event, data, ctx)
270 };
271
272 for f in &listeners {
274 f(data, ctx).await?;
275 }
276
277 self.finalize_once_removals(&once_ids);
279
280 Ok(())
281 }
282}
283
284pub fn method_to_standard_event(method: &ServiceMethodKind) -> Option<ServiceEventKind> {
286 match method {
287 ServiceMethodKind::Create => Some(ServiceEventKind::Created),
288 ServiceMethodKind::Update => Some(ServiceEventKind::Updated),
289 ServiceMethodKind::Patch => Some(ServiceEventKind::Patched),
290 ServiceMethodKind::Remove => Some(ServiceEventKind::Removed),
291 _ => None,
292 }
293}
294
295pub fn parse_event_pattern(input: &str) -> anyhow::Result<ServiceEventPattern> {
297 let s = input.trim();
298
299 let (svc, ev) = if let Some((a, b)) = s.split_once(' ') {
300 (a.trim(), b.trim())
301 } else if let Some((a, b)) = s.split_once('.') {
302 (a.trim(), b.trim())
303 } else {
304 return Err(anyhow::anyhow!(
305 "Invalid event pattern '{s}'. Expected 'service event' or 'service.event'."
306 ));
307 };
308
309 let service = if svc == "*" {
310 ServiceNamePat::Any
311 } else {
312 ServiceNamePat::Exact(svc.to_string())
313 };
314
315 let event = if ev == "*" {
316 EventPat::Any
317 } else {
318 EventPat::Exact(parse_event_kind(ev)?)
319 };
320
321 Ok(ServiceEventPattern { service, event })
322}
323
324pub fn parse_event_kind(s: &str) -> anyhow::Result<ServiceEventKind> {
325 let norm = s.trim().to_lowercase();
326 match norm.as_str() {
327 "created" => Ok(ServiceEventKind::Created),
328 "updated" => Ok(ServiceEventKind::Updated),
329 "patched" => Ok(ServiceEventKind::Patched),
330 "removed" => Ok(ServiceEventKind::Removed),
331 other => Ok(ServiceEventKind::Custom(other.to_string())),
332 }
333}
334
335pub fn listener_id(id: ListenerId) -> ListenerId {
337 id
338}