<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Store - Cano Documentation</title>
<meta name="description" content="Learn how to use the Store in Cano - thread-safe shared state for pipeline data passing between workflow stages.">
<meta property="og:title" content="Store - Cano Documentation">
<meta property="og:description" content="Thread-safe shared state for pipeline data passing between workflow stages.">
<link rel="stylesheet" href="styles.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism-tomorrow.min.css">
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&family=Outfit:wght@500;700&family=Fira+Code&display=swap" rel="stylesheet">
<script src="https://cdnjs.cloudflare.com/ajax/libs/mermaid/10.6.1/mermaid.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-rust.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-toml.min.js"></script>
<script src="./script.js" defer></script>
<style>
.page-toc {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 0.75rem;
padding: 1.5rem 2rem;
margin-bottom: 3rem;
}
.page-toc-title {
font-size: 0.75rem;
font-weight: 700;
letter-spacing: 0.1em;
text-transform: uppercase;
color: var(--primary-color);
margin-bottom: 0.75rem;
}
.page-toc ol {
list-style: none;
counter-reset: toc-counter;
display: grid;
grid-template-columns: repeat(auto-fill, minmax(240px, 1fr));
gap: 0.25rem 2rem;
padding: 0;
margin: 0;
}
.page-toc li {
counter-increment: toc-counter;
}
.page-toc li a {
display: flex;
align-items: center;
gap: 0.5rem;
padding: 0.35rem 0;
color: #94a3b8;
font-size: 0.925rem;
font-weight: 500;
text-decoration: none;
transition: color 0.2s;
}
.page-toc li a::before {
content: counter(toc-counter, decimal-leading-zero);
font-size: 0.75rem;
font-weight: 700;
color: var(--border-color);
font-family: 'Fira Code', monospace;
transition: color 0.2s;
}
.page-toc li a:hover {
color: var(--primary-color);
}
.page-toc li a:hover::before {
color: var(--primary-color);
}
.section-divider {
border: none;
height: 1px;
background: linear-gradient(to right, var(--border-color), transparent);
margin: 3rem 0 0;
}
.callout {
padding: 1.25rem 1.5rem;
border-radius: 0.75rem;
margin: 1.5rem 0 2rem;
border-left: 4px solid;
font-size: 1rem;
}
.callout p {
margin: 0;
font-size: 1rem;
line-height: 1.6;
}
.callout-label {
display: flex;
align-items: center;
gap: 0.5rem;
font-weight: 700;
font-size: 0.8rem;
letter-spacing: 0.05em;
text-transform: uppercase;
margin-bottom: 0.5rem;
}
.callout-info {
background: rgba(56, 189, 248, 0.08);
border-color: var(--primary-color);
}
.callout-info .callout-label {
color: var(--primary-color);
}
.callout-tip {
background: rgba(52, 211, 153, 0.08);
border-color: #34d399;
}
.callout-tip .callout-label {
color: #34d399;
}
.callout-warning {
background: rgba(251, 191, 36, 0.08);
border-color: #fbbf24;
}
.callout-warning .callout-label {
color: #fbbf24;
}
.code-block {
position: relative;
margin: 1.5rem 0;
}
.code-block-label {
display: inline-flex;
align-items: center;
gap: 0.4rem;
background: var(--card-bg);
border: 1px solid var(--border-color);
border-bottom: 1px solid var(--code-bg);
border-radius: 0.5rem 0.5rem 0 0;
padding: 0.4rem 1rem;
font-size: 0.78rem;
font-weight: 600;
color: #94a3b8;
letter-spacing: 0.02em;
position: relative;
top: 1px;
z-index: 1;
}
.code-block-label .label-icon {
font-size: 0.85rem;
opacity: 0.7;
}
.code-block pre {
margin-top: 0 !important;
border-top-left-radius: 0 !important;
}
.styled-table {
width: 100%;
border-collapse: separate;
border-spacing: 0;
margin: 2rem 0;
background: var(--card-bg);
border-radius: 0.75rem;
overflow: hidden;
border: 1px solid var(--border-color);
font-size: 0.95rem;
}
.styled-table thead tr {
background: rgba(255, 255, 255, 0.04);
}
.styled-table th {
padding: 1rem 1.25rem;
text-align: left;
font-weight: 600;
color: #e2e8f0;
font-size: 0.85rem;
letter-spacing: 0.03em;
text-transform: uppercase;
border-bottom: 1px solid var(--border-color);
}
.styled-table td {
padding: 0.875rem 1.25rem;
border-top: 1px solid rgba(255, 255, 255, 0.04);
color: #94a3b8;
}
.styled-table tbody tr:first-child td {
border-top: none;
}
.styled-table tbody tr:hover {
background: rgba(255, 255, 255, 0.02);
}
.styled-table td code {
font-size: 0.82rem;
}
.error-cards .card {
border-top: 3px solid;
transition: transform 0.2s, border-color 0.2s;
}
.error-cards .card:nth-child(1) {
border-top-color: var(--primary-color);
}
.error-cards .card:nth-child(2) {
border-top-color: var(--secondary-color);
}
.error-cards .card:nth-child(3) {
border-top-color: #fbbf24;
}
.error-cards .card:nth-child(4) {
border-top-color: var(--accent-color);
}
.error-cards .card h3 {
font-family: 'Fira Code', monospace;
font-size: 1.1rem;
}
.method-comparison {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0;
margin: 2rem 0;
border-radius: 0.75rem;
overflow: hidden;
border: 1px solid var(--border-color);
}
.method-col {
padding: 1.5rem;
background: var(--card-bg);
}
.method-col:first-child {
border-right: 1px solid var(--border-color);
}
.method-col h4 {
margin-top: 0;
margin-bottom: 0.5rem;
font-family: 'Fira Code', monospace;
font-size: 1rem;
}
.method-col:first-child h4 {
color: var(--primary-color);
}
.method-col:last-child h4 {
color: #34d399;
}
.method-col p {
font-size: 0.9rem;
margin-bottom: 0;
}
.method-badge {
display: inline-block;
font-size: 0.7rem;
font-weight: 600;
letter-spacing: 0.05em;
text-transform: uppercase;
padding: 0.15rem 0.5rem;
border-radius: 0.25rem;
margin-bottom: 0.5rem;
}
.method-badge-clone {
background: rgba(56, 189, 248, 0.12);
color: var(--primary-color);
}
.method-badge-zerocopy {
background: rgba(52, 211, 153, 0.12);
color: #34d399;
}
@media (max-width: 768px) {
.page-toc ol {
grid-template-columns: 1fr;
}
.page-toc {
padding: 1.25rem 1.25rem;
}
.method-comparison {
grid-template-columns: 1fr;
}
.method-col:first-child {
border-right: none;
border-bottom: 1px solid var(--border-color);
}
.method-col {
padding: 1.25rem;
}
.callout {
padding: 1rem 1.25rem;
}
.styled-table {
font-size: 0.85rem;
display: block;
overflow-x: auto;
-webkit-overflow-scrolling: touch;
}
.styled-table th,
.styled-table td {
padding: 0.625rem 0.75rem;
white-space: nowrap;
}
}
@media (max-width: 480px) {
.styled-table th,
.styled-table td {
padding: 0.5rem 0.625rem;
font-size: 0.8rem;
}
}
</style>
</head>
<body>
<button id="menu-toggle" class="menu-toggle" aria-label="Toggle navigation" aria-expanded="false">☰</button>
<div class="sidebar-overlay"></div>
<nav class="sidebar" role="navigation" aria-label="Main navigation">
<a href="index.html" class="logo">
<img src="logo.png" alt="" style="height: 24px; vertical-align: middle; margin-right: 8px;">
Cano
</a>
<ul class="nav-links">
<li><a href="index.html">Home</a></li>
<li><a href="task.html">Tasks</a></li>
<li><a href="nodes.html">Nodes</a></li>
<li><a href="workflows.html">Workflows</a></li>
<li><a href="store.html" class="active">Store</a></li>
<li><a href="scheduler.html">Scheduler</a></li>
<li><a href="tracing.html">Tracing</a></li>
</ul>
<div class="sidebar-footer">
<span class="version-badge">v0.8.0</span>
<div class="sidebar-links">
<a href="https://github.com/nassor/cano" title="GitHub Repository" aria-label="GitHub">GitHub</a>
<a href="https://crates.io/crates/cano" title="Crates.io" aria-label="Crates.io">Crates.io</a>
<a href="https://docs.rs/cano" title="API Documentation" aria-label="API Docs">Docs.rs</a>
</div>
</div>
</nav>
<main class="main-content">
<h1>Store</h1>
<p class="subtitle">Thread-safe shared state for pipeline data passing between workflow stages.</p>
<p>
The store is the shared data layer that connects workflow stages. Tasks and nodes write
values during their execution and read values produced by upstream stages. The default
implementation, <code>MemoryStore</code>, is an <code>Arc<RwLock<HashMap>></code>
that is safe to clone and share across async tasks. Custom backends implement the
<code>KeyValueStore</code> trait.
</p>
<p>
<code>Workflow</code> is generic over the store type:
<code>Workflow<TState, TStore = MemoryStore></code>. The same applies to
<code>Scheduler<TState, TStore = MemoryStore></code>. Unless you need a custom
backend, the default is sufficient.
</p>
<div class="callout callout-info">
<div class="callout-label">Key concept</div>
<p>
The store is the glue between workflow stages. Each task reads input from the store and writes
its output back, creating a natural data pipeline without tight coupling between stages.
</p>
</div>
<nav class="page-toc" aria-label="Table of contents">
<div class="page-toc-title">On this page</div>
<ol>
<li><a href="#thread-safety">Thread Safety Model</a></li>
<li><a href="#api-reference">API Reference</a></li>
<li><a href="#basic-usage">Basic Usage</a></li>
<li><a href="#zero-copy">Zero-Copy Reads</a></li>
<li><a href="#workflow-usage">Store in a Workflow</a></li>
<li><a href="#custom-backends">Custom Store Backends</a></li>
<li><a href="#error-handling">Error Handling</a></li>
</ol>
</nav>
<hr class="section-divider">
<h2 id="thread-safety">Thread Safety Model</h2>
<div class="mermaid">
graph LR
S["Arc<RwLock<HashMap>>"]
S --> R1["Task A (read)"]
S --> R2["Task B (read)"]
S --> W1["Task C (write)"]
style S fill:#1e293b,stroke:#38bdf8
</div>
<p>
<code>MemoryStore</code> uses <code>Arc<RwLock<_>></code> for interior mutability.
Multiple readers hold shared locks concurrently; writers get exclusive access.
Because <code>MemoryStore</code> wraps its inner map in <code>Arc</code>, cloning the store
produces a second handle to the <em>same</em> underlying data -- not a copy.
</p>
<div class="code-block">
<span class="code-block-label"><span class="label-icon">🔒</span> Clone shares the same data</span>
<pre><code class="language-rust">let store = MemoryStore::new();
let store2 = store.clone(); // same backing map — not a copy
store.put("key", 42i32)?;
let val: i32 = store2.get("key")?; // returns 42</code></pre>
</div>
<hr class="section-divider">
<h2 id="api-reference">API Reference</h2>
<table class="styled-table">
<thead>
<tr>
<th>Method</th>
<th>Signature</th>
<th>Notes</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>get</code></td>
<td><code>get<T: Clone>(&self, key: &str) -> StoreResult<T></code></td>
<td>Returns a clone. Errors on missing key or type mismatch.</td>
</tr>
<tr>
<td><code>get_shared</code></td>
<td><code>get_shared<T: Send + Sync + Clone>(&self, key: &str) -> StoreResult<Arc<T>></code></td>
<td>Zero-copy read. Returns the stored <code>Arc</code> pointer directly when possible.</td>
</tr>
<tr>
<td><code>put</code></td>
<td><code>put<T: Send + Sync + Clone>(&self, key: &str, value: T) -> StoreResult<()></code></td>
<td>Inserts or replaces. Accepts any type.</td>
</tr>
<tr>
<td><code>remove</code></td>
<td><code>remove(&self, key: &str) -> StoreResult<()></code></td>
<td>Silent no-op on missing keys. Errors only on lock failure.</td>
</tr>
<tr>
<td><code>append</code></td>
<td><code>append<T: Send + Sync + Clone>(&self, key: &str, item: T) -> StoreResult<()></code></td>
<td>Appends to an existing <code>Vec<T></code>, or creates one. Type mismatch errors if the key holds a non-Vec.</td>
</tr>
<tr>
<td><code>contains_key</code></td>
<td><code>contains_key(&self, key: &str) -> StoreResult<bool></code></td>
<td>Returns <code>Ok(true)</code> if the key exists.</td>
</tr>
<tr>
<td><code>keys</code></td>
<td><code>keys(&self) -> StoreResult<Vec<String>></code></td>
<td>Returns all keys. Order is unspecified (HashMap).</td>
</tr>
<tr>
<td><code>len</code></td>
<td><code>len(&self) -> StoreResult<usize></code></td>
<td>Number of key-value pairs currently stored.</td>
</tr>
<tr>
<td><code>is_empty</code></td>
<td><code>is_empty(&self) -> StoreResult<bool></code></td>
<td>Default impl delegates to <code>len()</code>.</td>
</tr>
<tr>
<td><code>clear</code></td>
<td><code>clear(&self) -> StoreResult<()></code></td>
<td>Removes all entries.</td>
</tr>
</tbody>
</table>
<hr class="section-divider">
<h2 id="basic-usage">Basic Usage</h2>
<div class="code-block">
<span class="code-block-label"><span class="label-icon">📌</span> Common store operations</span>
<pre><code class="language-rust">use cano::prelude::*;
let store = MemoryStore::new();
// Store primitive and composite types
store.put("count", 42u32)?;
store.put("labels", vec!["a".to_string(), "b".to_string()])?;
// Retrieve with explicit type annotation
let count: u32 = store.get("count")?;
let labels: Vec<String> = store.get("labels")?;
// Append to a Vec (creates if absent)
store.append("log", "step 1 complete".to_string())?;
store.append("log", "step 2 complete".to_string())?;
let log: Vec<String> = store.get("log")?;
// Check existence before reading optional keys
if store.contains_key("result")? {
let result: String = store.get("result")?;
}
// Remove a key — silent no-op if absent
store.remove("scratch")?;</code></pre>
</div>
<div class="callout callout-tip">
<div class="callout-label">Tip</div>
<p>
Use <code>contains_key()</code> before <code>get()</code> for optional keys, or handle the
<code>KeyNotFound</code> error directly. The <code>append()</code> method is handy for
building up collections across multiple workflow stages without overwriting previous entries.
</p>
</div>
<hr class="section-divider">
<h2 id="zero-copy">Zero-Copy Reads with get_shared()</h2>
<p>
<code>get_shared()</code> returns an <code>Arc<T></code> pointing to the value
in the store. When <code>MemoryStore</code> stored the value, it wrapped it in an
<code>Arc</code> internally. <code>get_shared()</code> returns a clone of that pointer --
a cheap reference-count bump, not a data copy. This is useful when passing large
values (e.g., a model's weight tensor or a large dataset) to multiple downstream tasks.
</p>
<div class="code-block">
<span class="code-block-label"><span class="label-icon">⚡</span> Zero-copy with Arc</span>
<pre><code class="language-rust">use cano::prelude::*;
use std::sync::Arc;
let store = MemoryStore::new();
store.put("dataset", vec![0u8; 1_000_000])?; // 1 MB
// Both handles point to the same allocation — no copy
let handle_a: Arc<Vec<u8>> = store.get_shared("dataset")?;
let handle_b: Arc<Vec<u8>> = store.get_shared("dataset")?;
assert!(Arc::ptr_eq(&handle_a, &handle_b));</code></pre>
</div>
<div class="method-comparison">
<div class="method-col">
<span class="method-badge method-badge-clone">Clones data</span>
<h4>get()</h4>
<p>
<code>get()</code> always clones the underlying value. Use when you need an owned copy.
</p>
</div>
<div class="method-col">
<span class="method-badge method-badge-zerocopy">Zero-copy</span>
<h4>get_shared()</h4>
<p>
Prefer <code>get_shared()</code> for large or frequently-read values.
</p>
</div>
</div>
<hr class="section-divider">
<h2 id="workflow-usage">Store in a Workflow</h2>
<p>
Pass a store instance to <code>Workflow::new()</code>. The store is shared by reference
across all registered tasks for the duration of a single <code>orchestrate()</code> call.
</p>
<div class="code-block">
<span class="code-block-label"><span class="label-icon">⚙</span> End-to-end workflow with store</span>
<pre><code class="language-rust">use async_trait::async_trait;
use cano::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Ingest, Transform, Complete }
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
// Pre-populate before the workflow starts
store.put("batch_size", 256usize)?;
let workflow = Workflow::new(store.clone())
.register(Stage::Ingest, |store: &MemoryStore| async move {
let batch: usize = store.get("batch_size")?;
let records: Vec<u32> = (0..batch as u32).collect();
store.put("records", records)?;
Ok(TaskResult::Single(Stage::Transform))
})
.register(Stage::Transform, |store: &MemoryStore| async move {
let records: Vec<u32> = store.get("records")?;
let transformed: Vec<u32> = records.into_iter().map(|x| x * 2).collect();
store.put("result", transformed)?;
Ok(TaskResult::Single(Stage::Complete))
})
.add_exit_state(Stage::Complete);
workflow.orchestrate(Stage::Ingest).await?;
// Read results after the workflow completes
let result: Vec<u32> = store.get("result")?;
println!("Processed {} records", result.len());
Ok(())
}</code></pre>
</div>
<hr class="section-divider">
<h2 id="custom-backends">Custom Store Backends</h2>
<p>
Implement <code>KeyValueStore</code> to plug in any storage backend: a database,
a distributed cache, a file-backed store, or an in-process channel. The trait
requires <code>Send + Sync</code>, so implementations typically use interior mutability.
</p>
<div class="code-block">
<span class="code-block-label"><span class="label-icon">🔧</span> Custom namespaced store backend</span>
<pre><code class="language-rust">use cano::store::{KeyValueStore, StoreResult, StoreError};
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// Example: a store that prefixes all keys with a namespace.
#[derive(Clone)]
pub struct NamespacedStore {
namespace: String,
inner: MemoryStore,
}
impl NamespacedStore {
pub fn new(namespace: impl Into<String>) -> Self {
Self {
namespace: namespace.into(),
inner: MemoryStore::new(),
}
}
fn ns_key(&self, key: &str) -> String {
format!("{}::{}", self.namespace, key)
}
}
impl KeyValueStore for NamespacedStore {
fn get<T: 'static + Clone>(&self, key: &str) -> StoreResult<T> {
self.inner.get(&self.ns_key(key))
}
fn put<T: 'static + Send + Sync + Clone>(&self, key: &str, value: T) -> StoreResult<()> {
self.inner.put(&self.ns_key(key), value)
}
fn remove(&self, key: &str) -> StoreResult<()> {
self.inner.remove(&self.ns_key(key))
}
fn append<T: 'static + Send + Sync + Clone>(&self, key: &str, item: T) -> StoreResult<()> {
self.inner.append(&self.ns_key(key), item)
}
fn contains_key(&self, key: &str) -> StoreResult<bool> {
self.inner.contains_key(&self.ns_key(key))
}
fn keys(&self) -> StoreResult<Vec<String>> {
// Strip the namespace prefix before returning keys to callers
let prefix = format!("{}::", self.namespace);
let raw_keys = self.inner.keys()?;
Ok(raw_keys
.into_iter()
.filter_map(|k| k.strip_prefix(&prefix).map(str::to_string))
.collect())
}
fn len(&self) -> StoreResult<usize> {
// Count only keys in this namespace
Ok(self.keys()?.len())
}
fn clear(&self) -> StoreResult<()> {
// Remove only keys in this namespace
for key in self.keys()? {
self.remove(&key)?;
}
Ok(())
}
}
// Use it like any other store
let store = NamespacedStore::new("pipeline_v2");
store.put("status", "running".to_string())?;
let workflow = Workflow::new(store.clone())
// ... register tasks ...
;</code></pre>
</div>
<div class="callout callout-warning">
<div class="callout-label">Trait object note</div>
<p>
<code>KeyValueStore</code> is not object-safe due to its generic methods.
You cannot store it as <code>Box<dyn KeyValueStore></code> directly. Use a concrete type parameter
(<code>Workflow<S, MyStore></code>) or wrap the store in a newtype that exposes a non-generic API.
</p>
</div>
<hr class="section-divider">
<h2 id="error-handling">Error Handling</h2>
<p>
Store operations return <code>StoreResult<T></code>, which is
<code>Result<T, StoreError></code>. <code>StoreError</code> converts automatically
to <code>CanoError::Store</code> via the <code>From</code> impl, so the <code>?</code>
operator works transparently in task and node methods.
</p>
<div class="card-grid error-cards">
<div class="card">
<h3>KeyNotFound</h3>
<p>The requested key does not exist. Use <code>contains_key()</code> to guard optional reads, or handle the error with <code>.unwrap_or_default()</code>.</p>
</div>
<div class="card">
<h3>TypeMismatch</h3>
<p>The stored value cannot be downcast to the requested type. Ensure consistent type usage per key across all pipeline stages.</p>
</div>
<div class="card">
<h3>LockError</h3>
<p>The internal <code>RwLock</code> is poisoned -- a thread panicked while holding the lock. Typically fatal; log and restart.</p>
</div>
<div class="card">
<h3>AppendTypeMismatch</h3>
<p>The key exists but holds a non-<code>Vec</code> value. Mixing scalar and collection writes to the same key is a logic error.</p>
</div>
</div>
</main>
</body>
</html>