use crate::callable::Callable;
use std::sync::Arc;
pub struct SequentialFlow<C: Callable> {
steps: Vec<Arc<C>>,
name: String,
}
impl<C: Callable> SequentialFlow<C> {
pub fn new(name: impl Into<String>) -> Self {
Self {
steps: Vec::new(),
name: name.into(),
}
}
pub fn add_step(mut self, callable: Arc<C>) -> Self {
self.steps.push(callable);
self
}
pub fn with_steps(mut self, callables: Vec<Arc<C>>) -> Self {
self.steps.extend(callables);
self
}
pub async fn execute(&self, input: &str) -> anyhow::Result<String> {
let mut current_output = input.to_string();
for step in &self.steps {
current_output = step.run(¤t_output).await?;
}
Ok(current_output)
}
pub fn name(&self) -> &str {
&self.name
}
pub fn len(&self) -> usize {
self.steps.len()
}
pub fn is_empty(&self) -> bool {
self.steps.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
struct MockCallable {
name: String,
transform: Box<dyn Fn(&str) -> String + Send + Sync>,
}
impl MockCallable {
fn new(name: &str, transform: impl Fn(&str) -> String + Send + Sync + 'static) -> Self {
Self {
name: name.to_string(),
transform: Box::new(transform),
}
}
fn uppercase(name: &str) -> Self {
Self::new(name, |s| s.to_uppercase())
}
fn append(name: &str, suffix: &'static str) -> Self {
Self::new(name, move |s| format!("{}{}", s, suffix))
}
fn prepend(name: &str, prefix: &'static str) -> Self {
Self::new(name, move |s| format!("{}{}", prefix, s))
}
}
#[async_trait]
impl Callable for MockCallable {
fn name(&self) -> &str {
&self.name
}
async fn run(&self, input: &str) -> anyhow::Result<String> {
Ok((self.transform)(input))
}
}
#[tokio::test]
async fn test_sequential_empty() {
let flow: SequentialFlow<MockCallable> = SequentialFlow::new("empty");
assert!(flow.is_empty());
assert_eq!(flow.len(), 0);
let result = flow.execute("input").await.unwrap();
assert_eq!(result, "input"); }
#[tokio::test]
async fn test_sequential_single_step() {
let step = Arc::new(MockCallable::uppercase("upper"));
let flow = SequentialFlow::new("single").add_step(step);
assert_eq!(flow.len(), 1);
assert!(!flow.is_empty());
assert_eq!(flow.name(), "single");
let result = flow.execute("hello").await.unwrap();
assert_eq!(result, "HELLO");
}
#[tokio::test]
async fn test_sequential_multiple_steps() {
let flow = SequentialFlow::new("chain")
.add_step(Arc::new(MockCallable::uppercase("step1")))
.add_step(Arc::new(MockCallable::append("step2", "!")))
.add_step(Arc::new(MockCallable::prepend("step3", ">> ")));
assert_eq!(flow.len(), 3);
let result = flow.execute("hello").await.unwrap();
assert_eq!(result, ">> HELLO!");
}
#[tokio::test]
async fn test_sequential_with_steps() {
let steps = vec![
Arc::new(MockCallable::uppercase("s1")),
Arc::new(MockCallable::append("s2", "_done")),
];
let flow = SequentialFlow::new("batch").with_steps(steps);
assert_eq!(flow.len(), 2);
let result = flow.execute("test").await.unwrap();
assert_eq!(result, "TEST_done");
}
#[tokio::test]
async fn test_sequential_error_propagation() {
struct FailingCallable {
name: String,
fail_on_call: usize,
call_count: std::sync::atomic::AtomicUsize,
}
impl FailingCallable {
fn new(name: &str, fail_on: usize) -> Self {
Self {
name: name.to_string(),
fail_on_call: fail_on,
call_count: std::sync::atomic::AtomicUsize::new(0),
}
}
}
#[async_trait]
impl Callable for FailingCallable {
fn name(&self) -> &str {
&self.name
}
async fn run(&self, input: &str) -> anyhow::Result<String> {
let n = self
.call_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if n >= self.fail_on_call {
anyhow::bail!("Intentional failure at step {}", n)
}
Ok(input.to_uppercase())
}
}
let flow = SequentialFlow::new("with_error")
.add_step(Arc::new(FailingCallable::new("step1", 10))) .add_step(Arc::new(FailingCallable::new("step2", 0))) .add_step(Arc::new(FailingCallable::new("step3", 10)));
let result = flow.execute("hello").await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Intentional failure"));
}
#[tokio::test]
async fn test_sequential_preserves_order() {
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = Arc::new(AtomicUsize::new(0));
let execution_order = Arc::new(std::sync::Mutex::new(Vec::new()));
struct OrderTracker {
name: String,
counter: Arc<AtomicUsize>,
order: Arc<std::sync::Mutex<Vec<usize>>>,
}
#[async_trait]
impl Callable for OrderTracker {
fn name(&self) -> &str {
&self.name
}
async fn run(&self, input: &str) -> anyhow::Result<String> {
let n = self.counter.fetch_add(1, Ordering::SeqCst);
self.order.lock().unwrap().push(n);
Ok(input.to_string())
}
}
let flow = SequentialFlow::new("ordered")
.add_step(Arc::new(OrderTracker {
name: "first".to_string(),
counter: counter.clone(),
order: execution_order.clone(),
}))
.add_step(Arc::new(OrderTracker {
name: "second".to_string(),
counter: counter.clone(),
order: execution_order.clone(),
}))
.add_step(Arc::new(OrderTracker {
name: "third".to_string(),
counter: counter.clone(),
order: execution_order.clone(),
}));
flow.execute("test").await.unwrap();
let order = execution_order.lock().unwrap();
assert_eq!(*order, vec![0, 1, 2]);
}
}