arkhe_kernel/runtime/
observer.rs1use std::collections::{BTreeMap, BTreeSet};
9use std::panic::{catch_unwind, AssertUnwindSafe};
10
11use crate::runtime::event::{EventMask, KernelEvent, ObserverHandle};
12
13pub trait KernelObserver: Send + Sync {
22 fn on_event(&self, event: &KernelEvent);
27}
28
29struct ObserverSlot {
30 observer: Box<dyn KernelObserver>,
31 mask: EventMask,
32}
33
34pub(crate) struct ObserverRegistry {
35 slots: BTreeMap<ObserverHandle, ObserverSlot>,
36 evicted: BTreeSet<ObserverHandle>,
37 next_handle: u16,
38}
39
40impl ObserverRegistry {
41 pub(crate) fn new() -> Self {
42 Self {
43 slots: BTreeMap::new(),
44 evicted: BTreeSet::new(),
45 next_handle: 0,
46 }
47 }
48
49 pub(crate) fn register(&mut self, obs: Box<dyn KernelObserver>) -> ObserverHandle {
51 self.register_filtered(obs, EventMask::ALL)
52 }
53
54 pub(crate) fn register_filtered(
57 &mut self,
58 obs: Box<dyn KernelObserver>,
59 mask: EventMask,
60 ) -> ObserverHandle {
61 let h = ObserverHandle(self.next_handle);
62 self.next_handle = self.next_handle.saturating_add(1);
63 self.slots.insert(
64 h,
65 ObserverSlot {
66 observer: obs,
67 mask,
68 },
69 );
70 h
71 }
72
73 pub(crate) fn deliver(&mut self, event: &KernelEvent) -> Vec<ObserverHandle> {
77 let mut newly_evicted = Vec::new();
78 let keys: Vec<ObserverHandle> = self.slots.keys().copied().collect();
79 for handle in keys {
80 if self.evicted.contains(&handle) {
81 continue;
82 }
83 let slot = match self.slots.get(&handle) {
84 Some(s) => s,
85 None => continue,
86 };
87 if !slot.mask.matches(event) {
88 continue;
89 }
90 let result = catch_unwind(AssertUnwindSafe(|| slot.observer.on_event(event)));
91 if result.is_err() {
92 self.evicted.insert(handle);
93 self.slots.remove(&handle);
94 newly_evicted.push(handle);
95 }
96 }
97 newly_evicted
98 }
99
100 #[cfg_attr(not(test), allow(dead_code))]
101 pub(crate) fn is_evicted(&self, h: ObserverHandle) -> bool {
102 self.evicted.contains(&h)
103 }
104
105 #[cfg_attr(not(test), allow(dead_code))]
106 pub(crate) fn len(&self) -> usize {
107 self.slots.len()
108 }
109}
110
111#[cfg(test)]
112mod tests {
113 use super::*;
114 use crate::abi::{InstanceId, Tick, TypeCode};
115 use std::sync::atomic::{AtomicU32, Ordering};
116 use std::sync::Arc;
117
118 struct CountingObserver {
119 count: Arc<AtomicU32>,
120 }
121 impl KernelObserver for CountingObserver {
122 fn on_event(&self, _event: &KernelEvent) {
123 self.count.fetch_add(1, Ordering::SeqCst);
124 }
125 }
126
127 struct PanicObserver;
128 impl KernelObserver for PanicObserver {
129 fn on_event(&self, _event: &KernelEvent) {
130 panic!("intentional");
131 }
132 }
133
134 fn evt() -> KernelEvent {
135 KernelEvent::ActionExecuted {
136 instance: InstanceId::new(1).unwrap(),
137 action_type: TypeCode(1),
138 at: Tick(0),
139 }
140 }
141
142 #[test]
143 fn register_and_len() {
144 let mut r = ObserverRegistry::new();
145 let count = Arc::new(AtomicU32::new(0));
146 let _h = r.register(Box::new(CountingObserver {
147 count: count.clone(),
148 }));
149 assert_eq!(r.len(), 1);
150 }
151
152 #[test]
153 fn deliver_increments_observer_count() {
154 let mut r = ObserverRegistry::new();
155 let count = Arc::new(AtomicU32::new(0));
156 r.register(Box::new(CountingObserver {
157 count: count.clone(),
158 }));
159 r.deliver(&evt());
160 r.deliver(&evt());
161 assert_eq!(count.load(Ordering::SeqCst), 2);
162 }
163
164 #[test]
165 fn first_panic_evicts_observer() {
166 let mut r = ObserverRegistry::new();
167 let h = r.register(Box::new(PanicObserver));
168 let evicted = r.deliver(&evt());
169 assert_eq!(evicted, vec![h]);
170 assert!(r.is_evicted(h));
171 assert_eq!(r.len(), 0);
172 }
173
174 #[test]
175 fn evicted_observer_does_not_receive_subsequent_events() {
176 let mut r = ObserverRegistry::new();
177 let count = Arc::new(AtomicU32::new(0));
178 r.register(Box::new(PanicObserver));
179 r.register(Box::new(CountingObserver {
180 count: count.clone(),
181 }));
182 r.deliver(&evt());
183 r.deliver(&evt());
184 assert_eq!(count.load(Ordering::SeqCst), 2);
186 }
187
188 #[test]
189 fn handles_are_monotonic() {
190 let mut r = ObserverRegistry::new();
191 let count = Arc::new(AtomicU32::new(0));
192 let h0 = r.register(Box::new(CountingObserver {
193 count: count.clone(),
194 }));
195 let h1 = r.register(Box::new(CountingObserver {
196 count: count.clone(),
197 }));
198 assert!(h0 < h1);
199 }
200}