use crate::quad::Quad;
use crate::types::{Layer, UnitId};
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum UnitProcessorError {
#[error("Empty input Quad for {unit} at cycle {cycle_index}")]
EmptyInput {
unit: UnitId,
cycle_index: u64,
},
#[error("Invalid output from {unit}: {reason}")]
InvalidOutput {
unit: UnitId,
reason: String,
},
#[error("Processor error in {unit}: {reason}")]
ProcessorError {
unit: UnitId,
reason: String,
},
}
pub trait UnitProcessor: Send + 'static {
fn process(
&mut self,
unit_id: UnitId,
cycle_index: u64,
input: &Quad,
data: &Quad,
) -> Result<Quad, UnitProcessorError>;
fn process_layer(
&mut self,
unit_id: UnitId,
cycle_index: u64,
target_layer: Layer,
server_output: &Quad,
) -> Result<Quad, UnitProcessorError> {
let root = {
let mut hasher = blake3::Hasher::new();
hasher.update(&[unit_id as u8]);
hasher.update(&[target_layer as u8]);
hasher.update(&cycle_index.to_le_bytes());
hasher.update(&server_output.root);
*hasher.finalize().as_bytes()
};
let pointer = {
let mut hasher = blake3::Hasher::new();
hasher.update(&[unit_id as u8]);
hasher.update(&[target_layer as u8]);
hasher.update(b"layer_pointer");
hasher.update(&cycle_index.to_le_bytes());
*hasher.finalize().as_bytes()
};
let mut tree = server_output.tree.clone();
tree.insert(
"processor.layer".into(),
format!("{target_layer:?}").into_bytes(),
);
tree.insert("processor.unit".into(), format!("{unit_id}").into_bytes());
Ok(Quad::new(root, pointer, tree))
}
fn externalize_state(&self, _unit_id: UnitId) -> Option<Quad> {
None
}
fn name(&self) -> &str;
}
#[cfg(feature = "test-support")]
pub use test_support::{EchoProcessor, FailingProcessor, StubProcessor};
#[cfg(feature = "test-support")]
mod test_support {
use super::{Quad, UnitId, UnitProcessor, UnitProcessorError};
#[derive(Debug, Clone)]
pub struct StubProcessor {
invocation_count: u64,
}
impl StubProcessor {
pub fn new() -> Self {
Self { invocation_count: 0 }
}
pub fn invocation_count(&self) -> u64 {
self.invocation_count
}
}
impl Default for StubProcessor {
fn default() -> Self {
Self::new()
}
}
impl UnitProcessor for StubProcessor {
fn process(
&mut self,
unit_id: UnitId,
cycle_index: u64,
input: &Quad,
_data: &Quad,
) -> Result<Quad, UnitProcessorError> {
self.invocation_count += 1;
let root = {
let mut hasher = blake3::Hasher::new();
hasher.update(&[unit_id as u8]);
hasher.update(&cycle_index.to_le_bytes());
hasher.update(&input.root);
*hasher.finalize().as_bytes()
};
let pointer = {
let mut hasher = blake3::Hasher::new();
hasher.update(&[unit_id as u8]);
hasher.update(b"pointer");
hasher.update(&cycle_index.to_le_bytes());
*hasher.finalize().as_bytes()
};
let mut tree = input.tree.clone();
tree.insert("processor.unit".into(), format!("{unit_id}").into_bytes());
tree.insert("processor.cycle".into(), cycle_index.to_le_bytes().to_vec());
tree.insert(
"processor.input_root_hash".into(),
blake3::hash(&input.root).as_bytes().to_vec(),
);
tree.insert("processor.name".into(), b"StubProcessor".to_vec());
Ok(Quad::new(root, pointer, tree))
}
fn name(&self) -> &str {
"StubProcessor"
}
}
#[derive(Debug, Clone, Default)]
pub struct EchoProcessor;
impl EchoProcessor {
pub fn new() -> Self {
Self
}
}
impl UnitProcessor for EchoProcessor {
fn process(
&mut self,
_unit_id: UnitId,
_cycle_index: u64,
input: &Quad,
_data: &Quad,
) -> Result<Quad, UnitProcessorError> {
Ok(input.clone())
}
fn name(&self) -> &str {
"EchoProcessor"
}
}
#[derive(Debug, Clone)]
pub struct FailingProcessor {
message: String,
}
impl FailingProcessor {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
impl UnitProcessor for FailingProcessor {
fn process(
&mut self,
unit_id: UnitId,
_cycle_index: u64,
_input: &Quad,
_data: &Quad,
) -> Result<Quad, UnitProcessorError> {
Err(UnitProcessorError::ProcessorError {
unit: unit_id,
reason: self.message.clone(),
})
}
fn name(&self) -> &str {
"FailingProcessor"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::quad::Tree as QuadTree;
use crate::types::Layer;
fn make_input_quad() -> Quad {
let mut tree = QuadTree::new();
tree.insert("input.data".into(), vec![1, 2, 3]);
Quad::from_strings("input_root", "stripped_sentinel", tree)
}
#[test]
fn stub_produces_non_empty_output() {
let mut proc = StubProcessor::new();
let input = make_input_quad();
let output = proc.process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
assert_ne!(output.root, [0u8; 32]);
assert_ne!(output.pointer, [0u8; 32]);
assert!(!output.tree.is_empty());
}
#[test]
fn stub_output_depends_on_unit_id() {
let input = make_input_quad();
let out_fu = StubProcessor::new().process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
let out_mu = StubProcessor::new().process(UnitId::MU, 1, &input, &Quad::default()).unwrap();
assert_ne!(out_fu.root, out_mu.root);
assert_ne!(out_fu.pointer, out_mu.pointer);
}
#[test]
fn stub_output_depends_on_cycle_index() {
let input = make_input_quad();
let out_c1 = StubProcessor::new().process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
let out_c2 = StubProcessor::new().process(UnitId::FU, 2, &input, &Quad::default()).unwrap();
assert_ne!(out_c1.root, out_c2.root);
}
#[test]
fn stub_pointer_does_not_depend_on_input_pointer() {
let input_a = Quad::new([1u8; 32], [0u8; 32], QuadTree::new());
let input_b = Quad::new([1u8; 32], [99u8; 32], QuadTree::new());
let out_a = StubProcessor::new().process(UnitId::FU, 1, &input_a, &Quad::default()).unwrap();
let out_b = StubProcessor::new().process(UnitId::FU, 1, &input_b, &Quad::default()).unwrap();
assert_eq!(out_a.pointer, out_b.pointer);
}
#[test]
fn echo_returns_input_unchanged() {
let mut proc = EchoProcessor::new();
let input = make_input_quad();
let output = proc.process(UnitId::FU, 1, &input, &Quad::default()).unwrap();
assert_eq!(output, input);
}
#[test]
fn failing_processor_always_fails() {
let mut proc = FailingProcessor::new("intentional failure");
let input = make_input_quad();
let result = proc.process(UnitId::FU, 1, &input, &Quad::default());
assert!(result.is_err());
}
#[test]
fn processor_is_send() {
fn assert_send<T: Send>() {}
assert_send::<StubProcessor>();
assert_send::<EchoProcessor>();
assert_send::<FailingProcessor>();
}
#[test]
fn default_process_layer_produces_output() {
let mut proc = StubProcessor::new();
let server_output = proc.process(UnitId::FU, 1, &make_input_quad(), &Quad::default()).unwrap();
let client_out = proc.process_layer(UnitId::FU, 1, Layer::Client, &server_output).unwrap();
assert_ne!(client_out.root, [0u8; 32]);
}
#[test]
fn unit_processor_error_display() {
let e = UnitProcessorError::ProcessorError {
unit: UnitId::FU,
reason: "test".into(),
};
assert!(e.to_string().contains("FU"));
}
}
}