pe-core 0.1.0

Core types for Potential Expectations — messages, channels, state, traits
Documentation
//! ChannelStore — heterogeneous typed channel storage.
//!
//! Each entry is a type-erased `Channel` accessed by `TypeId`.
//! Uses ChannelId indirection (pattern from Bevy ECS) for fast indexed access
//! after initial type resolution.
//!
//! Based on Decision 1 (TypeId type-map) and research pattern P5.

use std::any::TypeId;
use std::collections::HashMap;

use crate::channel::Channel;

/// Opaque index into a [`ChannelStore`]'s channel storage.
///
/// An id is assigned the first time a channel type is inserted and can be
/// looked up afterwards with [`ChannelStore::id_of`]. It wraps the channel's
/// position in the store's internal vector, so [`ChannelStore::get_by_id`]
/// and [`ChannelStore::get_by_id_mut`] reach the channel by direct indexing
/// without hashing a `TypeId`.
///
/// Replacing a channel via `insert` reuses the existing slot, and
/// [`ChannelStore::snapshot`] copies the id table unchanged, so an id stays
/// valid for the same type across both operations.
///
/// NOTE(review): nothing ties an id to the store that issued it; using an id
/// with an unrelated store is either out of range (`None`) or addresses
/// whatever channel occupies that slot there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChannelId(usize);

/// Heterogeneous typed channel map.
///
/// Holds at most one channel per concrete Rust type. Channels are stored
/// type-erased as `Box<dyn Channel>` and recovered by downcasting through
/// `Channel::as_any` / `Channel::as_any_mut`.
///
/// Uses a two-level lookup: `TypeId → ChannelId (usize) → Channel`.
/// The first level hashes the `TypeId`; the second is a plain vector index,
/// which is what the `*_by_id` accessors use directly.
///
/// # Example
///
/// ```
/// use pe_core::channel::{LastValue, Appender};
/// use pe_core::channel_store::ChannelStore;
///
/// let mut store = ChannelStore::new();
/// store.insert::<LastValue<String>>(LastValue::new("hello".into()));
/// store.insert::<Appender<i32>>(Appender::new());
///
/// let val = store.get::<LastValue<String>>().unwrap();
/// assert_eq!(val.get(), "hello");
/// ```
pub struct ChannelStore {
    /// `TypeId` of each registered concrete channel type → its slot in
    /// `channels`. Entries are added by `insert`; nothing in this file
    /// removes them, so `len()` / `is_empty()` are derived from this map.
    id_map: HashMap<TypeId, ChannelId>,
    /// Slot storage addressed by `ChannelId`. Each slot is an `Option`, but
    /// the code in this file only ever writes `Some(..)` into a slot.
    channels: Vec<Option<Box<dyn Channel>>>,
}

impl ChannelStore {
    /// Create an empty channel store.
    pub fn new() -> Self {
        Self {
            id_map: HashMap::new(),
            channels: Vec::new(),
        }
    }

    /// Insert a channel by its concrete type.
    /// Overwrites any existing channel of the same type.
    pub fn insert<T: Channel + 'static>(&mut self, channel: T) {
        let type_id = TypeId::of::<T>();
        if let Some(&id) = self.id_map.get(&type_id) {
            self.channels[id.0] = Some(Box::new(channel));
        } else {
            let id = ChannelId(self.channels.len());
            self.channels.push(Some(Box::new(channel)));
            self.id_map.insert(type_id, id);
        }
    }

    /// Get an immutable reference to a channel by its concrete type.
    pub fn get<T: Channel + 'static>(&self) -> Option<&T> {
        let type_id = TypeId::of::<T>();
        let &id = self.id_map.get(&type_id)?;
        self.channels[id.0]
            .as_ref()
            .and_then(|ch| ch.as_any().downcast_ref::<T>())
    }

    /// Get a mutable reference to a channel by its concrete type.
    pub fn get_mut<T: Channel + 'static>(&mut self) -> Option<&mut T> {
        let type_id = TypeId::of::<T>();
        let &id = self.id_map.get(&type_id)?;
        self.channels[id.0]
            .as_mut()
            .and_then(|ch| ch.as_any_mut().downcast_mut::<T>())
    }

    /// Get a channel by ChannelId (fast path, no hashing).
    pub fn get_by_id(&self, id: ChannelId) -> Option<&dyn Channel> {
        self.channels.get(id.0)?.as_deref()
    }

    /// Get a mutable channel by ChannelId (fast path, no hashing).
    pub fn get_by_id_mut(&mut self, id: ChannelId) -> Option<&mut dyn Channel> {
        self.channels.get_mut(id.0)?.as_deref_mut()
    }

    /// Look up the ChannelId for a type. Use this once, then use
    /// `get_by_id` for repeated fast access.
    pub fn id_of<T: 'static>(&self) -> Option<ChannelId> {
        self.id_map.get(&TypeId::of::<T>()).copied()
    }

    /// Clear all ephemeral and topic channels (called between supersteps).
    ///
    /// # REVIEW(002): Selective clearing via is_ephemeral()
    /// Only calls `clear()` on channels that opt in via `is_ephemeral() == true`.
    /// Previously called `clear()` on ALL channels, relying on persistent channels
    /// having no-op impls. The new approach is explicit and won't break if a future
    /// Channel type has a meaningful `clear()` that shouldn't run between supersteps.
    pub fn clear_ephemeral(&mut self) {
        for channel in self.channels.iter_mut().flatten() {
            if channel.is_ephemeral() {
                channel.clear();
            }
        }
    }

    /// Create a snapshot (deep clone) for snapshot isolation during parallel execution.
    /// Each channel is cloned via `clone_box()`.
    pub fn snapshot(&self) -> ChannelStore {
        let channels = self
            .channels
            .iter()
            .map(|ch| ch.as_ref().map(|c| c.clone_box()))
            .collect();

        ChannelStore {
            id_map: self.id_map.clone(),
            channels,
        }
    }

    /// Number of channels in the store.
    pub fn len(&self) -> usize {
        self.id_map.len()
    }

    /// Whether the store is empty.
    pub fn is_empty(&self) -> bool {
        self.id_map.is_empty()
    }
}

impl Default for ChannelStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::channel::{Appender, EphemeralValue, LastValue, Topic};

    /// A stored `LastValue` can be read back through its concrete type.
    #[test]
    fn test_insert_and_get_last_value() {
        let mut store = ChannelStore::new();
        store.insert(LastValue::new(42u32));

        let current = *store.get::<LastValue<u32>>().unwrap().get();
        assert_eq!(current, 42);
    }

    /// An `Appender` seeded with initial items keeps them after insertion.
    #[test]
    fn test_insert_and_get_appender() {
        let seeded = Appender::<String>::with_initial(vec!["a".into(), "b".into()]);

        let mut store = ChannelStore::new();
        store.insert(seeded);

        let appender = store.get::<Appender<String>>().unwrap();
        assert_eq!(appender.get(), &["a", "b"]);
    }

    /// Looking up a type that was never inserted yields `None`.
    #[test]
    fn test_get_missing_returns_none() {
        let store = ChannelStore::new();

        assert!(store.get::<LastValue<u32>>().is_none());
    }

    /// Inserting the same type twice replaces the value without adding a slot.
    #[test]
    fn test_overwrite_same_type() {
        let mut store = ChannelStore::new();
        for value in [1u32, 2u32] {
            store.insert(LastValue::new(value));
        }

        let current = *store.get::<LastValue<u32>>().unwrap().get();
        assert_eq!(current, 2);
        assert_eq!(store.len(), 1);
    }

    /// Distinct concrete types live side by side and do not shadow each other.
    #[test]
    fn test_multiple_types() {
        let mut store = ChannelStore::new();
        store.insert(LastValue::new(42u32));
        store.insert(LastValue::new("hello".to_string()));
        store.insert(Appender::<i32>::new());

        assert_eq!(store.len(), 3);

        let number = *store.get::<LastValue<u32>>().unwrap().get();
        assert_eq!(number, 42);
        assert_eq!(store.get::<LastValue<String>>().unwrap().get(), "hello");
        assert!(store.get::<Appender<i32>>().unwrap().get().is_empty());
    }

    /// Values merged through `get_mut` are visible on the next `get`.
    #[test]
    fn test_get_mut_and_merge() {
        let mut store = ChannelStore::new();
        store.insert(Appender::<String>::new());

        {
            let appender = store.get_mut::<Appender<String>>().unwrap();
            for item in ["item1", "item2"] {
                appender.merge(Box::new(item.to_string()));
            }
        }

        let appender = store.get::<Appender<String>>().unwrap();
        assert_eq!(appender.get(), &["item1", "item2"]);
    }

    /// An id obtained from `id_of` resolves to the same channel via `get_by_id`.
    #[test]
    fn test_channel_id_fast_path() {
        let mut store = ChannelStore::new();
        store.insert(LastValue::new(99u32));

        let id = store.id_of::<LastValue<u32>>().unwrap();
        let erased = store.get_by_id(id).unwrap();

        assert_eq!(erased.type_name(), "LastValue");
    }

    /// A snapshot keeps its own copy of each channel.
    #[test]
    fn test_snapshot_isolation() {
        let mut store = ChannelStore::new();
        store.insert(LastValue::new(1u32));

        // Freeze the current state, then change the live store.
        let snapshot = store.snapshot();
        let live = store.get_mut::<LastValue<u32>>().unwrap();
        live.merge(Box::new(2u32));

        // The snapshot still holds the old value ...
        let frozen_value = *snapshot.get::<LastValue<u32>>().unwrap().get();
        assert_eq!(frozen_value, 1);
        // ... while the live store sees the update.
        let live_value = *store.get::<LastValue<u32>>().unwrap().get();
        assert_eq!(live_value, 2);
    }

    /// `clear_ephemeral` resets ephemeral/topic channels and spares the rest.
    #[test]
    fn test_clear_ephemeral() {
        let mut store = ChannelStore::new();
        store.insert(LastValue::new(42u32));
        store.insert(EphemeralValue::<String>::new());
        store.insert(Topic::<i32>::new());

        // Populate the two channels that are expected to be cleared.
        let signal = store.get_mut::<EphemeralValue<String>>().unwrap();
        signal.merge(Box::new("signal".to_string()));
        let topic = store.get_mut::<Topic<i32>>().unwrap();
        topic.merge(Box::new(1i32));

        // Both hold data before the clear.
        assert!(store.get::<EphemeralValue<String>>().unwrap().get().is_some());
        assert!(!store.get::<Topic<i32>>().unwrap().get().is_empty());

        store.clear_ephemeral();

        // Both are empty afterwards.
        assert!(store.get::<EphemeralValue<String>>().unwrap().get().is_none());
        assert!(store.get::<Topic<i32>>().unwrap().get().is_empty());

        // The persistent channel keeps its value.
        let persistent = *store.get::<LastValue<u32>>().unwrap().get();
        assert_eq!(persistent, 42);
    }
}