1use serde::{Serialize, de::DeserializeOwned};
7use std::any::Any;
8
9use crate::error::PeError;
10
/// Common interface for type-erased state channels.
///
/// The `Any + Send + Sync` supertraits allow a channel to be held as a
/// `Box<dyn Channel>` and to be recovered as its concrete type through
/// [`Channel::as_any`] / [`Channel::as_any_mut`].
pub trait Channel: Any + Send + Sync {
    /// Folds a type-erased `update` into the channel.
    ///
    /// Which concrete payload types are accepted, and what happens to a
    /// payload of an unexpected type, is decided by each implementation.
    fn merge(&mut self, update: Box<dyn Any + Send>);

    /// Returns a boxed copy of this channel as a trait object.
    fn clone_box(&self) -> Box<dyn Channel>;

    /// Resets the channel's state in whatever way the implementation defines.
    // NOTE(review): the call site is not visible in this file — presumably
    // invoked between processing steps; confirm against the caller.
    fn clear(&mut self);

    /// Reports whether the channel is ephemeral; `false` unless overridden.
    fn is_ephemeral(&self) -> bool {
        false
    }

    /// Produces a byte snapshot of the channel's current contents.
    ///
    /// # Errors
    ///
    /// Returns a [`PeError`] when the snapshot cannot be produced.
    fn checkpoint(&self) -> Result<Vec<u8>, PeError>;

    /// Returns a static name identifying the kind of channel.
    fn type_name(&self) -> &'static str;

    /// Exposes the channel as `&dyn Any` so callers can downcast to the
    /// concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Mutable counterpart of [`Channel::as_any`].
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
55
/// Channel state that holds exactly one value of type `T`.
///
/// The struct itself places no bounds on `T`; the bounds required to
/// construct it and to use it as a channel are stated on the `impl` blocks
/// that need them, so the derived `Debug` and `Clone` impls only require
/// `T: Debug` and `T: Clone` respectively.
#[derive(Debug, Clone)]
pub struct LastValue<T> {
    /// The value currently stored in the channel.
    value: T,
}
62
63impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> LastValue<T> {
64 pub fn new(initial: T) -> Self {
65 Self { value: initial }
66 }
67
68 pub fn get(&self) -> &T {
69 &self.value
70 }
71}
72
73impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for LastValue<T> {
74 fn merge(&mut self, update: Box<dyn Any + Send>) {
75 if let Ok(val) = update.downcast::<T>() {
76 self.value = *val;
77 }
78 }
79
80 fn clone_box(&self) -> Box<dyn Channel> {
81 Box::new(self.clone())
82 }
83
84 fn clear(&mut self) {
85 }
87
88 fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
89 bincode::serialize(&self.value).map_err(|e| PeError::Storage {
90 details: format!("LastValue checkpoint failed: {e}"),
91 })
92 }
93
94 fn type_name(&self) -> &'static str {
95 "LastValue"
96 }
97
98 fn as_any(&self) -> &dyn Any {
99 self
100 }
101
102 fn as_any_mut(&mut self) -> &mut dyn Any {
103 self
104 }
105}
106
/// Channel state that accumulates items of type `T` in insertion order.
///
/// The struct itself places no bounds on `T`; the bounds required to
/// construct it and to use it as a channel are stated on the `impl` blocks
/// that need them, so the derived `Debug` and `Clone` impls only require
/// `T: Debug` and `T: Clone` respectively.
#[derive(Debug, Clone)]
pub struct Appender<T> {
    /// Every item collected so far, oldest first.
    values: Vec<T>,
}
113
114impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Appender<T> {
115 fn default() -> Self {
116 Self { values: vec![] }
117 }
118}
119
120impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Appender<T> {
121 pub fn new() -> Self {
122 Self::default()
123 }
124
125 pub fn with_initial(values: Vec<T>) -> Self {
126 Self { values }
127 }
128
129 pub fn get(&self) -> &[T] {
130 &self.values
131 }
132}
133
134impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for Appender<T> {
135 fn merge(&mut self, update: Box<dyn Any + Send>) {
136 match update.downcast::<Vec<T>>() {
138 Ok(items) => self.values.extend(*items),
139 Err(update) => {
140 if let Ok(item) = update.downcast::<T>() {
141 self.values.push(*item);
142 }
143 }
144 }
145 }
146
147 fn clone_box(&self) -> Box<dyn Channel> {
148 Box::new(self.clone())
149 }
150
151 fn clear(&mut self) {
152 }
154
155 fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
156 bincode::serialize(&self.values).map_err(|e| PeError::Storage {
157 details: format!("Appender checkpoint failed: {e}"),
158 })
159 }
160
161 fn type_name(&self) -> &'static str {
162 "Appender"
163 }
164
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn as_any_mut(&mut self) -> &mut dyn Any {
170 self
171 }
172}
173
/// Channel state that holds at most one value of type `T`.
///
/// The struct itself places no bounds on `T`; the bounds required to
/// construct it and to use it as a channel are stated on the `impl` blocks
/// that need them, so the derived `Debug` and `Clone` impls only require
/// `T: Debug` and `T: Clone` respectively.
#[derive(Debug, Clone)]
pub struct EphemeralValue<T> {
    /// The stored value, or `None` when the channel is empty.
    value: Option<T>,
}
180
181impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default
182 for EphemeralValue<T>
183{
184 fn default() -> Self {
185 Self { value: None }
186 }
187}
188
189impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> EphemeralValue<T> {
190 pub fn new() -> Self {
191 Self::default()
192 }
193
194 pub fn get(&self) -> Option<&T> {
195 self.value.as_ref()
196 }
197}
198
199impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel
200 for EphemeralValue<T>
201{
202 fn merge(&mut self, update: Box<dyn Any + Send>) {
203 if let Ok(val) = update.downcast::<T>() {
204 self.value = Some(*val);
205 }
206 }
207
208 fn clone_box(&self) -> Box<dyn Channel> {
209 Box::new(self.clone())
210 }
211
212 fn clear(&mut self) {
213 self.value = None; }
215
216 fn is_ephemeral(&self) -> bool {
217 true
218 }
219
220 fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
221 bincode::serialize(&self.value).map_err(|e| PeError::Storage {
222 details: format!("EphemeralValue checkpoint failed: {e}"),
223 })
224 }
225
226 fn type_name(&self) -> &'static str {
227 "EphemeralValue"
228 }
229
230 fn as_any(&self) -> &dyn Any {
231 self
232 }
233
234 fn as_any_mut(&mut self) -> &mut dyn Any {
235 self
236 }
237}
238
/// Channel state that collects items of type `T` in insertion order.
///
/// The struct itself places no bounds on `T`; the bounds required to
/// construct it and to use it as a channel are stated on the `impl` blocks
/// that need them, so the derived `Debug` and `Clone` impls only require
/// `T: Debug` and `T: Clone` respectively.
#[derive(Debug, Clone)]
pub struct Topic<T> {
    /// Items currently held by the topic, oldest first.
    values: Vec<T>,
}
245
246impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Default for Topic<T> {
247 fn default() -> Self {
248 Self { values: vec![] }
249 }
250}
251
252impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Topic<T> {
253 pub fn new() -> Self {
254 Self::default()
255 }
256
257 pub fn get(&self) -> &[T] {
258 &self.values
259 }
260}
261
262impl<T: Clone + Send + Sync + Serialize + DeserializeOwned + 'static> Channel for Topic<T> {
263 fn merge(&mut self, update: Box<dyn Any + Send>) {
264 match update.downcast::<Vec<T>>() {
265 Ok(items) => self.values.extend(*items),
266 Err(update) => {
267 if let Ok(item) = update.downcast::<T>() {
268 self.values.push(*item);
269 }
270 }
271 }
272 }
273
274 fn clone_box(&self) -> Box<dyn Channel> {
275 Box::new(self.clone())
276 }
277
278 fn clear(&mut self) {
279 self.values.clear(); }
281
282 fn is_ephemeral(&self) -> bool {
283 true
284 }
285
286 fn checkpoint(&self) -> Result<Vec<u8>, PeError> {
287 bincode::serialize(&self.values).map_err(|e| PeError::Storage {
288 details: format!("Topic checkpoint failed: {e}"),
289 })
290 }
291
292 fn type_name(&self) -> &'static str {
293 "Topic"
294 }
295
296 fn as_any(&self) -> &dyn Any {
297 self
298 }
299
300 fn as_any_mut(&mut self) -> &mut dyn Any {
301 self
302 }
303}