text_document_frontend/
event_hub_client.rs1use common::event::{Event, EventHub, Origin};
3use flume::Receiver;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::thread;
7
8pub type EventCallback = Box<dyn Fn(Event) + Send>;
10
11#[derive(Clone)]
14pub struct EventHubClient {
15 subscribers: Arc<Mutex<HashMap<Origin, Vec<EventCallback>>>>,
16 receiver: Receiver<Event>,
17}
18
19impl EventHubClient {
20 pub fn new(event_hub: &EventHub) -> Self {
22 EventHubClient {
23 subscribers: Arc::new(Mutex::new(HashMap::new())),
24 receiver: event_hub.subscribe_receiver(),
25 }
26 }
27
28 pub fn subscribe<F>(&self, origin: Origin, callback: F)
30 where
31 F: Fn(Event) + Send + 'static,
32 {
33 let mut subs = self.subscribers.lock().unwrap();
34 subs.entry(origin).or_default().push(Box::new(callback));
35 }
36
37 pub fn start(&self, quit_signal: Arc<std::sync::atomic::AtomicBool>) {
40 let receiver = self.receiver.clone();
41 let subscribers = Arc::clone(&self.subscribers);
42 let quit_signal = Arc::clone(&quit_signal);
43
44 log::info!("EventHubClient starting event loop");
45
46 thread::spawn(move || {
47 log::info!("EventHubClient event loop started");
48 loop {
49 match receiver.recv_timeout(std::time::Duration::from_millis(200)) {
50 Ok(event) => {
51 log::debug!("EventHubClient received event: {:?}", event);
52 let subs = subscribers.lock().unwrap();
53 if let Some(callbacks) = subs.get(&event.origin) {
54 for callback in callbacks {
55 callback(event.clone());
56 }
57 }
58 }
59 Err(flume::RecvTimeoutError::Timeout) => {
60 }
62 Err(flume::RecvTimeoutError::Disconnected) => {
63 log::info!("EventHubClient channel disconnected");
64 break;
65 }
66 }
67
68 if quit_signal.load(std::sync::atomic::Ordering::Relaxed) {
69 log::info!("EventHubClient quitting event loop");
70 break;
71 }
72 }
73 });
74 }
75}