Skip to main content

arkhe_kernel/runtime/
observer.rs

//! `KernelObserver` trait + panic-resilient `ObserverRegistry`.
//!
//! Observer panic policy: the first panic evicts. Each eviction is
//! recorded in the `evicted` set so that subsequent deliveries skip the
//! handle even if a pathological observer were re-inserted under the same
//! handle (handles come from the never-decreasing `next_handle` counter,
//! so a repeat is only conceivable once that `u16` counter saturates).
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::panic::{catch_unwind, AssertUnwindSafe};
10
11use crate::runtime::event::{EventMask, KernelEvent, ObserverHandle};
12
13/// Sink for [`KernelEvent`]s. Implementations are invoked in
14/// `BTreeMap<ObserverHandle, _>` order during the post-commit drain
15/// of `Kernel::step`. Panics inside `on_event` are caught and result
16/// in first-panic eviction of the observer (A22).
17///
18/// Implementors must be `Send + Sync` because the kernel may move
19/// observers across threads in higher-level setups (e.g. for
20/// off-thread fan-out); the kernel itself remains single-threaded.
21pub trait KernelObserver: Send + Sync {
22    /// Receive one kernel event. Filtered by
23    /// [`super::event::EventMask`] when registered via
24    /// [`super::kernel::Kernel::register_observer_filtered`];
25    /// `register_observer` is the unfiltered shorthand.
26    fn on_event(&self, event: &KernelEvent);
27}
28
29struct ObserverSlot {
30    observer: Box<dyn KernelObserver>,
31    mask: EventMask,
32}
33
34pub(crate) struct ObserverRegistry {
35    slots: BTreeMap<ObserverHandle, ObserverSlot>,
36    evicted: BTreeSet<ObserverHandle>,
37    next_handle: u16,
38}
39
40impl ObserverRegistry {
41    pub(crate) fn new() -> Self {
42        Self {
43            slots: BTreeMap::new(),
44            evicted: BTreeSet::new(),
45            next_handle: 0,
46        }
47    }
48
49    /// Register with `EventMask::ALL` (every variant delivered).
50    pub(crate) fn register(&mut self, obs: Box<dyn KernelObserver>) -> ObserverHandle {
51        self.register_filtered(obs, EventMask::ALL)
52    }
53
54    /// Register with an event-class filter — only events whose variant
55    /// bit is set in `mask` are delivered.
56    pub(crate) fn register_filtered(
57        &mut self,
58        obs: Box<dyn KernelObserver>,
59        mask: EventMask,
60    ) -> ObserverHandle {
61        let h = ObserverHandle(self.next_handle);
62        self.next_handle = self.next_handle.saturating_add(1);
63        self.slots.insert(
64            h,
65            ObserverSlot {
66                observer: obs,
67                mask,
68            },
69        );
70        h
71    }
72
73    /// Deliver `event` to every non-evicted observer whose mask matches.
74    /// Panics are caught and the offending observer is evicted. Returns
75    /// the handles of any observers newly evicted by this call.
76    pub(crate) fn deliver(&mut self, event: &KernelEvent) -> Vec<ObserverHandle> {
77        let mut newly_evicted = Vec::new();
78        let keys: Vec<ObserverHandle> = self.slots.keys().copied().collect();
79        for handle in keys {
80            if self.evicted.contains(&handle) {
81                continue;
82            }
83            let slot = match self.slots.get(&handle) {
84                Some(s) => s,
85                None => continue,
86            };
87            if !slot.mask.matches(event) {
88                continue;
89            }
90            let result = catch_unwind(AssertUnwindSafe(|| slot.observer.on_event(event)));
91            if result.is_err() {
92                self.evicted.insert(handle);
93                self.slots.remove(&handle);
94                newly_evicted.push(handle);
95            }
96        }
97        newly_evicted
98    }
99
100    #[cfg_attr(not(test), allow(dead_code))]
101    pub(crate) fn is_evicted(&self, h: ObserverHandle) -> bool {
102        self.evicted.contains(&h)
103    }
104
105    #[cfg_attr(not(test), allow(dead_code))]
106    pub(crate) fn len(&self) -> usize {
107        self.slots.len()
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::abi::{InstanceId, Tick, TypeCode};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Observer that bumps a shared counter for every event it receives.
    struct CountingObserver {
        count: Arc<AtomicU32>,
    }
    impl KernelObserver for CountingObserver {
        fn on_event(&self, _event: &KernelEvent) {
            self.count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Observer that panics on every event it receives.
    struct PanicObserver;
    impl KernelObserver for PanicObserver {
        fn on_event(&self, _event: &KernelEvent) {
            panic!("intentional");
        }
    }

    /// Fixed sample event used by all delivery tests.
    fn evt() -> KernelEvent {
        KernelEvent::ActionExecuted {
            instance: InstanceId::new(1).unwrap(),
            action_type: TypeCode(1),
            at: Tick(0),
        }
    }

    /// Box a `CountingObserver` that reports into `hits`.
    fn counting(hits: &Arc<AtomicU32>) -> Box<dyn KernelObserver> {
        Box::new(CountingObserver {
            count: Arc::clone(hits),
        })
    }

    #[test]
    fn register_and_len() {
        let hits = Arc::new(AtomicU32::new(0));
        let mut registry = ObserverRegistry::new();
        let _handle = registry.register(counting(&hits));
        assert_eq!(registry.len(), 1);
    }

    #[test]
    fn deliver_increments_observer_count() {
        let hits = Arc::new(AtomicU32::new(0));
        let mut registry = ObserverRegistry::new();
        registry.register(counting(&hits));
        for _ in 0..2 {
            registry.deliver(&evt());
        }
        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn first_panic_evicts_observer() {
        let mut registry = ObserverRegistry::new();
        let handle = registry.register(Box::new(PanicObserver));
        let evicted = registry.deliver(&evt());
        assert_eq!(evicted, vec![handle]);
        assert!(registry.is_evicted(handle));
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn evicted_observer_does_not_receive_subsequent_events() {
        let hits = Arc::new(AtomicU32::new(0));
        let mut registry = ObserverRegistry::new();
        registry.register(Box::new(PanicObserver));
        registry.register(counting(&hits));
        for _ in 0..2 {
            registry.deliver(&evt());
        }
        // The panicking observer is evicted on the first delivery; the
        // counting observer still sees both events.
        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn handles_are_monotonic() {
        let hits = Arc::new(AtomicU32::new(0));
        let mut registry = ObserverRegistry::new();
        let first = registry.register(counting(&hits));
        let second = registry.register(counting(&hits));
        assert!(first < second);
    }
}