Skip to main content

pe_core/
channel_store.rs

1//! ChannelStore — heterogeneous typed channel storage.
2//!
3//! Each entry is a type-erased `Channel` accessed by `TypeId`.
4//! Uses ChannelId indirection (pattern from Bevy ECS) for fast indexed access
5//! after initial type resolution.
6//!
7//! Based on Decision 1 (TypeId type-map) and research pattern P5.
8
9use std::any::TypeId;
10use std::collections::HashMap;
11
12use crate::channel::Channel;
13
14/// Opaque index into the channel storage. Created once from TypeId,
15/// then used for fast direct access without hashing.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
17pub struct ChannelId(usize);
18
19/// Heterogeneous typed channel map.
20///
21/// Stores type-erased `Channel` values indexed by `TypeId`.
22/// Uses a two-level lookup: `TypeId → ChannelId (usize) → Channel`.
23///
24/// # Example
25///
26/// ```
27/// use pe_core::channel::{LastValue, Appender};
28/// use pe_core::channel_store::ChannelStore;
29///
30/// let mut store = ChannelStore::new();
31/// store.insert::<LastValue<String>>(LastValue::new("hello".into()));
32/// store.insert::<Appender<i32>>(Appender::new());
33///
34/// let val = store.get::<LastValue<String>>().unwrap();
35/// assert_eq!(val.get(), "hello");
36/// ```
37pub struct ChannelStore {
38    /// TypeId → index into `channels` vec
39    id_map: HashMap<TypeId, ChannelId>,
40    /// Indexed channel storage
41    channels: Vec<Option<Box<dyn Channel>>>,
42}
43
44impl ChannelStore {
45    /// Create an empty channel store.
46    pub fn new() -> Self {
47        Self {
48            id_map: HashMap::new(),
49            channels: Vec::new(),
50        }
51    }
52
53    /// Insert a channel by its concrete type.
54    /// Overwrites any existing channel of the same type.
55    pub fn insert<T: Channel + 'static>(&mut self, channel: T) {
56        let type_id = TypeId::of::<T>();
57        if let Some(&id) = self.id_map.get(&type_id) {
58            self.channels[id.0] = Some(Box::new(channel));
59        } else {
60            let id = ChannelId(self.channels.len());
61            self.channels.push(Some(Box::new(channel)));
62            self.id_map.insert(type_id, id);
63        }
64    }
65
66    /// Get an immutable reference to a channel by its concrete type.
67    pub fn get<T: Channel + 'static>(&self) -> Option<&T> {
68        let type_id = TypeId::of::<T>();
69        let &id = self.id_map.get(&type_id)?;
70        self.channels[id.0]
71            .as_ref()
72            .and_then(|ch| ch.as_any().downcast_ref::<T>())
73    }
74
75    /// Get a mutable reference to a channel by its concrete type.
76    pub fn get_mut<T: Channel + 'static>(&mut self) -> Option<&mut T> {
77        let type_id = TypeId::of::<T>();
78        let &id = self.id_map.get(&type_id)?;
79        self.channels[id.0]
80            .as_mut()
81            .and_then(|ch| ch.as_any_mut().downcast_mut::<T>())
82    }
83
84    /// Get a channel by ChannelId (fast path, no hashing).
85    pub fn get_by_id(&self, id: ChannelId) -> Option<&dyn Channel> {
86        self.channels.get(id.0)?.as_deref()
87    }
88
89    /// Get a mutable channel by ChannelId (fast path, no hashing).
90    pub fn get_by_id_mut(&mut self, id: ChannelId) -> Option<&mut dyn Channel> {
91        self.channels.get_mut(id.0)?.as_deref_mut()
92    }
93
94    /// Look up the ChannelId for a type. Use this once, then use
95    /// `get_by_id` for repeated fast access.
96    pub fn id_of<T: 'static>(&self) -> Option<ChannelId> {
97        self.id_map.get(&TypeId::of::<T>()).copied()
98    }
99
100    /// Clear all ephemeral and topic channels (called between supersteps).
101    ///
102    /// # REVIEW(002): Selective clearing via is_ephemeral()
103    /// Only calls `clear()` on channels that opt in via `is_ephemeral() == true`.
104    /// Previously called `clear()` on ALL channels, relying on persistent channels
105    /// having no-op impls. The new approach is explicit and won't break if a future
106    /// Channel type has a meaningful `clear()` that shouldn't run between supersteps.
107    pub fn clear_ephemeral(&mut self) {
108        for channel in self.channels.iter_mut().flatten() {
109            if channel.is_ephemeral() {
110                channel.clear();
111            }
112        }
113    }
114
115    /// Create a snapshot (deep clone) for snapshot isolation during parallel execution.
116    /// Each channel is cloned via `clone_box()`.
117    pub fn snapshot(&self) -> ChannelStore {
118        let channels = self
119            .channels
120            .iter()
121            .map(|ch| ch.as_ref().map(|c| c.clone_box()))
122            .collect();
123
124        ChannelStore {
125            id_map: self.id_map.clone(),
126            channels,
127        }
128    }
129
130    /// Number of channels in the store.
131    pub fn len(&self) -> usize {
132        self.id_map.len()
133    }
134
135    /// Whether the store is empty.
136    pub fn is_empty(&self) -> bool {
137        self.id_map.is_empty()
138    }
139}
140
141impl Default for ChannelStore {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150    use crate::channel::{Appender, EphemeralValue, LastValue, Topic};
151
152    #[test]
153    fn test_insert_and_get_last_value() {
154        let mut store = ChannelStore::new();
155        store.insert(LastValue::new(42u32));
156
157        let val = store.get::<LastValue<u32>>().unwrap();
158        assert_eq!(*val.get(), 42);
159    }
160
161    #[test]
162    fn test_insert_and_get_appender() {
163        let mut store = ChannelStore::new();
164        store.insert(Appender::<String>::with_initial(vec![
165            "a".into(),
166            "b".into(),
167        ]));
168
169        let val = store.get::<Appender<String>>().unwrap();
170        assert_eq!(val.get(), &["a", "b"]);
171    }
172
173    #[test]
174    fn test_get_missing_returns_none() {
175        let store = ChannelStore::new();
176        assert!(store.get::<LastValue<u32>>().is_none());
177    }
178
179    #[test]
180    fn test_overwrite_same_type() {
181        let mut store = ChannelStore::new();
182        store.insert(LastValue::new(1u32));
183        store.insert(LastValue::new(2u32));
184
185        let val = store.get::<LastValue<u32>>().unwrap();
186        assert_eq!(*val.get(), 2);
187        assert_eq!(store.len(), 1);
188    }
189
190    #[test]
191    fn test_multiple_types() {
192        let mut store = ChannelStore::new();
193        store.insert(LastValue::new(42u32));
194        store.insert(LastValue::new("hello".to_string()));
195        store.insert(Appender::<i32>::new());
196
197        assert_eq!(store.len(), 3);
198        assert_eq!(*store.get::<LastValue<u32>>().unwrap().get(), 42);
199        assert_eq!(store.get::<LastValue<String>>().unwrap().get(), "hello");
200        assert!(store.get::<Appender<i32>>().unwrap().get().is_empty());
201    }
202
203    #[test]
204    fn test_get_mut_and_merge() {
205        let mut store = ChannelStore::new();
206        store.insert(Appender::<String>::new());
207
208        let appender = store.get_mut::<Appender<String>>().unwrap();
209        appender.merge(Box::new("item1".to_string()));
210        appender.merge(Box::new("item2".to_string()));
211
212        let appender = store.get::<Appender<String>>().unwrap();
213        assert_eq!(appender.get(), &["item1", "item2"]);
214    }
215
216    #[test]
217    fn test_channel_id_fast_path() {
218        let mut store = ChannelStore::new();
219        store.insert(LastValue::new(99u32));
220
221        let id = store.id_of::<LastValue<u32>>().unwrap();
222        let channel = store.get_by_id(id).unwrap();
223        assert_eq!(channel.type_name(), "LastValue");
224    }
225
226    #[test]
227    fn test_snapshot_isolation() {
228        let mut store = ChannelStore::new();
229        store.insert(LastValue::new(1u32));
230
231        // Take snapshot
232        let snapshot = store.snapshot();
233
234        // Modify original
235        store
236            .get_mut::<LastValue<u32>>()
237            .unwrap()
238            .merge(Box::new(2u32));
239
240        // Snapshot unchanged
241        assert_eq!(*snapshot.get::<LastValue<u32>>().unwrap().get(), 1);
242        // Original updated
243        assert_eq!(*store.get::<LastValue<u32>>().unwrap().get(), 2);
244    }
245
246    #[test]
247    fn test_clear_ephemeral() {
248        let mut store = ChannelStore::new();
249        store.insert(LastValue::new(42u32));
250        store.insert(EphemeralValue::<String>::new());
251        store.insert(Topic::<i32>::new());
252
253        // Set values
254        store
255            .get_mut::<EphemeralValue<String>>()
256            .unwrap()
257            .merge(Box::new("signal".to_string()));
258        store.get_mut::<Topic<i32>>().unwrap().merge(Box::new(1i32));
259
260        assert!(
261            store
262                .get::<EphemeralValue<String>>()
263                .unwrap()
264                .get()
265                .is_some()
266        );
267        assert!(!store.get::<Topic<i32>>().unwrap().get().is_empty());
268
269        // Clear ephemeral channels
270        store.clear_ephemeral();
271
272        // Ephemeral and Topic cleared
273        assert!(
274            store
275                .get::<EphemeralValue<String>>()
276                .unwrap()
277                .get()
278                .is_none()
279        );
280        assert!(store.get::<Topic<i32>>().unwrap().get().is_empty());
281
282        // LastValue NOT cleared
283        assert_eq!(*store.get::<LastValue<u32>>().unwrap().get(), 42);
284    }
285}