use std::time::Duration;
use cano::prelude::*;
/// Workflow states for the URL-processing pipeline.
///
/// The first three variants each have a task registered for them in `main`;
/// `Done` is registered as the exit state.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
    /// Validate the raw input strings.
    ParseUrls,
    /// Fetch every URL that passed validation.
    FetchUrls,
    /// Aggregate the fetch results and check the success ratio.
    Summarise,
    /// Terminal state: the workflow has finished.
    Done,
}
/// Keys identifying the shared resources handed to every task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Key {
    /// The `MemoryStore` used to pass data between workflow steps.
    Store,
}
/// Batch task that validates the raw input strings, keeping only `http://`
/// and `https://` URLs for the fetch step.
struct ParseUrls;

#[task::batch(state = Step, key = Key)]
impl ParseUrls {
    /// Concurrency setting for this batch (presumably the maximum number of
    /// items processed at once — confirm against the `task::batch` docs).
    fn concurrency(&self) -> usize {
        4
    }

    /// Reads the `"input_urls"` list from the `MemoryStore` registered under
    /// `Key::Store` and returns it as the batch items.
    async fn load(&self, res: &Resources<Key>) -> Result<Vec<String>, CanoError> {
        let urls: Vec<String> = res.get::<MemoryStore, _>(&Key::Store)?.get("input_urls")?;
        println!("parse urls : loaded {} URLs", urls.len());
        Ok(urls)
    }

    /// Validates one candidate string.
    ///
    /// Returns a copy of `item` when it starts with `http://` or `https://`.
    /// `ftp://` inputs are rejected with an "unsupported scheme" error and
    /// everything else with a "cannot parse URL" error.
    async fn process_item(&self, item: &String) -> Result<String, CanoError> {
        match item.as_str() {
            // Checked first so FTP gets its own, more specific message.
            url if url.starts_with("ftp://") => Err(CanoError::task_execution(format!(
                "unsupported scheme in {item}"
            ))),
            url if url.starts_with("http://") || url.starts_with("https://") => Ok(item.clone()),
            _ => Err(CanoError::task_execution(format!(
                "cannot parse URL: {item}"
            ))),
        }
    }

    /// Collects the URLs that passed validation and stores them under
    /// `"valid_urls"`, then moves on to `Step::FetchUrls`.
    ///
    /// Per-item failures are dropped without being reported; the task only
    /// fails when no item at all was accepted.
    async fn finish(
        &self,
        res: &Resources<Key>,
        outputs: Vec<Result<String, CanoError>>,
    ) -> Result<TaskResult<Step>, CanoError> {
        let accepted: Vec<String> = outputs.into_iter().filter_map(Result::ok).collect();
        println!("parse urls : {} valid URLs after validation", accepted.len());

        // Nothing to hand over to the fetch step: abort before touching the store.
        if accepted.is_empty() {
            return Err(CanoError::task_execution("no valid URLs to fetch"));
        }

        res.get::<MemoryStore, _>(&Key::Store)?
            .put("valid_urls", accepted)?;
        Ok(TaskResult::Single(Step::FetchUrls))
    }
}
/// Batch task that "fetches" each validated URL.
///
/// No network request is made: every item sleeps briefly and reports a byte
/// count derived from the URL's length.
struct FetchUrls;

#[task::batch(state = Step, key = Key)]
impl FetchUrls {
    /// Concurrency setting for this batch (presumably the maximum number of
    /// items processed at once — confirm against the `task::batch` docs).
    fn concurrency(&self) -> usize {
        3
    }

    /// Per-item retry policy built with `RetryMode::fixed(2, 5 ms)`
    /// (presumably two retries spaced 5 ms apart — confirm against the
    /// `RetryMode` docs).
    fn item_retry(&self) -> RetryMode {
        let delay = Duration::from_millis(5);
        RetryMode::fixed(2, delay)
    }

    /// Reads the `"valid_urls"` list from the `MemoryStore` registered under
    /// `Key::Store` and returns it as the batch items.
    async fn load(&self, res: &Resources<Key>) -> Result<Vec<String>, CanoError> {
        let urls: Vec<String> = res.get::<MemoryStore, _>(&Key::Store)?.get("valid_urls")?;
        println!(
            "fetch urls : fetching {} URLs (concurrency=3, retry=2)",
            urls.len()
        );
        Ok(urls)
    }

    /// Simulates fetching one URL and returns `(url, byte_count)`, where the
    /// byte count is 100 times the URL's length in bytes.
    async fn process_item(&self, item: &String) -> Result<(String, usize), CanoError> {
        // Stand-in for network latency.
        const SIMULATED_LATENCY: Duration = Duration::from_millis(50);
        tokio::time::sleep(SIMULATED_LATENCY).await;

        let bytes = item.len() * 100;
        println!("fetch urls : fetched {item} → {bytes} bytes");
        Ok((item.clone(), bytes))
    }

    /// Stores the successful `(url, byte_count)` pairs under
    /// `"fetch_results"` and moves on to `Step::Summarise`.
    ///
    /// Failed items are dropped without being reported.
    async fn finish(
        &self,
        res: &Resources<Key>,
        outputs: Vec<Result<(String, usize), CanoError>>,
    ) -> Result<TaskResult<Step>, CanoError> {
        // `Result` iterates over its `Ok` value, so flattening keeps successes only.
        let fetched: Vec<(String, usize)> = outputs.into_iter().flatten().collect();
        println!("fetch urls : {} URLs fetched successfully", fetched.len());

        res.get::<MemoryStore, _>(&Key::Store)?
            .put("fetch_results", fetched)?;
        Ok(TaskResult::Single(Step::Summarise))
    }
}
/// Batch task that totals the fetched byte counts and checks what share of
/// the original inputs made it through the pipeline.
struct Summarise {
    /// Denominator for the success percentage; `main` sets it to the number
    /// of raw input strings.
    url_count: usize,
}

#[task::batch(state = Step, key = Key)]
impl Summarise {
    /// Reads the `"fetch_results"` pairs from the `MemoryStore` registered
    /// under `Key::Store` and returns them as the batch items.
    async fn load(&self, res: &Resources<Key>) -> Result<Vec<(String, usize)>, CanoError> {
        let fetched: Vec<(String, usize)> = res
            .get::<MemoryStore, _>(&Key::Store)?
            .get("fetch_results")?;
        Ok(fetched)
    }

    /// Extracts the byte count from one `(url, byte_count)` pair.
    async fn process_item(&self, item: &(String, usize)) -> Result<usize, CanoError> {
        let (_url, size) = item;
        Ok(*size)
    }

    /// Prints the summary line and finishes at `Step::Done` when at least
    /// 50% of `url_count` succeeded; otherwise returns a task-execution error.
    ///
    /// A `url_count` of zero yields 0%, which is below the threshold.
    async fn finish(
        &self,
        _res: &Resources<Key>,
        outputs: Vec<Result<usize, CanoError>>,
    ) -> Result<TaskResult<Step>, CanoError> {
        // One pass over the successful items: count them and add up their sizes.
        let (success_count, total_bytes) = outputs
            .iter()
            .flatten()
            .fold((0usize, 0usize), |(count, bytes), size| {
                (count + 1, bytes + size)
            });

        // Integer percentage, rounded down; a zero denominator maps to 0.
        let scaled = success_count * 100;
        let pct = match self.url_count {
            0 => 0,
            total => scaled / total,
        };

        println!(
            "summarise : {success_count}/{} succeeded ({pct}%), {total_bytes} bytes total",
            self.url_count
        );

        if pct >= 50 {
            Ok(TaskResult::Single(Step::Done))
        } else {
            Err(CanoError::task_execution(format!(
                "only {pct}% of URLs succeeded — below the 50% threshold"
            )))
        }
    }
}
/// Runs the three-step batch workflow (`ParseUrls` → `FetchUrls` →
/// `Summarise`) over a small hard-coded list of inputs.
///
/// # Errors
///
/// Returns any error raised while seeding the store or orchestrating the
/// workflow.
///
/// # Panics
///
/// Panics if the workflow ends in a state other than `Step::Done`.
#[tokio::main]
async fn main() -> CanoResult<()> {
    let input_urls = vec![
        "https://example.com/page1".to_string(),
        "https://example.com/page2".to_string(),
        "https://example.com/page3".to_string(),
        "http://legacy.example.com/api".to_string(),
        // Rejected by `ParseUrls`: unsupported scheme.
        "ftp://old-server.example.com/data".to_string(),
        // Rejected by `ParseUrls`: not a URL at all.
        "not-a-url".to_string(),
    ];

    // Capture the count up front so the vector can be moved into the store
    // below instead of being cloned.
    let url_count = input_urls.len();

    println!("=== batch_task example ===");
    println!("input: {} URL strings\n", url_count);

    let store = MemoryStore::new();
    store.put("input_urls", input_urls)?;
    let resources = Resources::<Key>::new().insert(Key::Store, store);

    let workflow = Workflow::new(resources)
        .register(Step::ParseUrls, ParseUrls)
        .register(Step::FetchUrls, FetchUrls)
        .register(Step::Summarise, Summarise { url_count })
        .add_exit_state(Step::Done);

    let result = workflow.orchestrate(Step::ParseUrls).await?;
    assert_eq!(result, Step::Done);
    println!("\ncompleted at {result:?}");
    Ok(())
}