use cano::prelude::*;
use redb::{Database, ReadableDatabase, TableDefinition};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
#[derive(Debug, Hash, Eq, PartialEq)]
enum Key {
Store,
Counter,
Params,
Db,
}
struct CounterResource {
setup_count: Arc<Mutex<u32>>,
}
impl CounterResource {
fn new() -> Self {
Self {
setup_count: Arc::new(Mutex::new(0)),
}
}
fn get_setup_count(&self) -> u32 {
*self.setup_count.lock().unwrap()
}
}
#[resource]
impl Resource for CounterResource {
async fn setup(&self) -> Result<(), CanoError> {
*self.setup_count.lock().unwrap() += 1;
println!("CounterResource: setup called");
Ok(())
}
async fn teardown(&self) -> Result<(), CanoError> {
println!("CounterResource: teardown called");
Ok(())
}
}
const RESULTS_TABLE: TableDefinition<&str, u32> = TableDefinition::new("results");
struct RedbResource {
path: PathBuf,
db: Mutex<Option<Arc<Database>>>,
}
impl RedbResource {
fn new(path: PathBuf) -> Self {
Self {
path,
db: Mutex::new(None),
}
}
fn db(&self) -> Result<Arc<Database>, CanoError> {
self.db
.lock()
.unwrap()
.as_ref()
.cloned()
.ok_or_else(|| CanoError::Generic("redb database not initialized".into()))
}
}
#[resource]
impl Resource for RedbResource {
async fn setup(&self) -> Result<(), CanoError> {
let db = Database::create(&self.path)
.map_err(|e| CanoError::Generic(format!("redb create: {e}")))?;
let write_tx = db
.begin_write()
.map_err(|e| CanoError::Generic(format!("redb begin_write: {e}")))?;
{
let _ = write_tx
.open_table(RESULTS_TABLE)
.map_err(|e| CanoError::Generic(format!("redb open_table: {e}")))?;
}
write_tx
.commit()
.map_err(|e| CanoError::Generic(format!("redb commit: {e}")))?;
*self.db.lock().unwrap() = Some(Arc::new(db));
println!("RedbResource: opened {}", self.path.display());
Ok(())
}
async fn teardown(&self) -> Result<(), CanoError> {
*self.db.lock().unwrap() = None;
if self.path.exists() {
let _ = std::fs::remove_file(&self.path);
}
println!("RedbResource: closed and removed {}", self.path.display());
Ok(())
}
}
#[derive(Resource)]
struct WorkflowParams {
multiplier: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
Init,
Process,
Persist,
Verify,
Done,
}
struct InitTask;
#[task(state = Step, key = Key)]
impl InitTask {
async fn run(&self, res: &Resources<Key>) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<MemoryStore, _>(&Key::Store)?;
let params = res.get::<WorkflowParams, _>(&Key::Params)?;
let value = 10u32 * params.multiplier;
store.put("value", value)?;
println!("InitTask: stored value = {value}");
Ok(TaskResult::Single(Step::Process))
}
}
struct ProcessTask;
#[task(state = Step, key = Key)]
impl ProcessTask {
async fn run(&self, res: &Resources<Key>) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<MemoryStore, _>(&Key::Store)?;
let counter = res.get::<CounterResource, _>(&Key::Counter)?;
let value: u32 = store.get("value")?;
let result = value + counter.get_setup_count();
store.put("result", result)?;
println!("ProcessTask: result = {result}");
Ok(TaskResult::Single(Step::Persist))
}
}
struct PersistTask;
#[task(state = Step, key = Key)]
impl PersistTask {
async fn run(&self, res: &Resources<Key>) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<MemoryStore, _>(&Key::Store)?;
let redb = res.get::<RedbResource, _>(&Key::Db)?;
let result: u32 = store.get("result")?;
let db = redb.db()?;
let write_tx = db
.begin_write()
.map_err(|e| CanoError::Generic(format!("redb begin_write: {e}")))?;
{
let mut table = write_tx
.open_table(RESULTS_TABLE)
.map_err(|e| CanoError::Generic(format!("redb open_table: {e}")))?;
table
.insert("final_result", &result)
.map_err(|e| CanoError::Generic(format!("redb insert: {e}")))?;
}
write_tx
.commit()
.map_err(|e| CanoError::Generic(format!("redb commit: {e}")))?;
println!("PersistTask: committed final_result = {result} to redb");
Ok(TaskResult::Single(Step::Verify))
}
}
struct VerifyTask;
#[task(state = Step, key = Key)]
impl VerifyTask {
async fn run(&self, res: &Resources<Key>) -> Result<TaskResult<Step>, CanoError> {
let redb = res.get::<RedbResource, _>(&Key::Db)?;
let db = redb.db()?;
let read_tx = db
.begin_read()
.map_err(|e| CanoError::Generic(format!("redb begin_read: {e}")))?;
let table = read_tx
.open_table(RESULTS_TABLE)
.map_err(|e| CanoError::Generic(format!("redb open_table: {e}")))?;
let value = table
.get("final_result")
.map_err(|e| CanoError::Generic(format!("redb get: {e}")))?
.ok_or_else(|| CanoError::Generic("final_result missing from redb".into()))?
.value();
println!("VerifyTask: read back {value} from redb");
Ok(TaskResult::Single(Step::Done))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("Workflow Resources Example");
println!("==========================");
println!();
let store = MemoryStore::new();
let counter = CounterResource::new();
let counter_clone = CounterResource {
setup_count: Arc::clone(&counter.setup_count),
};
let db_path = std::env::temp_dir().join("cano_workflow_resources_example.redb");
if db_path.exists() {
let _ = std::fs::remove_file(&db_path);
}
let resources = Resources::<Key>::new()
.insert(Key::Store, store.clone())
.insert(Key::Counter, counter)
.insert(Key::Params, WorkflowParams { multiplier: 3 })
.insert(Key::Db, RedbResource::new(db_path));
let workflow = Workflow::new(resources)
.register(Step::Init, InitTask)
.register(Step::Process, ProcessTask)
.register(Step::Persist, PersistTask)
.register(Step::Verify, VerifyTask)
.add_exit_state(Step::Done);
println!("Running workflow...");
let final_state = workflow.orchestrate(Step::Init).await?;
assert_eq!(final_state, Step::Done);
let result: u32 = store.get("result")?;
println!("In-memory result: {result}");
assert_eq!(counter_clone.get_setup_count(), 1);
println!(
"CounterResource setup was called {} time(s)",
counter_clone.get_setup_count()
);
println!();
println!("Demonstrated:");
println!(" - Enum keys for compile-time resource safety");
println!(" - Custom Resource with setup/teardown lifecycle (CounterResource)");
println!(" - Persistent ACID resource (RedbResource) with file lifecycle");
println!(" - Plain config resource (WorkflowParams) with no-op lifecycle");
println!(" - In-memory hand-off store (MemoryStore) shared across tasks");
println!(" - Arc-based sharing for post-workflow inspection");
Ok(())
}