1use crate::quad::Quad;
31use crate::types::{Layer, UnitId};
32
33#[derive(Debug, Clone, thiserror::Error)]
35#[non_exhaustive]
36pub enum UnitProcessorError {
37 #[error("Empty input Quad for {unit} at cycle {cycle_index}")]
39 EmptyInput {
40 unit: UnitId,
42 cycle_index: u64,
44 },
45
46 #[error("Invalid output from {unit}: {reason}")]
48 InvalidOutput {
49 unit: UnitId,
51 reason: String,
53 },
54
55 #[error("Processor error in {unit}: {reason}")]
57 ProcessorError {
58 unit: UnitId,
60 reason: String,
62 },
63}
64
65pub trait UnitProcessor: Send + 'static {
89 fn process(
93 &mut self,
94 unit_id: UnitId,
95 cycle_index: u64,
96 input: &Quad,
97 data: &Quad,
98 ) -> Result<Quad, UnitProcessorError>;
99
100 fn process_layer(
107 &mut self,
108 unit_id: UnitId,
109 cycle_index: u64,
110 target_layer: Layer,
111 server_output: &Quad,
112 ) -> Result<Quad, UnitProcessorError> {
113 let root = {
114 let mut hasher = blake3::Hasher::new();
115 hasher.update(&[unit_id as u8]);
116 hasher.update(&[target_layer as u8]);
117 hasher.update(&cycle_index.to_le_bytes());
118 hasher.update(&server_output.root);
119 *hasher.finalize().as_bytes()
120 };
121
122 let pointer = {
123 let mut hasher = blake3::Hasher::new();
124 hasher.update(&[unit_id as u8]);
125 hasher.update(&[target_layer as u8]);
126 hasher.update(b"layer_pointer");
127 hasher.update(&cycle_index.to_le_bytes());
128 *hasher.finalize().as_bytes()
129 };
130
131 let mut tree = server_output.tree.clone();
132 tree.insert(
133 "processor.layer".into(),
134 format!("{target_layer:?}").into_bytes(),
135 );
136 tree.insert("processor.unit".into(), format!("{unit_id}").into_bytes());
137
138 Ok(Quad::new(root, pointer, tree))
139 }
140
141 fn externalize_state(&self, _unit_id: UnitId) -> Option<Quad> {
145 None
146 }
147
148 fn name(&self) -> &str;
150}
151
152#[cfg(feature = "test-support")]
153pub use test_support::{EchoProcessor, FailingProcessor, StubProcessor};
154
155#[cfg(feature = "test-support")]
156mod test_support {
157 use super::{Quad, UnitId, UnitProcessor, UnitProcessorError};
158
159 #[derive(Debug, Clone)]
164 pub struct StubProcessor {
165 invocation_count: u64,
166 }
167
168 impl StubProcessor {
169 pub fn new() -> Self {
171 Self { invocation_count: 0 }
172 }
173
174 pub fn invocation_count(&self) -> u64 {
176 self.invocation_count
177 }
178 }
179
180 impl Default for StubProcessor {
181 fn default() -> Self {
182 Self::new()
183 }
184 }
185
186 impl UnitProcessor for StubProcessor {
187 fn process(
188 &mut self,
189 unit_id: UnitId,
190 cycle_index: u64,
191 input: &Quad,
192 _data: &Quad,
193 ) -> Result<Quad, UnitProcessorError> {
194 self.invocation_count += 1;
195
196 let root = {
197 let mut hasher = blake3::Hasher::new();
198 hasher.update(&[unit_id as u8]);
199 hasher.update(&cycle_index.to_le_bytes());
200 hasher.update(&input.root);
201 *hasher.finalize().as_bytes()
202 };
203
204 let pointer = {
205 let mut hasher = blake3::Hasher::new();
206 hasher.update(&[unit_id as u8]);
207 hasher.update(b"pointer");
208 hasher.update(&cycle_index.to_le_bytes());
209 *hasher.finalize().as_bytes()
210 };
211
212 let mut tree = input.tree.clone();
213 tree.insert("processor.unit".into(), format!("{unit_id}").into_bytes());
214 tree.insert("processor.cycle".into(), cycle_index.to_le_bytes().to_vec());
215 tree.insert(
216 "processor.input_root_hash".into(),
217 blake3::hash(&input.root).as_bytes().to_vec(),
218 );
219 tree.insert("processor.name".into(), b"StubProcessor".to_vec());
220
221 Ok(Quad::new(root, pointer, tree))
222 }
223
224 fn name(&self) -> &str {
225 "StubProcessor"
226 }
227 }
228
229 #[derive(Debug, Clone, Default)]
231 pub struct EchoProcessor;
232
233 impl EchoProcessor {
234 pub fn new() -> Self {
236 Self
237 }
238 }
239
240 impl UnitProcessor for EchoProcessor {
241 fn process(
242 &mut self,
243 _unit_id: UnitId,
244 _cycle_index: u64,
245 input: &Quad,
246 _data: &Quad,
247 ) -> Result<Quad, UnitProcessorError> {
248 Ok(input.clone())
249 }
250
251 fn name(&self) -> &str {
252 "EchoProcessor"
253 }
254 }
255
256 #[derive(Debug, Clone)]
260 pub struct FailingProcessor {
261 message: String,
262 }
263
264 impl FailingProcessor {
265 pub fn new(message: impl Into<String>) -> Self {
267 Self { message: message.into() }
268 }
269 }
270
271 impl UnitProcessor for FailingProcessor {
272 fn process(
273 &mut self,
274 unit_id: UnitId,
275 _cycle_index: u64,
276 _input: &Quad,
277 _data: &Quad,
278 ) -> Result<Quad, UnitProcessorError> {
279 Err(UnitProcessorError::ProcessorError {
280 unit: unit_id,
281 reason: self.message.clone(),
282 })
283 }
284
285 fn name(&self) -> &str {
286 "FailingProcessor"
287 }
288 }
289
290 #[cfg(test)]
291 mod tests {
292 use super::*;
293 use crate::quad::Tree as QuadTree;
294 use crate::types::Layer;
295
296 fn make_input_quad() -> Quad {
297 let mut tree = QuadTree::new();
298 tree.insert("input.data".into(), vec![1, 2, 3]);
299 Quad::from_strings("input_root", "stripped_sentinel", tree)
300 }
301
302 #[test]
303 fn stub_produces_non_empty_output() {
304 let mut proc = StubProcessor::new();
305 let input = make_input_quad();
306 let output = proc.process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
307 assert_ne!(output.root, [0u8; 32]);
308 assert_ne!(output.pointer, [0u8; 32]);
309 assert!(!output.tree.is_empty());
310 }
311
312 #[test]
313 fn stub_output_depends_on_unit_id() {
314 let input = make_input_quad();
315 let out_fu = StubProcessor::new().process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
316 let out_mu = StubProcessor::new().process(UnitId::MU, 1, &input, &Quad::default()).unwrap();
317 assert_ne!(out_fu.root, out_mu.root);
318 assert_ne!(out_fu.pointer, out_mu.pointer);
319 }
320
321 #[test]
322 fn stub_output_depends_on_cycle_index() {
323 let input = make_input_quad();
324 let out_c1 = StubProcessor::new().process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
325 let out_c2 = StubProcessor::new().process(UnitId::FU, 2, &input, &Quad::default()).unwrap();
326 assert_ne!(out_c1.root, out_c2.root);
327 }
328
329 #[test]
330 fn stub_pointer_does_not_depend_on_input_pointer() {
331 let input_a = Quad::new([1u8; 32], [0u8; 32], QuadTree::new());
332 let input_b = Quad::new([1u8; 32], [99u8; 32], QuadTree::new());
333 let out_a = StubProcessor::new().process(UnitId::FU, 1, &input_a, &Quad::default()).unwrap();
334 let out_b = StubProcessor::new().process(UnitId::FU, 1, &input_b, &Quad::default()).unwrap();
335 assert_eq!(out_a.pointer, out_b.pointer);
336 }
337
338 #[test]
339 fn echo_returns_input_unchanged() {
340 let mut proc = EchoProcessor::new();
341 let input = make_input_quad();
342 let output = proc.process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
343 assert_eq!(output, input);
344 }
345
346 #[test]
347 fn failing_processor_always_fails() {
348 let mut proc = FailingProcessor::new("intentional failure");
349 let input = make_input_quad();
350 let result = proc.process(UnitId::FU, 1, &input, &Quad::default());
351 assert!(result.is_err());
352 }
353
354 #[test]
355 fn processor_is_send() {
356 fn assert_send<T: Send>() {}
357 assert_send::<StubProcessor>();
358 assert_send::<EchoProcessor>();
359 assert_send::<FailingProcessor>();
360 }
361
362 #[test]
363 fn default_process_layer_produces_output() {
364 let mut proc = StubProcessor::new();
365 let server_output = proc.process(UnitId::FU, 1, &make_input_quad(), &Quad::default()).unwrap();
366 let client_out = proc.process_layer(UnitId::FU, 1, Layer::Client, &server_output).unwrap();
367 assert_ne!(client_out.root, [0u8; 32]);
368 }
369
370 #[test]
371 fn unit_processor_error_display() {
372 let e = UnitProcessorError::ProcessorError {
373 unit: UnitId::FU,
374 reason: "test".into(),
375 };
376 assert!(e.to_string().contains("FU"));
377 }
378 }
379}