Skip to main content

pe_core/
channel.rs

1//! Channel system — the actual state mechanism underneath everything.
2//!
3//! Every state field is a channel. Each channel type has different update semantics.
4//! Based on Group 4 of the pre-plan.
5
6use serde::{Serialize, de::DeserializeOwned};
7use std::any::Any;
8
9use crate::error::PeError;
10
11/// Core channel trait — knows how to merge updates into itself.
12///
13/// Every state field is backed by a channel. The channel type determines
14/// merge semantics (overwrite, append, ephemeral, etc.).
15pub trait Channel: Any + Send + Sync {
16    /// Merge a boxed update value into this channel.
17    fn merge(&mut self, update: Box<dyn Any + Send>);
18
19    /// Clone the channel (for snapshot isolation during parallel execution).
20    fn clone_box(&self) -> Box<dyn Channel>;
21
22    /// Clear the channel (for ephemeral values — called between supersteps).
23    fn clear(&mut self);
24
25    /// Whether this channel should be cleared between supersteps.
26    ///
27    /// # REVIEW(002): Selective clearing
28    /// `ChannelStore::clear_ephemeral()` previously called `clear()` on ALL
29    /// channels, relying on LastValue/Appender having no-op `clear()` impls.
30    /// This is fragile: any future Channel with a meaningful `clear()` that
31    /// shouldn't run between supersteps would silently lose data. This method
32    /// lets the store ask each channel whether it participates in superstep clearing.
33    fn is_ephemeral(&self) -> bool {
34        false // default: persistent channels don't clear
35    }
36
37    /// Serialize channel state for checkpointing.
38    ///
39    /// # REVIEW(002): Returns `Result` instead of `Vec<u8>`
40    /// Previously returned `Vec<u8>` with `unwrap_or_default()` on serialization
41    /// failure, which would silently produce corrupt/empty checkpoint data.
42    /// For a library promising durable execution, silent data loss is unacceptable.
43    /// Callers must handle the error (log, retry, abort checkpoint).
44    fn checkpoint(&self) -> Result<Vec<u8>, PeError>;
45
46    /// Name of this channel type (for debugging).
47    fn type_name(&self) -> &'static str;
48
49    /// Downcast support — returns self as `&dyn Any`.
50    fn as_any(&self) -> &dyn Any;
51
52    /// Downcast support — returns self as `&mut dyn Any`.
53    fn as_any_mut(&mut self) -> &mut dyn Any;
54}
55
56/// LastValue — stores the last written value. Default channel type.
57/// At most one update per step. Last write wins.
58#[derive(Debug, Clone)]
59pub struct LastValue<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
60    value: T,
61}
62
63impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> LastValue<T> {
64    pub fn new(initial: T) -> Self {
65        Self { value: initial }
66    }
67
68    pub fn get(&self) -> &T {
69        &self.value
70    }
71}
72
73impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for LastValue<T> {
74    fn merge(&mut self, update: Box<dyn Any + Send>) {
75        if let Ok(val) = update.downcast::<T>() {
76            self.value = *val;
77        }
78    }
79
80    fn clone_box(&self) -> Box<dyn Channel> {
81        Box::new(self.clone())
82    }
83
84    fn clear(&mut self) {
85        // LastValue does NOT clear between steps
86    }
87
88    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
89        bincode::serialize(&self.value).map_err(|e| PeError::Storage {
90            details: format!("LastValue checkpoint failed: {e}"),
91        })
92    }
93
94    fn type_name(&self) -> &'static str {
95        "LastValue"
96    }
97
98    fn as_any(&self) -> &dyn Any {
99        self
100    }
101
102    fn as_any_mut(&mut self) -> &mut dyn Any {
103        self
104    }
105}
106
107/// Appender — collects values into a Vec. Multiple updates per step allowed.
108/// Used for messages, tool calls, build results, etc.
109#[derive(Debug, Clone)]
110pub struct Appender<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
111    values: Vec<T>,
112}
113
114impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Appender<T> {
115    fn default() -> Self {
116        Self { values: vec![] }
117    }
118}
119
120impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Appender<T> {
121    pub fn new() -> Self {
122        Self::default()
123    }
124
125    pub fn with_initial(values: Vec<T>) -> Self {
126        Self { values }
127    }
128
129    pub fn get(&self) -> &[T] {
130        &self.values
131    }
132}
133
134impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for Appender<T> {
135    fn merge(&mut self, update: Box<dyn Any + Send>) {
136        // Try Vec<T> first; if that fails, downcast returns Err with the original Box
137        match update.downcast::<Vec<T>>() {
138            Ok(items) => self.values.extend(*items),
139            Err(update) => {
140                if let Ok(item) = update.downcast::<T>() {
141                    self.values.push(*item);
142                }
143            }
144        }
145    }
146
147    fn clone_box(&self) -> Box<dyn Channel> {
148        Box::new(self.clone())
149    }
150
151    fn clear(&mut self) {
152        // Appender does NOT clear between steps
153    }
154
155    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
156        bincode::serialize(&self.values).map_err(|e| PeError::Storage {
157            details: format!("Appender checkpoint failed: {e}"),
158        })
159    }
160
161    fn type_name(&self) -> &'static str {
162        "Appender"
163    }
164
165    fn as_any(&self) -> &dyn Any {
166        self
167    }
168
169    fn as_any_mut(&mut self) -> &mut dyn Any {
170        self
171    }
172}
173
174/// EphemeralValue — cleared after each superstep. One-time signal.
175/// Used for temporary routing signals, one-shot flags.
176#[derive(Debug, Clone)]
177pub struct EphemeralValue<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
178    value: Option<T>,
179}
180
181impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default
182    for EphemeralValue<T>
183{
184    fn default() -> Self {
185        Self { value: None }
186    }
187}
188
189impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> EphemeralValue<T> {
190    pub fn new() -> Self {
191        Self::default()
192    }
193
194    pub fn get(&self) -> Option<&T> {
195        self.value.as_ref()
196    }
197}
198
199impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel
200    for EphemeralValue<T>
201{
202    fn merge(&mut self, update: Box<dyn Any + Send>) {
203        if let Ok(val) = update.downcast::<T>() {
204            self.value = Some(*val);
205        }
206    }
207
208    fn clone_box(&self) -> Box<dyn Channel> {
209        Box::new(self.clone())
210    }
211
212    fn clear(&mut self) {
213        self.value = None; // Cleared every superstep
214    }
215
216    fn is_ephemeral(&self) -> bool {
217        true
218    }
219
220    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
221        bincode::serialize(&self.value).map_err(|e| PeError::Storage {
222            details: format!("EphemeralValue checkpoint failed: {e}"),
223        })
224    }
225
226    fn type_name(&self) -> &'static str {
227        "EphemeralValue"
228    }
229
230    fn as_any(&self) -> &dyn Any {
231        self
232    }
233
234    fn as_any_mut(&mut self) -> &mut dyn Any {
235        self
236    }
237}
238
239/// Topic — collects multiple updates into a list per step. PubSub style.
240/// Multiple nodes can write to the same topic in one superstep.
241#[derive(Debug, Clone)]
242pub struct Topic<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> {
243    values: Vec<T>,
244}
245
246impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Topic<T> {
247    fn default() -> Self {
248        Self { values: vec![] }
249    }
250}
251
252impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Topic<T> {
253    pub fn new() -> Self {
254        Self::default()
255    }
256
257    pub fn get(&self) -> &[T] {
258        &self.values
259    }
260}
261
262impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for Topic<T> {
263    fn merge(&mut self, update: Box<dyn Any + Send>) {
264        match update.downcast::<Vec<T>>() {
265            Ok(items) => self.values.extend(*items),
266            Err(update) => {
267                if let Ok(item) = update.downcast::<T>() {
268                    self.values.push(*item);
269                }
270            }
271        }
272    }
273
274    fn clone_box(&self) -> Box<dyn Channel> {
275        Box::new(self.clone())
276    }
277
278    fn clear(&mut self) {
279        self.values.clear(); // Topics clear each superstep — collect fresh each round
280    }
281
282    fn is_ephemeral(&self) -> bool {
283        true
284    }
285
286    fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
287        bincode::serialize(&self.values).map_err(|e| PeError::Storage {
288            details: format!("Topic checkpoint failed: {e}"),
289        })
290    }
291
292    fn type_name(&self) -> &'static str {
293        "Topic"
294    }
295
296    fn as_any(&self) -> &dyn Any {
297        self
298    }
299
300    fn as_any_mut(&mut self) -> &mut dyn Any {
301        self
302    }
303}