allora_core/patterns/splitter.rs
1//! Splitter pattern: derives multiple logical messages from a single inbound `Exchange`.
2//!
3//! Implements the Enterprise Integration Pattern (EIP) "Splitter". A splitter breaks a
4//! composite or aggregate message (e.g. a delimited string, a JSON array, a batch) into
5//! multiple individual messages. In this minimal implementation the provided closure
6//! returns a `Vec<Message>` representing the split parts.
7//!
8//! # Current Behavior & Limitation
9//! The splitter invokes the user-supplied closure and, if the returned vector is non-empty,
10//! stores ONLY the first resulting message in `exchange.out_msg`. The rest are discarded.
11//! This is intentionally minimal for now. Future versions may:
12//! * Emit all parts downstream via a `Vec<Message>` property.
13//! * Produce cloned `Exchange`s for each part.
14//! * Integrate with a downstream `RecipientList` or `Aggregator` automatically.
15//!
16//! # Use Cases
17//! * Taking a CSV line and extracting the first column for downstream processing.
18//! * Extracting primary record from a batch for quick validation.
19//! * Demonstration / scaffolding before implementing full fan-out semantics.
20//!
21//! # Example (tokenizing text payload)
22//! ```rust
23//! use allora_core::{patterns::splitter::Splitter, route::Route, Exchange, Message};
24//! let splitter = Splitter::new(|exchange: &Exchange| {
25//! exchange.in_msg.body_text()
26//! .map(|t| t.split_whitespace().map(Message::from_text).collect())
27//! .unwrap_or_else(Vec::new)
28//! });
29//! let route = Route::new().add(splitter).build();
30//! let mut exchange = Exchange::new(Message::from_text("one two three"));
31//! let rt = tokio::runtime::Runtime::new().unwrap();
32//! rt.block_on(async { route.run(&mut exchange).await.unwrap(); });
33//! assert_eq!(exchange.out_msg.unwrap().body_text(), Some("one"));
34//! ```
35//!
36//! # Edge Cases
37//! * Empty returned vector => `out_msg` remains unchanged (None if unset).
38//! * Mixed payload types allowed; only first message used when non-empty.
39//! * Closure should be pure / side-effect free aside from constructing messages.
40//!
41//! # Testing Strategies
42//! * Verify first-element extraction from multiple tokens.
43//! * Ensure no `out_msg` when closure returns empty slice.
44//! * Provide heterogeneous messages and confirm only first selected.
45
46use crate::{error::Result, message::Message, processor::Processor, Exchange};
47use std::fmt::{Debug, Formatter, Result as FmtResult};
48
49pub struct Splitter<F>
50where
51 F: Fn(&Exchange) -> Vec<Message> + Send + Sync + 'static,
52{
53 func: F,
54}
55
56impl<F> Debug for Splitter<F>
57where
58 F: Fn(&Exchange) -> Vec<Message> + Send + Sync + 'static,
59{
60 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
61 f.write_str("Splitter{func=*closure*}")
62 }
63}
64
65impl<F> Splitter<F>
66where
67 F: Fn(&Exchange) -> Vec<Message> + Send + Sync + 'static,
68{
69 pub fn new(func: F) -> Self {
70 Self { func }
71 }
72}
73
74#[async_trait::async_trait]
75impl<F> Processor for Splitter<F>
76where
77 F: Fn(&Exchange) -> Vec<Message> + Send + Sync + 'static,
78{
79 async fn process(&self, exchange: &mut Exchange) -> Result<()> {
80 let messages = (self.func)(exchange);
81 if let Some(first) = messages.get(0) {
82 exchange.out_msg = Some(first.clone());
83 }
84 Ok(())
85 }
86}