pe-core 0.1.0

Core types for Potential Expectations — messages, channels, state, traits
Documentation
//! Channel system — the actual state mechanism underneath everything.
//!
//! Every state field is a channel. Each channel type has different update semantics.
//! Based on Group 4 of the pre-plan.

use serde::{Serialize, de::DeserializeOwned};
use std::any::Any;

use crate::error::PeError;

/// Core channel trait — knows how to merge updates into itself.
///
/// Every state field is backed by a channel. The channel type determines
/// merge semantics (overwrite, append, ephemeral, etc.).
pub trait Channel: Any + Send + Sync {
    /// Merge a boxed update value into this channel.
    fn merge(&mut self, update: Box<dyn Any + Send>);

    /// Clone the channel (for snapshot isolation during parallel execution).
    fn clone_box(&self) -> Box<dyn Channel>;

    /// Clear the channel (for ephemeral values — called between supersteps).
    fn clear(&mut self);

    /// Whether this channel should be cleared between supersteps.
    ///
    /// # REVIEW(002): Selective clearing
    /// `ChannelStore::clear_ephemeral()` previously called `clear()` on ALL
    /// channels, relying on LastValue/Appender having no-op `clear()` impls.
    /// This is fragile: any future Channel with a meaningful `clear()` that
    /// shouldn't run between supersteps would silently lose data. This method
    /// lets the store ask each channel whether it participates in superstep clearing.
    fn is_ephemeral(&self) -> bool {
        false // default: persistent channels don't clear
    }

    /// Serialize channel state for checkpointing.
    ///
    /// # REVIEW(002): Returns `Result` instead of `Vec<u8>`
    /// Previously returned `Vec<u8>` with `unwrap_or_default()` on serialization
    /// failure, which would silently produce corrupt/empty checkpoint data.
    /// For a library promising durable execution, silent data loss is unacceptable.
    /// Callers must handle the error (log, retry, abort checkpoint).
    fn checkpoint(&self) -> Result<Vec<u8>, PeError>;

    /// Name of this channel type (for debugging).
    fn type_name(&self) -> &'static str;

    /// Downcast support — returns self as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;

    /// Downcast support — returns self as `&mut dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

/// LastValue — stores the last written value. Default channel type.
/// At most one update per step. Last write wins.
#[derive(Debug, Clone)]
pub struct LastValue<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
    value: T,
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> LastValue<T> {
    pub fn new(initial: T) -> Self {
        Self { value: initial }
    }

    pub fn get(&self) -> &T {
        &self.value
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for LastValue<T> {
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        if let Ok(val) = update.downcast::<T>() {
            self.value = *val;
        }
    }

    fn clone_box(&self) -> Box<dyn Channel> {
        Box::new(self.clone())
    }

    fn clear(&mut self) {
        // LastValue does NOT clear between steps
    }

    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        bincode::serialize(&self.value).map_err(|e| PeError::Storage {
            details: format!("LastValue checkpoint failed: {e}"),
        })
    }

    fn type_name(&self) -> &'static str {
        "LastValue"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

/// Appender — collects values into a Vec. Multiple updates per step allowed.
/// Used for messages, tool calls, build results, etc.
#[derive(Debug, Clone)]
pub struct Appender<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
    values: Vec<T>,
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Appender<T> {
    fn default() -> Self {
        Self { values: vec![] }
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Appender<T> {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_initial(values: Vec<T>) -> Self {
        Self { values }
    }

    pub fn get(&self) -> &[T] {
        &self.values
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for Appender<T> {
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        // Try Vec<T> first; if that fails, downcast returns Err with the original Box
        match update.downcast::<Vec<T>>() {
            Ok(items) => self.values.extend(*items),
            Err(update) => {
                if let Ok(item) = update.downcast::<T>() {
                    self.values.push(*item);
                }
            }
        }
    }

    fn clone_box(&self) -> Box<dyn Channel> {
        Box::new(self.clone())
    }

    fn clear(&mut self) {
        // Appender does NOT clear between steps
    }

    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        bincode::serialize(&self.values).map_err(|e| PeError::Storage {
            details: format!("Appender checkpoint failed: {e}"),
        })
    }

    fn type_name(&self) -> &'static str {
        "Appender"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

/// EphemeralValue — cleared after each superstep. One-time signal.
/// Used for temporary routing signals, one-shot flags.
#[derive(Debug, Clone)]
pub struct EphemeralValue<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
    value: Option<T>,
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default
    for EphemeralValue<T>
{
    fn default() -> Self {
        Self { value: None }
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> EphemeralValue<T> {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn get(&self) -> Option<&T> {
        self.value.as_ref()
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel
    for EphemeralValue<T>
{
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        if let Ok(val) = update.downcast::<T>() {
            self.value = Some(*val);
        }
    }

    fn clone_box(&self) -> Box<dyn Channel> {
        Box::new(self.clone())
    }

    fn clear(&mut self) {
        self.value = None; // Cleared every superstep
    }

    fn is_ephemeral(&self) -> bool {
        true
    }

    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        bincode::serialize(&self.value).map_err(|e| PeError::Storage {
            details: format!("EphemeralValue checkpoint failed: {e}"),
        })
    }

    fn type_name(&self) -> &'static str {
        "EphemeralValue"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

/// Topic — collects multiple updates into a list per step. PubSub style.
/// Multiple nodes can write to the same topic in one superstep.
#[derive(Debug, Clone)]
pub struct Topic<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
    values: Vec<T>,
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Topic<T> {
    fn default() -> Self {
        Self { values: vec![] }
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Topic<T> {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn get(&self) -> &[T] {
        &self.values
    }
}

impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for Topic<T> {
    fn merge(&mut self, update: Box<dyn Any + Send>) {
        match update.downcast::<Vec<T>>() {
            Ok(items) => self.values.extend(*items),
            Err(update) => {
                if let Ok(item) = update.downcast::<T>() {
                    self.values.push(*item);
                }
            }
        }
    }

    fn clone_box(&self) -> Box<dyn Channel> {
        Box::new(self.clone())
    }

    fn clear(&mut self) {
        self.values.clear(); // Topics clear each superstep — collect fresh each round
    }

    fn is_ephemeral(&self) -> bool {
        true
    }

    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
        bincode::serialize(&self.values).map_err(|e| PeError::Storage {
            details: format!("Topic checkpoint failed: {e}"),
        })
    }

    fn type_name(&self) -> &'static str {
        "Topic"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}