use crate::flow_dispatcher::{DispatchCtx, DispatchError, NodeOutcome};
use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
use crate::ir_nodes::{
IRConsensusBlock, IRDeliberateBlock, IRDiscover, IREmit, IRMutateStep,
IRPersistStep, IRPublish, IRPurgeStep, IRRetrieveStep, IRTransactBlock,
};
use crate::store::audit_chain::StoreMutationKind;
use crate::store::capability;
use crate::store::epistemic;
use crate::store::row_stream;
use crate::store::filter::SqlValue;
use crate::store::postgres_backend::{PostgresStoreBackend, StoreError};
use crate::store::registry::StoreHandle;
/// Appends `value` to the buffer kept for `channel_ref` and returns the emitted value.
///
/// The buffer is stored in `ctx.let_bindings` under the key
/// `__channel_<channel_ref>`. Entries are separated by a newline; when the
/// buffer is missing or empty it is replaced by `value` with no separator.
pub fn emit_to_channel(channel_ref: &str, value: &str, ctx: &mut DispatchCtx) -> String {
    let buffer_key = format!("__channel_{channel_ref}");
    let buffer = match ctx.let_bindings.get(&buffer_key) {
        // A non-empty buffer already exists: append on a new line.
        Some(previous) if !previous.is_empty() => format!("{previous}\n{value}"),
        // Missing or empty buffer: start it with this value.
        _ => value.to_string(),
    };
    ctx.let_bindings.insert(buffer_key, buffer);
    value.to_string()
}
/// Records `shield_ref` as the published capability for `channel_ref`.
///
/// The value is written to `ctx.let_bindings` under `__pub_<channel_ref>`,
/// overwriting any previous entry. Returns a human-readable confirmation.
pub fn publish_capability(channel_ref: &str, shield_ref: &str, ctx: &mut DispatchCtx) -> String {
    ctx.let_bindings
        .insert(format!("__pub_{channel_ref}"), shield_ref.to_owned());
    format!("published {channel_ref} with {shield_ref}")
}
/// Looks up the value previously stored under `__pub_<capability_ref>`.
///
/// Returns an empty string when nothing has been published for that name.
pub fn discover_capability(capability_ref: &str, ctx: &DispatchCtx) -> String {
    let lookup_key = format!("__pub_{capability_ref}");
    match ctx.let_bindings.get(&lookup_key) {
        Some(shield) => shield.clone(),
        None => String::new(),
    }
}
/// Snapshots every user binding into the in-memory store `store_name`.
///
/// A "user binding" is any entry of `ctx.let_bindings` whose key does not
/// start with `__`. Each one is copied to `__store_<store_name>_<key>`.
/// Returns the number of bindings copied.
pub fn persist_to_store(store_name: &str, ctx: &mut DispatchCtx) -> usize {
    // Collect first: the map cannot be mutated while it is being iterated.
    let mut snapshot: Vec<(String, String)> = Vec::new();
    for (name, value) in ctx.let_bindings.iter() {
        if name.starts_with("__") {
            continue;
        }
        snapshot.push((format!("__store_{store_name}_{name}"), value.clone()));
    }

    let persisted = snapshot.len();
    for (store_key, value) in snapshot {
        ctx.let_bindings.insert(store_key, value);
    }
    persisted
}
/// Reads the in-memory store entry `__store_<store_name>_<where_expr>`.
///
/// `where_expr` is used verbatim as the key suffix. Returns an empty string
/// when the entry does not exist.
pub fn retrieve_from_store(
    store_name: &str,
    where_expr: &str,
    ctx: &DispatchCtx,
) -> String {
    let store_key = format!("__store_{store_name}_{where_expr}");
    match ctx.let_bindings.get(&store_key) {
        Some(stored) => stored.clone(),
        None => String::new(),
    }
}
/// Overwrites the in-memory store entry `__store_<store_name>_<where_expr>`.
///
/// Nothing happens (and `0` is returned) when the entry does not exist.
/// Otherwise the entry is replaced by the current binding named
/// `where_expr`, or by the literal `where_expr` text when no such binding
/// exists, and `1` is returned.
pub fn mutate_store(store_name: &str, where_expr: &str, ctx: &mut DispatchCtx) -> u64 {
    let store_key = format!("__store_{store_name}_{where_expr}");
    if ctx.let_bindings.contains_key(&store_key) {
        let replacement = match ctx.let_bindings.get(where_expr) {
            Some(bound) => bound.clone(),
            None => where_expr.to_string(),
        };
        ctx.let_bindings.insert(store_key, replacement);
        1
    } else {
        0
    }
}
/// Removes the in-memory store entry `__store_<store_name>_<where_expr>`.
///
/// Returns `1` when an entry was removed and `0` when there was none.
pub fn purge_from_store(
    store_name: &str,
    where_expr: &str,
    ctx: &mut DispatchCtx,
) -> u64 {
    let store_key = format!("__store_{store_name}_{where_expr}");
    match ctx.let_bindings.remove(&store_key) {
        Some(_) => 1,
        None => 0,
    }
}
/// Resolves `store_name` to a Postgres backend, if it has one.
///
/// Returns `Ok(None)` when the context has no store registry or when the
/// registry resolves the store to `StoreHandle::InMemory`; callers treat
/// that as "use the key/value path". For a Postgres store the backend is
/// returned together with the `confidence_floor` from the store's spec.
///
/// # Errors
/// Propagates whatever error `registry.resolve` reports.
fn resolve_pg_backend(
    ctx: &DispatchCtx,
    store_name: &str,
) -> Result<Option<(PostgresStoreBackend, Option<f64>)>, StoreError> {
    let registry = match ctx.store_registry.as_ref() {
        Some(registry) => registry,
        None => return Ok(None),
    };
    let backend = match registry.resolve(store_name)? {
        StoreHandle::Postgres(backend) => backend,
        StoreHandle::InMemory => return Ok(None),
    };
    let floor = registry
        .spec(store_name)
        .and_then(|spec| spec.confidence_floor);
    Ok(Some((backend, floor)))
}
/// Builds a SQL row from every user binding in the context.
///
/// Keys starting with `__` are skipped. Each remaining binding becomes a
/// `(column, SqlValue::Text)` pair, and the row is sorted by column name.
fn sql_row_from_bindings(ctx: &DispatchCtx) -> Vec<(String, SqlValue)> {
    let mut columns: Vec<(String, SqlValue)> = Vec::new();
    for (name, value) in ctx.let_bindings.iter() {
        if !name.starts_with("__") {
            columns.push((name.clone(), SqlValue::Text(value.clone())));
        }
    }
    columns.sort_by(|left, right| left.0.cmp(&right.0));
    columns
}
/// Builds the SQL row for a persist/mutate step.
///
/// With an empty `fields` list the row falls back to
/// `sql_row_from_bindings`. Otherwise each `(column, expression)` pair is
/// kept in declaration order, with the expression passed through
/// `interpolate_vars` against the current bindings and wrapped as
/// `SqlValue::Text`.
fn store_row(fields: &[(String, String)], ctx: &DispatchCtx) -> Vec<(String, SqlValue)> {
    if fields.is_empty() {
        return sql_row_from_bindings(ctx);
    }

    let mut row = Vec::with_capacity(fields.len());
    for (column, expr) in fields {
        let rendered = crate::exec_context::interpolate_vars(expr, &ctx.let_bindings);
        row.push((column.clone(), SqlValue::Text(rendered)));
    }
    row
}
/// Wraps a `StoreError` as a `DispatchError::BackendError` named `axonstore`,
/// using the store error's `Display` text as the message.
fn sql_dispatch_error(e: StoreError) -> DispatchError {
    let message = e.to_string();
    DispatchError::BackendError {
        name: String::from("axonstore"),
        message,
    }
}
/// Checks that the context holds the capability required by `store_name`.
///
/// The check is skipped entirely (returns `Ok`) when the context carries no
/// `held_capabilities`. The required capability comes from the store's spec
/// in the registry; when there is no registry or no spec it is the empty
/// string.
///
/// # Errors
/// A denial from `capability::check_store_capability` is reported as
/// `DispatchError::BackendError` named `axonstore.capability`.
fn enforce_store_capability(
    ctx: &DispatchCtx,
    store_name: &str,
) -> Result<(), DispatchError> {
    let held = match ctx.held_capabilities.as_ref() {
        Some(held) => held,
        None => return Ok(()),
    };

    let spec = ctx
        .store_registry
        .as_ref()
        .and_then(|registry| registry.spec(store_name));
    let required = match spec {
        Some(spec) => spec.capability.as_str(),
        None => "",
    };

    capability::check_store_capability(store_name, required, held).map_err(|denied| {
        DispatchError::BackendError {
            name: "axonstore.capability".to_string(),
            message: denied.to_string(),
        }
    })
}
/// Appends one entry to the context's audit chain.
///
/// A poisoned lock is recovered rather than propagated, so the entry is
/// still recorded after another holder of the lock has panicked.
fn record_store_mutation(
    ctx: &DispatchCtx,
    kind: StoreMutationKind,
    store: &str,
    summary: &str,
) {
    let mut chain = match ctx.audit_chain.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    chain.record(kind, store, summary);
}
/// Handles an `emit` node: appends a value to a channel buffer.
///
/// The step is named after `node.channel_ref`, or `Emit` when that is empty.
/// `node.value_ref` is resolved against the bindings; when no binding has
/// that name the reference text itself is emitted.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled
///   (checked before the step counter is advanced).
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_emit(
    node: &IREmit,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let step_name = match node.channel_ref.as_str() {
        "" => String::from("Emit"),
        channel => channel.to_owned(),
    };
    emit_step_start(ctx, &step_name, step_index, "emit")?;

    let payload = match ctx.let_bindings.get(&node.value_ref) {
        Some(bound) => bound.clone(),
        None => node.value_ref.clone(),
    };
    let emitted = emit_to_channel(&node.channel_ref, &payload, ctx);

    emit_step_complete(ctx, &step_name, step_index, &emitted, 0)?;
    Ok(NodeOutcome::Completed {
        output: emitted,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `publish` node: records `node.shield_ref` for `node.channel_ref`.
///
/// The step is named after the channel, or `Publish` when the channel
/// reference is empty. The step output is the confirmation text returned by
/// `publish_capability`.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_publish(
    node: &IRPublish,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let step_name = match node.channel_ref.as_str() {
        "" => String::from("Publish"),
        channel => channel.to_owned(),
    };
    emit_step_start(ctx, &step_name, step_index, "publish")?;

    let output = publish_capability(&node.channel_ref, &node.shield_ref, ctx);

    emit_step_complete(ctx, &step_name, step_index, &output, 0)?;
    Ok(NodeOutcome::Completed {
        output,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `discover` node: looks up a published capability.
///
/// The step is named after `node.alias`, or `Discover` when the alias is
/// empty. With a non-empty alias the discovered value is also bound under
/// that alias in `ctx.let_bindings`.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_discover(
    node: &IRDiscover,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let has_alias = !node.alias.is_empty();
    let step_name = if has_alias {
        node.alias.clone()
    } else {
        String::from("Discover")
    };
    emit_step_start(ctx, &step_name, step_index, "discover")?;

    let discovered = discover_capability(&node.capability_ref, ctx);
    if has_alias {
        ctx.let_bindings
            .insert(node.alias.clone(), discovered.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &discovered, 0)?;
    Ok(NodeOutcome::Completed {
        output: discovered,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `persist` node: writes a row or a binding snapshot to a store.
///
/// Order of operations: cancellation check, step-counter bump, capability
/// check, `StepStart` event, the write itself, audit-chain entry,
/// `StepComplete` event.
///
/// * Postgres store: the row from `store_row` must pass
///   `epistemic::enforce_persist_floor` and is then inserted via the backend.
/// * Otherwise: user bindings are snapshotted by `persist_to_store`.
///
/// The step is named after the store, or `Persist` when the name is empty.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::BackendError` for capability denial, backend resolution
///   failure, a floor violation, or an insert failure.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_persist(
    node: &IRPersistStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let step_name = match node.store_name.as_str() {
        "" => String::from("Persist"),
        store => store.to_owned(),
    };
    enforce_store_capability(ctx, &node.store_name)?;
    emit_step_start(ctx, &step_name, step_index, "persist")?;

    let resolved = resolve_pg_backend(ctx, &node.store_name).map_err(sql_dispatch_error)?;
    let output = match resolved {
        Some((backend, floor)) => {
            let row = store_row(&node.fields, ctx);
            epistemic::enforce_persist_floor(&row, floor, &node.store_name)
                .map_err(|e| sql_dispatch_error(StoreError::from(e)))?;
            let n = backend
                .insert(&node.store_name, &row)
                .await
                .map_err(sql_dispatch_error)?;
            format!("persisted {n} row(s) to `{}`", node.store_name)
        }
        None => {
            let count = persist_to_store(&node.store_name, ctx);
            format!("persisted {count} entries to `{}`", node.store_name)
        }
    };

    record_store_mutation(ctx, StoreMutationKind::Persist, &node.store_name, &output);
    emit_step_complete(ctx, &step_name, step_index, &output, 0)?;
    Ok(NodeOutcome::Completed {
        output,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `retrieve` node: reads from a store and optionally binds the result.
///
/// Order of operations: cancellation check, step-counter bump, capability
/// check, `StepStart` event, the read, alias binding, `StepComplete` event.
/// No audit-chain entry is recorded for a read.
///
/// * Postgres store: rows are streamed with the default policy and row cap,
///   marked as retrieved, filtered by the store's confidence floor, and
///   serialized as a JSON envelope with the stream metadata under `"stream"`.
///   If serialization fails the value is `"{}"`.
/// * Otherwise: the value comes from `retrieve_from_store`.
///
/// The step is named after `node.alias`, or `Retrieve` when the alias is
/// empty; a non-empty alias also receives the value in `ctx.let_bindings`.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::BackendError` for capability denial, backend resolution
///   failure, or a streaming failure.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_retrieve(
    node: &IRRetrieveStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let has_alias = !node.alias.is_empty();
    let step_name = if has_alias {
        node.alias.clone()
    } else {
        String::from("Retrieve")
    };
    enforce_store_capability(ctx, &node.store_name)?;
    emit_step_start(ctx, &step_name, step_index, "retrieve")?;

    let resolved = resolve_pg_backend(ctx, &node.store_name).map_err(sql_dispatch_error)?;
    let value = if let Some((backend, floor)) = resolved {
        let stream_outcome = row_stream::stream_retrieve(
            &backend,
            &node.store_name,
            &node.where_expr,
            row_stream::DEFAULT_RETRIEVE_POLICY,
            row_stream::DEFAULT_MAX_ROWS,
            &ctx.cancel,
            &ctx.let_bindings,
        )
        .await
        .map_err(sql_dispatch_error)?;

        // Metadata is derived before the rows are moved out of the outcome.
        let metadata = row_stream::stream_metadata(
            row_stream::DEFAULT_RETRIEVE_POLICY,
            &stream_outcome,
        );
        let marked = epistemic::mark_retrieved(stream_outcome.rows);
        let floored = epistemic::enforce_retrieve_floor(marked, floor);

        let mut envelope = epistemic::retrieve_envelope(&floored, floor);
        envelope["stream"] = metadata;
        match serde_json::to_string(&envelope) {
            Ok(json) => json,
            Err(_) => "{}".to_string(),
        }
    } else {
        retrieve_from_store(&node.store_name, &node.where_expr, ctx)
    };

    if has_alias {
        ctx.let_bindings.insert(node.alias.clone(), value.clone());
    }

    emit_step_complete(ctx, &step_name, step_index, &value, 0)?;
    Ok(NodeOutcome::Completed {
        output: value,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `mutate` node: updates rows or an in-memory entry in a store.
///
/// Order of operations: cancellation check, step-counter bump, capability
/// check, `StepStart` event, the update, audit-chain entry, `StepComplete`
/// event.
///
/// * Postgres store: the row from `store_row` is applied through
///   `backend.mutate` with `node.where_expr` and the current bindings.
/// * Otherwise: `mutate_store` updates the matching in-memory entry.
///
/// The step is named after the store, or `Mutate` when the name is empty.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::BackendError` for capability denial, backend resolution
///   failure, or a mutate failure.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_mutate(
    node: &IRMutateStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let step_name = match node.store_name.as_str() {
        "" => String::from("Mutate"),
        store => store.to_owned(),
    };
    enforce_store_capability(ctx, &node.store_name)?;
    emit_step_start(ctx, &step_name, step_index, "mutate")?;

    let resolved = resolve_pg_backend(ctx, &node.store_name).map_err(sql_dispatch_error)?;
    let output = match resolved {
        Some((backend, _floor)) => {
            let row = store_row(&node.fields, ctx);
            let n = backend
                .mutate(&node.store_name, &node.where_expr, &row, &ctx.let_bindings)
                .await
                .map_err(sql_dispatch_error)?;
            format!("mutated {n} row(s) in `{}`", node.store_name)
        }
        None => {
            let count = mutate_store(&node.store_name, &node.where_expr, ctx);
            format!("mutated {count} entries in `{}`", node.store_name)
        }
    };

    record_store_mutation(ctx, StoreMutationKind::Mutate, &node.store_name, &output);
    emit_step_complete(ctx, &step_name, step_index, &output, 0)?;
    Ok(NodeOutcome::Completed {
        output,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `purge` node: deletes rows or an in-memory entry from a store.
///
/// Order of operations: cancellation check, step-counter bump, capability
/// check, `StepStart` event, the delete, audit-chain entry, `StepComplete`
/// event.
///
/// * Postgres store: `backend.purge` is called with `node.where_expr` and
///   the current bindings.
/// * Otherwise: `purge_from_store` removes the matching in-memory entry.
///
/// The step is named after the store, or `Purge` when the name is empty.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::BackendError` for capability denial, backend resolution
///   failure, or a purge failure.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_purge(
    node: &IRPurgeStep,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    let step_name = match node.store_name.as_str() {
        "" => String::from("Purge"),
        store => store.to_owned(),
    };
    enforce_store_capability(ctx, &node.store_name)?;
    emit_step_start(ctx, &step_name, step_index, "purge")?;

    let resolved = resolve_pg_backend(ctx, &node.store_name).map_err(sql_dispatch_error)?;
    let output = match resolved {
        Some((backend, _floor)) => {
            let n = backend
                .purge(&node.store_name, &node.where_expr, &ctx.let_bindings)
                .await
                .map_err(sql_dispatch_error)?;
            format!("purged {n} row(s) from `{}`", node.store_name)
        }
        None => {
            let count = purge_from_store(&node.store_name, &node.where_expr, ctx);
            format!("purged {count} entries from `{}`", node.store_name)
        }
    };

    record_store_mutation(ctx, StoreMutationKind::Purge, &node.store_name, &output);
    emit_step_complete(ctx, &step_name, step_index, &output, 0)?;
    Ok(NodeOutcome::Completed {
        output,
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `transact` node.
///
/// Emits the `Transact` step events and sets the binding `__txn_active` to
/// `"true"`. The node itself is not inspected and the step output is empty.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_transact(
    _node: &IRTransactBlock,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    const STEP_NAME: &str = "Transact";

    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    emit_step_start(ctx, STEP_NAME, step_index, "transact")?;

    ctx.let_bindings
        .insert(String::from("__txn_active"), String::from("true"));

    emit_step_complete(ctx, STEP_NAME, step_index, "", 0)?;
    Ok(NodeOutcome::Completed {
        output: String::new(),
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `deliberate` node.
///
/// Only emits the `Deliberate` step start/complete events; the node is not
/// inspected, no bindings change, and the step output is empty.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_deliberate(
    _node: &IRDeliberateBlock,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    const STEP_NAME: &str = "Deliberate";

    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    emit_step_start(ctx, STEP_NAME, step_index, "deliberate")?;
    emit_step_complete(ctx, STEP_NAME, step_index, "", 0)?;

    Ok(NodeOutcome::Completed {
        output: String::new(),
        tokens_emitted: 0,
        step_index,
    })
}
/// Handles a `consensus` node.
///
/// Only emits the `Consensus` step start/complete events; the node is not
/// inspected, no bindings change, and the step output is empty.
///
/// # Errors
/// * `DispatchError::UpstreamCancelled` when the flow is already cancelled.
/// * `DispatchError::ChannelClosed` when a step event cannot be sent.
pub async fn run_consensus(
    _node: &IRConsensusBlock,
    ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
    const STEP_NAME: &str = "Consensus";

    if ctx.cancel.is_cancelled() {
        return Err(DispatchError::UpstreamCancelled);
    }

    let step_index = ctx.step_counter;
    ctx.step_counter += 1;
    emit_step_start(ctx, STEP_NAME, step_index, "consensus")?;
    emit_step_complete(ctx, STEP_NAME, step_index, "", 0)?;

    Ok(NodeOutcome::Completed {
        output: String::new(),
        tokens_emitted: 0,
        step_index,
    })
}
/// Sends a `FlowExecutionEvent::StepStart` for the given step on `ctx.tx`.
///
/// The event is stamped with `now_ms()`. The context is only read here, so
/// it is borrowed immutably; callers holding a `&mut DispatchCtx` can pass
/// it unchanged because `&mut T` coerces to `&T`.
///
/// # Errors
/// Returns `DispatchError::ChannelClosed` when the send fails.
fn emit_step_start(
    ctx: &DispatchCtx,
    step_name: &str,
    step_index: usize,
    step_type: &str,
) -> Result<(), DispatchError> {
    ctx.tx
        .send(FlowExecutionEvent::StepStart {
            step_name: step_name.to_string(),
            step_index,
            step_type: step_type.to_string(),
            timestamp_ms: now_ms(),
        })
        .map_err(|_| DispatchError::ChannelClosed)
}
/// Sends a `FlowExecutionEvent::StepComplete` for the given step on `ctx.tx`.
///
/// The event always reports `success: true` and `tokens_input: 0`, and is
/// stamped with `now_ms()`. The context is only read here, so it is
/// borrowed immutably; callers holding a `&mut DispatchCtx` can pass it
/// unchanged because `&mut T` coerces to `&T`.
///
/// # Errors
/// Returns `DispatchError::ChannelClosed` when the send fails.
fn emit_step_complete(
    ctx: &DispatchCtx,
    step_name: &str,
    step_index: usize,
    full_output: &str,
    tokens_output: u64,
) -> Result<(), DispatchError> {
    ctx.tx
        .send(FlowExecutionEvent::StepComplete {
            step_name: step_name.to_string(),
            step_index,
            success: true,
            full_output: full_output.to_string(),
            tokens_input: 0,
            tokens_output,
            timestamp_ms: now_ms(),
        })
        .map_err(|_| DispatchError::ChannelClosed)
}
// Unit tests for the synchronous key/value helpers, the async `run_*`
// handlers, backend resolution, the capability gate and the audit chain.
#[cfg(test)]
mod tests {
use super::*;
use crate::cancel_token::CancellationFlag;
use crate::ir_nodes::*;
use tokio::sync::mpsc;
// Builds a context with no store registry and no held capabilities, plus the
// receiving end of its event channel.
fn fresh_ctx() -> (
DispatchCtx,
mpsc::UnboundedReceiver<FlowExecutionEvent>,
) {
let (tx, rx) = mpsc::unbounded_channel();
let ctx = DispatchCtx::new(
"TestFlow",
"stub",
"",
CancellationFlag::new(),
tx,
);
(ctx, rx)
}
// Two emits on the same channel end up newline-separated in one buffer.
#[test]
fn emit_to_channel_appends_to_buffer() {
let (mut ctx, _rx) = fresh_ctx();
emit_to_channel("c1", "v1", &mut ctx);
emit_to_channel("c1", "v2", &mut ctx);
let buffer = ctx.let_bindings.get("__channel_c1").unwrap();
assert_eq!(buffer, "v1\nv2");
}
// A published shield can be read back through discover.
#[test]
fn publish_then_discover_round_trip() {
let (mut ctx, _rx) = fresh_ctx();
publish_capability("user_inbox", "shield_pii", &mut ctx);
assert_eq!(discover_capability("user_inbox", &ctx), "shield_pii");
}
// Discovering a name that was never published yields the empty string.
#[test]
fn discover_missing_returns_empty() {
let (ctx, _rx) = fresh_ctx();
assert_eq!(discover_capability("never_set", &ctx), "");
}
// Only keys without the `__` prefix are copied into the store namespace.
#[test]
fn persist_snapshots_user_bindings() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("name".into(), "alice".into());
ctx.let_bindings.insert("age".into(), "30".into());
ctx.let_bindings
.insert("__internal".into(), "should_not_be_snapshotted".into());
let count = persist_to_store("users", &mut ctx);
assert_eq!(count, 2);
assert_eq!(ctx.let_bindings.get("__store_users_name").unwrap(), "alice");
assert_eq!(ctx.let_bindings.get("__store_users_age").unwrap(), "30");
assert!(!ctx.let_bindings.contains_key("__store_users___internal"));
}
// A persisted binding is retrievable by its original key name.
#[test]
fn retrieve_from_store_returns_persisted_value() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("city".into(), "Bogota".into());
persist_to_store("locations", &mut ctx);
assert_eq!(retrieve_from_store("locations", "city", &ctx), "Bogota");
}
// Mutating copies the binding's current value over the persisted one.
#[test]
fn mutate_store_updates_existing_entry() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("counter".into(), "1".into());
persist_to_store("metrics", &mut ctx);
ctx.let_bindings.insert("counter".into(), "2".into());
let count = mutate_store("metrics", "counter", &mut ctx);
assert_eq!(count, 1);
assert_eq!(
ctx.let_bindings.get("__store_metrics_counter").unwrap(),
"2"
);
}
// Mutating an entry that was never persisted reports zero changes.
#[test]
fn mutate_missing_entry_returns_zero() {
let (mut ctx, _rx) = fresh_ctx();
assert_eq!(mutate_store("empty", "k", &mut ctx), 0);
}
// Purging removes the persisted entry and reports one removal.
#[test]
fn purge_removes_entry() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("key".into(), "value".into());
persist_to_store("s", &mut ctx);
let count = purge_from_store("s", "key", &mut ctx);
assert_eq!(count, 1);
assert!(!ctx.let_bindings.contains_key("__store_s_key"));
}
// Purging an absent entry reports zero removals.
#[test]
fn purge_missing_returns_zero() {
let (mut ctx, _rx) = fresh_ctx();
assert_eq!(purge_from_store("s", "absent", &mut ctx), 0);
}
// `run_emit` resolves the value binding, fills the channel buffer, and
// sends a StepStart of type "emit" as its first event.
#[tokio::test]
async fn run_emit_appends_to_channel_buffer() {
let (mut ctx, mut rx) = fresh_ctx();
ctx.let_bindings.insert("payload".into(), "hello".into());
let node = IREmit {
node_type: "emit",
source_line: 0,
source_column: 0,
channel_ref: "out_channel".into(),
value_ref: "payload".into(),
value_is_channel: false,
};
let outcome = run_emit(&node, &mut ctx).await.unwrap();
match outcome {
NodeOutcome::Completed { output, .. } => assert_eq!(output, "hello"),
other => panic!("expected Completed, got {other:?}"),
}
assert_eq!(
ctx.let_bindings.get("__channel_out_channel").unwrap(),
"hello"
);
let first = rx.try_recv().unwrap();
match first {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "emit");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
// `run_publish` stores the shield under `__pub_<channel>` and sends a
// StepStart of type "publish" as its first event.
#[tokio::test]
async fn run_publish_records_capability() {
let (mut ctx, mut rx) = fresh_ctx();
let node = IRPublish {
node_type: "publish",
source_line: 0,
source_column: 0,
channel_ref: "secure_chan".into(),
shield_ref: "hipaa".into(),
};
run_publish(&node, &mut ctx).await.unwrap();
assert_eq!(
ctx.let_bindings.get("__pub_secure_chan").unwrap(),
"hipaa"
);
let first = rx.try_recv().unwrap();
match first {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "publish");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
// `run_discover` binds the discovered value under the node's alias and
// sends a StepStart of type "discover" as its first event.
#[tokio::test]
async fn run_discover_binds_under_alias() {
let (mut ctx, mut rx) = fresh_ctx();
publish_capability("secure_chan", "hipaa", &mut ctx);
let node = IRDiscover {
node_type: "discover",
source_line: 0,
source_column: 0,
capability_ref: "secure_chan".into(),
alias: "found".into(),
};
run_discover(&node, &mut ctx).await.unwrap();
assert_eq!(ctx.let_bindings.get("found").unwrap(), "hipaa");
let first = rx.try_recv().unwrap();
match first {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "discover");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
// Without a registry, persist followed by retrieve round-trips a binding
// through the key/value path and binds it under the retrieve alias.
#[tokio::test]
async fn run_persist_then_retrieve_round_trip() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("id".into(), "42".into());
ctx.let_bindings.insert("name".into(), "test".into());
let persist = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "entities".into(),
};
run_persist(&persist, &mut ctx).await.unwrap();
let retrieve = IRRetrieveStep {
node_type: "retrieve",
source_line: 0,
source_column: 0,
store_name: "entities".into(),
where_expr: "id".into(),
alias: "retrieved_id".into(),
};
run_retrieve(&retrieve, &mut ctx).await.unwrap();
assert_eq!(ctx.let_bindings.get("retrieved_id").unwrap(), "42");
}
// With a declared field block, the row contains exactly those columns in
// declaration order, with `${...}` references interpolated; bindings that
// are not declared as columns do not leak into the row.
#[test]
fn store_row_scopes_to_the_declared_field_block() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("message".into(), "hello".into());
ctx.let_bindings.insert("tenant_id".into(), "acme".into());
ctx.let_bindings
.insert("channel_kind".into(), "whatsapp".into());
let node = IRPersistStep {
node_type: "persist",
source_line: 0,
source_column: 0,
store_name: "chat_history".into(),
fields: vec![
("sender".into(), "user".into()),
("content".into(), "${message}".into()),
("tenant_id".into(), "${tenant_id}".into()),
],
};
let row = store_row(&node.fields, &ctx);
assert_eq!(
row,
vec![
("sender".to_string(), SqlValue::Text("user".into())),
("content".to_string(), SqlValue::Text("hello".into())),
("tenant_id".to_string(), SqlValue::Text("acme".into())),
]
);
assert!(!row
.iter()
.any(|(c, _)| c == "channel_kind" || c == "message"));
}
// With no field block, the row is the same as `sql_row_from_bindings`.
#[test]
fn store_row_without_a_block_falls_back_to_user_bindings() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("a".into(), "1".into());
ctx.let_bindings.insert("b".into(), "2".into());
let node = IRPersistStep {
node_type: "persist",
source_line: 0,
source_column: 0,
store_name: "s".into(),
fields: Vec::new(),
};
assert_eq!(store_row(&node.fields, &ctx), sql_row_from_bindings(&ctx));
}
// The same scoping applies to a mutate node's field block.
#[test]
fn store_row_for_a_mutate_node_scopes_to_its_set_block() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("new_balance".into(), "500".into());
ctx.let_bindings.insert("tenant_id".into(), "acme".into());
let node = IRMutateStep {
node_type: "mutate",
source_line: 0,
source_column: 0,
store_name: "accounts".into(),
where_expr: "id = 1".into(),
fields: vec![
("balance".into(), "${new_balance}".into()),
("status".into(), "active".into()),
],
};
let row = store_row(&node.fields, &ctx);
assert_eq!(
row,
vec![
("balance".to_string(), SqlValue::Text("500".into())),
("status".to_string(), SqlValue::Text("active".into())),
]
);
assert!(!row.iter().any(|(c, _)| c == "tenant_id"));
}
// `run_mutate` on the key/value path reports "mutated 1 entries" and
// overwrites the persisted value with the binding's new value.
#[tokio::test]
async fn run_mutate_updates_existing() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("counter".into(), "1".into());
let persist = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "stats".into(),
};
run_persist(&persist, &mut ctx).await.unwrap();
ctx.let_bindings.insert("counter".into(), "2".into());
let mutate = IRMutateStep {
node_type: "mutate",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "stats".into(),
where_expr: "counter".into(),
};
let outcome = run_mutate(&mutate, &mut ctx).await.unwrap();
match outcome {
NodeOutcome::Completed { output, .. } => {
assert!(output.contains("mutated 1 entries"));
}
other => panic!("expected Completed, got {other:?}"),
}
assert_eq!(
ctx.let_bindings.get("__store_stats_counter").unwrap(),
"2"
);
}
// `run_purge` on the key/value path reports "purged 1 entries" and removes
// the persisted key.
#[tokio::test]
async fn run_purge_removes_persisted_entry() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("tmp".into(), "data".into());
run_persist(
&IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "scratch".into(),
},
&mut ctx,
)
.await
.unwrap();
let outcome = run_purge(
&IRPurgeStep {
node_type: "purge",
source_line: 0,
source_column: 0,
store_name: "scratch".into(),
where_expr: "tmp".into(),
},
&mut ctx,
)
.await
.unwrap();
match outcome {
NodeOutcome::Completed { output, .. } => {
assert!(output.contains("purged 1 entries"));
}
other => panic!("expected Completed, got {other:?}"),
}
assert!(!ctx.let_bindings.contains_key("__store_scratch_tmp"));
}
// `run_transact` sets `__txn_active` to "true" and sends a StepStart of
// type "transact" as its first event.
#[tokio::test]
async fn run_transact_sets_active_marker() {
let (mut ctx, mut rx) = fresh_ctx();
run_transact(
&IRTransactBlock {
node_type: "transact",
source_line: 0,
source_column: 0,
},
&mut ctx,
)
.await
.unwrap();
assert_eq!(ctx.let_bindings.get("__txn_active").unwrap(), "true");
let first = rx.try_recv().unwrap();
match first {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "transact");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
// `run_deliberate` sends a StepStart of type "deliberate" as its first event.
#[tokio::test]
async fn run_deliberate_canonical_wire_shape() {
let (mut ctx, mut rx) = fresh_ctx();
run_deliberate(
&IRDeliberateBlock {
node_type: "deliberate",
source_line: 0,
source_column: 0,
},
&mut ctx,
)
.await
.unwrap();
let first = rx.try_recv().unwrap();
match first {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "deliberate");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
// `run_consensus` sends a StepStart of type "consensus" as its first event.
#[tokio::test]
async fn run_consensus_canonical_wire_shape() {
let (mut ctx, mut rx) = fresh_ctx();
run_consensus(
&IRConsensusBlock {
node_type: "consensus",
source_line: 0,
source_column: 0,
},
&mut ctx,
)
.await
.unwrap();
let first = rx.try_recv().unwrap();
match first {
FlowExecutionEvent::StepStart { step_type, .. } => {
assert_eq!(step_type, "consensus");
}
e => panic!("expected StepStart, got {e:?}"),
}
}
// With the cancellation flag already set, every handler returns
// `UpstreamCancelled`.
#[tokio::test]
async fn every_handler_short_circuits_on_cancel() {
let cancel = CancellationFlag::new();
cancel.cancel();
let (tx, _rx) = mpsc::unbounded_channel();
let mut ctx = DispatchCtx::new("F", "stub", "", cancel, tx);
let emit = IREmit {
node_type: "emit",
source_line: 0,
source_column: 0,
channel_ref: "c".into(),
value_ref: "v".into(),
value_is_channel: false,
};
assert!(matches!(run_emit(&emit, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let publish = IRPublish {
node_type: "publish",
source_line: 0,
source_column: 0,
channel_ref: "c".into(),
shield_ref: "s".into(),
};
assert!(matches!(run_publish(&publish, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let discover = IRDiscover {
node_type: "discover",
source_line: 0,
source_column: 0,
capability_ref: "c".into(),
alias: "a".into(),
};
assert!(matches!(run_discover(&discover, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let persist = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "s".into(),
};
assert!(matches!(run_persist(&persist, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let retrieve = IRRetrieveStep {
node_type: "retrieve",
source_line: 0,
source_column: 0,
store_name: "s".into(),
where_expr: "w".into(),
alias: "a".into(),
};
assert!(matches!(run_retrieve(&retrieve, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let mutate = IRMutateStep {
node_type: "mutate",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "s".into(),
where_expr: "w".into(),
};
assert!(matches!(run_mutate(&mutate, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let purge = IRPurgeStep {
node_type: "purge",
source_line: 0,
source_column: 0,
store_name: "s".into(),
where_expr: "w".into(),
};
assert!(matches!(run_purge(&purge, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let transact = IRTransactBlock {
node_type: "transact",
source_line: 0,
source_column: 0,
};
assert!(matches!(run_transact(&transact, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let deliberate = IRDeliberateBlock {
node_type: "deliberate",
source_line: 0,
source_column: 0,
};
assert!(matches!(run_deliberate(&deliberate, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
let consensus = IRConsensusBlock {
node_type: "consensus",
source_line: 0,
source_column: 0,
};
assert!(matches!(run_consensus(&consensus, &mut ctx).await, Err(DispatchError::UpstreamCancelled)));
}
// Builds a store declaration with the given name, backend and connection;
// every other field is left empty or `None`.
fn axonstore(name: &str, backend: &str, connection: &str) -> IRAxonStore {
IRAxonStore {
node_type: "axonstore",
source_line: 0,
source_column: 0,
name: name.to_string(),
backend: backend.to_string(),
connection: connection.to_string(),
confidence_floor: None,
isolation: String::new(),
on_breach: String::new(),
capability: String::new(),
column_schema: None,
}
}
// Builds a context whose store registry is built from `specs`.
fn ctx_with_registry(
specs: &[IRAxonStore],
) -> (DispatchCtx, mpsc::UnboundedReceiver<FlowExecutionEvent>) {
let (tx, rx) = mpsc::unbounded_channel();
let registry = crate::store::registry::StoreRegistry::build(specs).unwrap();
let ctx = DispatchCtx::new("TestFlow", "stub", "", CancellationFlag::new(), tx)
.with_store_registry(std::sync::Arc::new(registry));
(ctx, rx)
}
// No registry at all means no Postgres backend (key/value path).
#[test]
fn resolve_pg_backend_no_registry_is_kv() {
let (ctx, _rx) = fresh_ctx();
assert!(resolve_pg_backend(&ctx, "anything").unwrap().is_none());
}
// Both a declared in-memory store and an undeclared store name resolve to
// the key/value path.
#[test]
fn resolve_pg_backend_in_memory_store_is_kv() {
let (ctx, _rx) = ctx_with_registry(&[axonstore("cache", "in_memory", "")]);
assert!(resolve_pg_backend(&ctx, "cache").unwrap().is_none());
assert!(resolve_pg_backend(&ctx, "undeclared").unwrap().is_none());
}
// A postgresql store whose connection names an unset env var must surface
// `MissingEnvVar` rather than silently falling back to key/value.
#[test]
fn resolve_pg_backend_missing_env_var_errors_not_kv_fallback() {
let (ctx, _rx) = ctx_with_registry(&[axonstore(
"tenants",
"postgresql",
"env:AXON_NONEXISTENT_VAR_FASE35F",
)]);
assert!(matches!(
resolve_pg_backend(&ctx, "tenants"),
Err(StoreError::MissingEnvVar { .. })
));
}
// The same resolution failure reaches the caller of `run_retrieve` as a
// `BackendError`.
#[tokio::test]
async fn run_retrieve_postgresql_missing_env_surfaces_backend_error() {
let (mut ctx, _rx) = ctx_with_registry(&[axonstore(
"tenants",
"postgresql",
"env:AXON_NONEXISTENT_VAR_FASE35F",
)]);
let node = IRRetrieveStep {
node_type: "retrieve",
source_line: 0,
source_column: 0,
store_name: "tenants".into(),
where_expr: "id = 1".into(),
alias: "found".into(),
};
assert!(matches!(
run_retrieve(&node, &mut ctx).await,
Err(DispatchError::BackendError { .. })
));
}
// A postgresql store with an unparseable connection string makes
// `run_persist` fail with a `BackendError`.
#[tokio::test]
async fn run_persist_postgresql_malformed_dsn_surfaces_backend_error() {
let (mut ctx, _rx) =
ctx_with_registry(&[axonstore("events", "postgresql", "not a dsn")]);
ctx.let_bindings.insert("kind".into(), "login".into());
let node = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "events".into(),
};
assert!(matches!(
run_persist(&node, &mut ctx).await,
Err(DispatchError::BackendError { .. })
));
}
// A registry-declared in-memory store still goes through the key/value
// path: the output mentions "entries" and the `__store_` key is written.
#[tokio::test]
async fn run_persist_in_memory_store_keeps_byte_identical_kv_path() {
let (mut ctx, _rx) = ctx_with_registry(&[axonstore("cache", "in_memory", "")]);
ctx.let_bindings.insert("k".into(), "v".into());
let node = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "cache".into(),
};
match run_persist(&node, &mut ctx).await.unwrap() {
NodeOutcome::Completed { output, .. } => {
assert!(output.contains("entries"), "KV path output shape");
}
other => panic!("expected Completed, got {other:?}"),
}
assert_eq!(ctx.let_bindings.get("__store_cache_k").unwrap(), "v");
}
// Persisting into a postgresql store that declares a confidence floor of
// 0.8 fails with a `BackendError`.
// NOTE(review): presumably the floor check rejects the row before any
// insert is attempted; the assertion only pins the error variant — confirm.
#[tokio::test]
async fn run_persist_below_confidence_floor_is_blocked() {
let mut store =
axonstore("ledger", "postgresql", "postgresql://u:p@localhost:5432/db");
store.confidence_floor = Some(0.8);
let (mut ctx, _rx) = ctx_with_registry(&[store]);
ctx.let_bindings.insert("amount".into(), "100".into()); let node = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "ledger".into(),
};
assert!(matches!(
run_persist(&node, &mut ctx).await,
Err(DispatchError::BackendError { .. })
));
}
// Builds an in-memory store declaration that requires `capability`.
fn gated_kv(name: &str, capability: &str) -> IRAxonStore {
let mut s = axonstore(name, "in_memory", "");
s.capability = capability.to_string();
s
}
// Builds a context with a registry from `specs` and the given held
// capabilities.
fn ctx_with_caps(
specs: &[IRAxonStore],
held: Vec<String>,
) -> (DispatchCtx, mpsc::UnboundedReceiver<FlowExecutionEvent>) {
let (tx, rx) = mpsc::unbounded_channel();
let registry = crate::store::registry::StoreRegistry::build(specs).unwrap();
let ctx = DispatchCtx::new("F", "stub", "", CancellationFlag::new(), tx)
.with_store_registry(std::sync::Arc::new(registry))
.with_held_capabilities(held);
(ctx, rx)
}
// Builds a retrieve node for `store` with where-expression "k" and alias "v".
fn retrieve_node(store: &str) -> IRRetrieveStep {
IRRetrieveStep {
node_type: "retrieve",
source_line: 0,
source_column: 0,
store_name: store.to_string(),
where_expr: "k".to_string(),
alias: "v".to_string(),
}
}
// Holding a different capability than the store requires is a denial.
#[tokio::test]
async fn retrieve_denied_when_capability_not_held() {
let (mut ctx, _rx) = ctx_with_caps(
&[gated_kv("tenants", "tenant.read")],
vec!["audit.write".to_string()],
);
assert!(matches!(
run_retrieve(&retrieve_node("tenants"), &mut ctx).await,
Err(DispatchError::BackendError { .. })
));
}
// Holding the required capability lets the retrieve through.
#[tokio::test]
async fn retrieve_allowed_when_capability_held() {
let (mut ctx, _rx) = ctx_with_caps(
&[gated_kv("tenants", "tenant.read")],
vec!["tenant.read".to_string()],
);
assert!(run_retrieve(&retrieve_node("tenants"), &mut ctx).await.is_ok());
}
// An empty held-capability list cannot persist into a gated store.
#[tokio::test]
async fn persist_into_gated_store_denied_without_capability() {
let (mut ctx, _rx) =
ctx_with_caps(&[gated_kv("ledger", "ledger.write")], vec![]);
let node = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "ledger".into(),
};
assert!(matches!(
run_persist(&node, &mut ctx).await,
Err(DispatchError::BackendError { .. })
));
}
// A store that declares no capability is accessible with none held.
#[tokio::test]
async fn ungated_store_needs_no_capability() {
let (mut ctx, _rx) =
ctx_with_caps(&[axonstore("cache", "in_memory", "")], vec![]);
assert!(run_retrieve(&retrieve_node("cache"), &mut ctx).await.is_ok());
}
// When the context has no `held_capabilities` at all, the check is skipped
// even for a gated store.
#[tokio::test]
async fn no_capability_context_skips_the_runtime_recheck() {
let (mut ctx, _rx) = ctx_with_registry(&[gated_kv("tenants", "tenant.read")]);
assert!(ctx.held_capabilities.is_none());
assert!(run_retrieve(&retrieve_node("tenants"), &mut ctx).await.is_ok());
}
// One persist adds exactly one entry to the audit chain, and the chain
// verifies as intact.
#[tokio::test]
async fn persist_appends_a_delta_to_the_audit_chain() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("k".into(), "v".into());
let node = IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "s".into(),
};
run_persist(&node, &mut ctx).await.unwrap();
let chain = ctx.audit_chain.lock().unwrap();
assert_eq!(chain.len(), 1);
assert_eq!(
chain.verify(),
crate::store::audit_chain::ChainVerdict::Intact
);
}
// Reads leave the audit chain empty.
#[tokio::test]
async fn retrieve_does_not_append_an_audit_delta() {
let (mut ctx, _rx) = fresh_ctx();
run_retrieve(&retrieve_node("s"), &mut ctx).await.unwrap();
assert!(ctx.audit_chain.lock().unwrap().is_empty());
}
// Persist, mutate and purge each add one entry; the resulting three-entry
// chain verifies as intact.
#[tokio::test]
async fn persist_mutate_purge_chain_into_one_verifiable_history() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("k".into(), "v".into());
run_persist(
&IRPersistStep {
node_type: "persist",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "s".into(),
},
&mut ctx,
)
.await
.unwrap();
run_mutate(
&IRMutateStep {
node_type: "mutate",
fields: Vec::new(),
source_line: 0,
source_column: 0,
store_name: "s".into(),
where_expr: "k".into(),
},
&mut ctx,
)
.await
.unwrap();
run_purge(
&IRPurgeStep {
node_type: "purge",
source_line: 0,
source_column: 0,
store_name: "s".into(),
where_expr: "k".into(),
},
&mut ctx,
)
.await
.unwrap();
let chain = ctx.audit_chain.lock().unwrap();
assert_eq!(chain.len(), 3, "three mutations → three chained deltas");
assert_eq!(
chain.verify(),
crate::store::audit_chain::ChainVerdict::Intact
);
}
// The fallback row skips `__`-prefixed keys and is sorted by column name.
#[test]
fn sql_row_from_bindings_excludes_namespace_keys_and_sorts() {
let (mut ctx, _rx) = fresh_ctx();
ctx.let_bindings.insert("name".into(), "Alice".into());
ctx.let_bindings.insert("id".into(), "7".into());
ctx.let_bindings
.insert("__store_internal".into(), "bookkeeping".into());
let row = sql_row_from_bindings(&ctx);
assert_eq!(
row,
vec![
("id".to_string(), SqlValue::Text("7".to_string())),
("name".to_string(), SqlValue::Text("Alice".to_string())),
]
);
}
}