use cano::prelude::*;
use std::sync::Arc;
struct NamespacedStore {
namespace: String,
inner: MemoryStore,
}
impl NamespacedStore {
fn new(namespace: impl Into<String>, inner: MemoryStore) -> Self {
Self {
namespace: namespace.into(),
inner,
}
}
fn put<T: Send + Sync + 'static>(&self, key: &str, value: T) -> Result<(), CanoError> {
let namespaced = format!("{}/{key}", self.namespace);
self.inner
.put(&namespaced, value)
.map_err(|e| CanoError::store(format!("{e}")))
}
fn get<T: Clone + 'static>(&self, key: &str) -> Result<T, CanoError> {
let namespaced = format!("{}/{key}", self.namespace);
self.inner
.get(&namespaced)
.map_err(|e| CanoError::store(format!("{e}")))
}
}
#[resource]
impl Resource for NamespacedStore {}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
WriteA,
WriteB,
ReadBack,
LoadLarge,
SharedArc,
Done,
}
struct WriterA;
#[task(state = Step)]
impl WriterA {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<NamespacedStore, _>("ns_a")?;
store.put("result", 100u32)?;
store.put("label", "from-writer-a".to_string())?;
println!(" WriterA: wrote result=100, label='from-writer-a' in namespace 'a'");
Ok(TaskResult::Single(Step::WriteB))
}
}
struct WriterB;
#[task(state = Step)]
impl WriterB {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<NamespacedStore, _>("ns_b")?;
store.put("result", 200u32)?;
store.put("label", "from-writer-b".to_string())?;
println!(" WriterB: wrote result=200, label='from-writer-b' in namespace 'b'");
Ok(TaskResult::Single(Step::ReadBack))
}
}
struct ReadBack;
#[task(state = Step)]
impl ReadBack {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let ns_a = res.get::<NamespacedStore, _>("ns_a")?;
let ns_b = res.get::<NamespacedStore, _>("ns_b")?;
let a_result: u32 = ns_a.get("result")?;
let a_label: String = ns_a.get("label")?;
let b_result: u32 = ns_b.get("result")?;
let b_label: String = ns_b.get("label")?;
println!(" ReadBack: namespace 'a' -> result={a_result}, label='{a_label}'");
println!(" ReadBack: namespace 'b' -> result={b_result}, label='{b_label}'");
assert_eq!(a_result, 100, "namespace 'a' must be independent of 'b'");
assert_eq!(b_result, 200, "namespace 'b' must be independent of 'a'");
assert_ne!(a_result, b_result, "no cross-namespace collision");
println!(" ReadBack: namespaces are isolated — no key collisions");
Ok(TaskResult::Single(Step::LoadLarge))
}
}
struct LargePayload {
data: Vec<u8>,
}
impl LargePayload {
fn new(size: usize) -> Self {
Self {
data: vec![0xAB; size],
}
}
}
struct LoadLarge;
#[task(state = Step)]
impl LoadLarge {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<MemoryStore, _>("mem")?;
let payload = LargePayload::new(1_000_000); store.put("payload", payload)?;
println!(" LoadLarge: stored 1 MiB payload");
Ok(TaskResult::Single(Step::SharedArc))
}
}
struct SharedArc;
#[task(state = Step)]
impl SharedArc {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<MemoryStore, _>("mem")?;
let arc1: Arc<LargePayload> = store.get_shared("payload")?;
let arc2: Arc<LargePayload> = store.get_shared("payload")?;
println!(" SharedArc: arc1 data length = {} bytes", arc1.data.len());
println!(" SharedArc: arc2 data length = {} bytes", arc2.data.len());
assert!(
Arc::ptr_eq(&arc1, &arc2),
"get_shared must return arcs pointing at the same allocation"
);
println!(" SharedArc: Arc::ptr_eq(&arc1, &arc2) == true — zero-copy sharing confirmed");
let arc3 = Arc::clone(&arc1);
assert!(Arc::ptr_eq(&arc1, &arc3));
println!(" SharedArc: Arc::clone is also O(1) and ptr_eq");
Ok(TaskResult::Single(Step::Done))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("=== Custom Store Backend and get_shared Demo ===\n");
let shared_mem = MemoryStore::new();
let resources = Resources::new()
.insert("mem", shared_mem.clone())
.insert("ns_a", NamespacedStore::new("a", shared_mem.clone()))
.insert("ns_b", NamespacedStore::new("b", shared_mem.clone()));
println!("-- Part 1: NamespacedStore (key isolation) --");
let workflow = Workflow::new(resources)
.register(Step::WriteA, WriterA)
.register(Step::WriteB, WriterB)
.register(Step::ReadBack, ReadBack)
.register(Step::LoadLarge, LoadLarge)
.register(Step::SharedArc, SharedArc)
.add_exit_state(Step::Done);
println!("\n-- Part 2: MemoryStore::get_shared (Arc zero-copy sharing) --");
let result = workflow.orchestrate(Step::WriteA).await?;
println!("\ncompleted at {result:?}");
println!("\n=== Done ===");
Ok(())
}