use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::Mutex;
use serde_json::{Map, Value};
use tokio::sync::{mpsc, Semaphore};
use tokio_util::sync::CancellationToken;
use crate::core::configs::op_config::{CompiledLink, LoopConfig, OpBound, OpConfig, OpType};
use crate::core::engine::{FrameEvent, FrameSender, Scheduler};
use crate::core::exceptions::{OpError, OperonError};
use crate::core::middleware::MiddlewareContext;
use crate::core::ops::edges::PARENT;
use crate::core::registry::OpRegistry;
use crate::core::states::cell::{default_context, ContextId};
use crate::core::states::ref_::{RefArg, RefConfig, RefTransform, StreamPolicy};
/// Address of one value slot in `RuntimeState`: `(source, variable, context)`, where `source`
/// is an op's full name or the graph key.
type SlotKey = (String, String, ContextId);
/// Key/value scratch map handed to `Scheduler::run` and shared through the `Arc`; op inputs
/// planned as `InputResolver::Scratch` read from it.
pub type SharedScratch = Arc<Mutex<HashMap<String, Value>>>;
/// Mutable value store for one `Scheduler::run` call, shared (behind a `Mutex`) by the
/// scheduler loop and the op tasks it spawns.
#[derive(Debug)]
struct RuntimeState {
/// Values written for graph inputs and op outputs, keyed by `(source, var, context)`.
slots: HashMap<SlotKey, Value>,
/// Scratch map supplied by the caller of `run`; missing keys read as `Value::Null`.
scratch: SharedScratch,
}
impl RuntimeState {
fn with_capacity(cap: usize, scratch: SharedScratch) -> Self {
Self {
slots: HashMap::with_capacity(cap),
scratch,
}
}
fn set(&mut self, op: &str, var: &str, ctx: &ContextId, value: Value) {
self.slots
.insert((op.to_string(), var.to_string(), ctx.clone()), value);
}
fn get(&self, op: &str, var: &str, ctx: &ContextId) -> Option<&Value> {
let mut probe = ctx.clone();
loop {
if let Some(v) = self
.slots
.get(&(op.to_string(), var.to_string(), probe.clone()))
{
return Some(v);
}
if probe.is_empty() {
return None;
}
probe.pop();
}
}
fn scratch_get(&self, key: &str) -> Value {
self.scratch.lock().get(key).cloned().unwrap_or(Value::Null)
}
}
/// Message sent from op execution back to the `run_once` event loop.
#[derive(Debug)]
enum SchedulerEvent {
/// One output map produced by `op` for context `ctx` (which may be a fan-out context
/// derived from the spawn context — see `fan_out_value`, not shown here).
Frame {
op: String,
ctx: ContextId,
result: Map<String, Value>,
},
/// `op`, spawned in `ctx`, has finished; sent after all of its frames. Spawn paths that
/// bail out because the workflow was cancelled send none.
Eof {
op: String,
ctx: ContextId,
},
/// `op` (running in `ctx`) returned an interrupt payload asking to cancel the context
/// subtree rooted at `ctx_to_cancel`, with a free-form `reason`.
Interrupt {
op: String,
ctx: ContextId,
ctx_to_cancel: ContextId,
reason: String,
},
}
fn parse_interrupt(value: &Value) -> Option<(ContextId, String)> {
let payload = value
.as_object()?
.get(crate::core::ops::events::INTERRUPT_KEY)?
.as_object()?;
let ctx_arr = payload.get("ctx_to_cancel")?.as_array()?;
let ctx: ContextId = ctx_arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
let reason = payload
.get("reason")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Some((ctx, reason))
}
/// Returns `true` when `child` is `parent` itself or lies beneath it, i.e. when `parent` is
/// an element-wise prefix of `child`. The empty context is thus an ancestor of every context.
fn is_descendant_or_equal(child: &ContextId, parent: &ContextId) -> bool {
    child.starts_with(parent)
}
#[allow(clippy::too_many_arguments)]
async fn sweep_ctx(
ctx_prefix: &ContextId,
exclude: &(String, ContextId),
rx: &mut mpsc::Receiver<SchedulerEvent>,
tx: mpsc::Sender<SchedulerEvent>,
tasks_by_ctx: Arc<Mutex<HashMap<ContextId, HashMap<String, tokio::task::JoinHandle<()>>>>>,
inflight: &mut i32,
ready: &mut HashMap<ContextId, HashMap<String, i32>>,
seq_origins: &mut HashMap<(String, ContextId), (String, String)>,
seq_queues: &mut HashMap<(String, String), VecDeque<ContextId>>,
seq_active: &mut HashMap<(String, String), bool>,
collect_bufs: &mut HashMap<(String, String), Vec<(ContextId, Map<String, Value>)>>,
to_dispatch: &mut Vec<(String, ContextId)>,
) {
let mut keep: Vec<SchedulerEvent> = Vec::new();
while let Ok(ev) = rx.try_recv() {
let drop_it = is_descendant_or_equal(event_ctx(&ev), ctx_prefix);
if drop_it {
if matches!(&ev, SchedulerEvent::Eof { .. }) {
*inflight -= 1;
}
} else {
keep.push(ev);
}
}
for ev in keep {
let _ = tx.try_send(ev);
}
let mut to_await: Vec<(String, ContextId, tokio::task::JoinHandle<()>)> = Vec::new();
{
let mut t = tasks_by_ctx.lock();
let ctxs: Vec<ContextId> = t
.keys()
.filter(|c| is_descendant_or_equal(c, ctx_prefix))
.cloned()
.collect();
for ctx in ctxs {
let bucket = t.get_mut(&ctx).unwrap();
let op_names: Vec<String> = bucket.keys().cloned().collect();
for op_name in op_names {
if &(op_name.clone(), ctx.clone()) == exclude {
continue;
}
let Some(handle) = bucket.remove(&op_name) else {
continue;
};
if !handle.is_finished() {
handle.abort();
*inflight -= 1; }
to_await.push((op_name, ctx.clone(), handle));
}
if bucket.is_empty() {
t.remove(&ctx);
}
}
}
for (_op, _ctx, h) in to_await {
let _ = h.await;
}
let emitter_ctx = &exclude.1;
let to_clear: Vec<ContextId> = ready
.keys()
.filter(|c| is_descendant_or_equal(c, ctx_prefix) && *c != emitter_ctx)
.cloned()
.collect();
for c in to_clear {
ready.remove(&c);
}
let so_keys: Vec<(String, ContextId)> = seq_origins
.keys()
.filter(|(_op, c)| is_descendant_or_equal(c, ctx_prefix) && c != emitter_ctx)
.cloned()
.collect();
for key in so_keys {
let Some(seq_key) = seq_origins.remove(&key) else {
continue;
};
let Some(q) = seq_queues.get_mut(&seq_key) else {
seq_active.insert(seq_key, false);
continue;
};
let kept: VecDeque<ContextId> = q
.drain(..)
.filter(|c| !is_descendant_or_equal(c, ctx_prefix))
.collect();
if kept.is_empty() {
seq_queues.remove(&seq_key);
seq_active.insert(seq_key, false);
continue;
}
let mut kept = kept;
let next_ctx = kept.pop_front().expect("kept non-empty");
*q = kept;
seq_origins.insert((seq_key.1.clone(), next_ctx.clone()), seq_key.clone());
*inflight += 1;
to_dispatch.push((seq_key.1.clone(), next_ctx));
}
let cb_keys: Vec<(String, String)> = collect_bufs
.iter()
.filter(|(_k, buf)| {
buf.iter()
.any(|(c, _)| is_descendant_or_equal(c, ctx_prefix))
})
.map(|(k, _)| k.clone())
.collect();
for k in cb_keys {
collect_bufs.remove(&k);
}
}
/// Returns the context carried by any scheduler event.
fn event_ctx(ev: &SchedulerEvent) -> &ContextId {
    match ev {
        SchedulerEvent::Frame { ctx, .. }
        | SchedulerEvent::Eof { ctx, .. }
        | SchedulerEvent::Interrupt { ctx, .. } => ctx,
    }
}
/// Returns the name of the op that emitted any scheduler event.
fn event_op(ev: &SchedulerEvent) -> &str {
    match ev {
        SchedulerEvent::Frame { op, .. }
        | SchedulerEvent::Eof { op, .. }
        | SchedulerEvent::Interrupt { op, .. } => op.as_str(),
    }
}
/// Event-driven executor for one graph op, with its lookup tables precompiled by `new`.
pub struct GraphScheduler {
/// The graph op being executed.
graph: Arc<OpConfig>,
/// Registry handed to `execute_op` for running individual ops.
registry: Arc<dyn OpRegistry>,
/// Op name -> `(op output var, graph var)` pairs for outputs whose ref targets this graph.
out_vars: HashMap<String, Vec<(String, String)>>,
/// Schedulers for nested graph ops, keyed by the child's full name.
child_schedulers: Arc<HashMap<String, Arc<GraphScheduler>>>,
/// Per-op ready counters copied into `ready` for every newly seen context.
initial_ready_hm: HashMap<String, i32>,
/// Initial capacity for `RuntimeState::slots`.
slot_capacity: usize,
/// Initial capacity for the sequential-edge tables in `run_once`.
seq_edge_capacity: usize,
/// Initial capacity for the collect buffers in `run_once`.
collect_edge_capacity: usize,
/// `(src, dst)` -> stream policy declared on the edge, when there is one.
edge_policies: HashMap<(String, String), StreamPolicy>,
/// Op name -> precompiled input resolution plan.
input_plans: Arc<HashMap<String, Vec<InputSlot>>>,
/// Branch op name -> (compiled `(condition, target)` cases, default target).
branches: Arc<HashMap<String, (Vec<(CompiledRef, String)>, Option<String>)>>,
}
impl std::fmt::Debug for GraphScheduler {
    /// Shows only the graph name and its op count; the compiled tables are left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let op_count = self.graph.ops.len();
        let mut builder = f.debug_struct("GraphScheduler");
        builder.field("graph_name", &self.graph.name);
        builder.field("op_count", &op_count);
        builder.finish()
    }
}
impl GraphScheduler {
pub fn new(graph: Arc<OpConfig>, registry: Arc<dyn OpRegistry>) -> Result<Self, OperonError> {
if !graph.is_graph() {
return Err(OperonError::Config(format!(
"top-level op must be a graph, got {:?}",
graph.kind
)));
}
let out_vars = compute_out_vars(&graph);
let child_schedulers = Arc::new(build_child_schedulers(&graph, ®istry)?);
let initial_ready_hm: HashMap<String, i32> = graph
.initial_ready_count
.iter()
.map(|(k, v)| (k.clone(), *v))
.collect();
let slot_capacity: usize = graph
.ops
.values()
.map(|op| op.inputs.len() + op.outputs.len())
.sum::<usize>()
.max(16);
let mut edge_policies: HashMap<(String, String), StreamPolicy> = HashMap::new();
let mut seq_edge_capacity = 0usize;
let mut collect_edge_capacity = 0usize;
for (src_name, links) in &graph.compiled_adj {
for link in links {
let key = (src_name.clone(), link.dst.clone());
let policy = resolve_edge_policy(&graph, src_name, &link.dst);
if let Some(p) = policy {
edge_policies.insert(key.clone(), p);
if p.collect {
collect_edge_capacity += 1;
} else {
seq_edge_capacity += 1;
}
} else {
seq_edge_capacity += 1;
}
}
}
let graph_key_owned = if graph.full_name.is_empty() {
graph.name.clone()
} else {
graph.full_name.clone()
};
let input_plans = Arc::new(compile_input_plans(&graph, &graph_key_owned));
let branches = Arc::new(compile_branches(&graph, &graph_key_owned));
Ok(Self {
graph,
registry,
out_vars,
child_schedulers,
initial_ready_hm,
slot_capacity,
seq_edge_capacity: seq_edge_capacity.max(4),
collect_edge_capacity: collect_edge_capacity.max(4),
edge_policies,
input_plans,
branches,
})
}
fn graph_key(&self) -> &str {
if self.graph.full_name.is_empty() {
&self.graph.name
} else {
&self.graph.full_name
}
}
pub async fn run_collect(&self, inputs: Map<String, Value>) -> Result<Value, OperonError> {
use crate::core::engine::{FrameSender, TraceTap};
let tap: TraceTap = Arc::new(Mutex::new(Vec::new()));
let sender = FrameSender::tap_only(tap.clone());
let cancel = CancellationToken::new();
let ctx = MiddlewareContext::default();
let nested_scratch = Arc::new(Mutex::new(HashMap::new()));
Scheduler::run(self, inputs, ctx, sender, cancel, nested_scratch).await?;
let frames = std::mem::take(&mut *tap.lock());
let mut out: Map<String, Value> = Map::new();
for frame in frames {
for (k, v) in frame.data {
out.insert(k, v);
}
}
Ok(Value::Object(out))
}
}
/// Builds a scheduler for every direct child of `graph` that is itself a graph op, keyed by
/// the child's full name. Stops at, and returns, the first construction error.
fn build_child_schedulers(
    graph: &OpConfig,
    registry: &Arc<dyn OpRegistry>,
) -> Result<HashMap<String, Arc<GraphScheduler>>, OperonError> {
    graph
        .ops
        .values()
        .filter(|child| matches!(child.kind, OpType::Graph))
        .map(|child| {
            GraphScheduler::new(Arc::new(child.clone()), Arc::clone(registry))
                .map(|scheduler| (child.full_name.clone(), Arc::new(scheduler)))
        })
        .collect()
}
#[async_trait]
impl Scheduler for GraphScheduler {
    /// Executes the graph with `inputs`, streaming frames to `sender`.
    ///
    /// Graph inputs are seeded at the root context under the graph key. A declared input that
    /// is missing from `inputs` gets its literal, else its default; every entry of `inputs` is
    /// then written as given. One pass runs with `sender`.
    ///
    /// With a `loop_config`, further passes run in fresh loop contexts (`next_loop_ctx`). Each
    /// pass starts from the previous pass's graph outputs and uses `sender.silent()`. Looping
    /// stops when `until` evaluates true or `max_iterations` passes have run in total
    /// (default 1000, minimum 1, the initial pass included). If any extra pass ran, one
    /// final frame named after the graph carries the last pass's outputs.
    ///
    /// # Errors
    /// Propagates errors from running a pass, evaluating `until`, or sending the final frame.
    async fn run(
        &self,
        inputs: Map<String, Value>,
        _context: MiddlewareContext,
        sender: FrameSender,
        cancel: CancellationToken,
        scratch: SharedScratch,
    ) -> Result<(), OperonError> {
        let state = Arc::new(Mutex::new(RuntimeState::with_capacity(
            self.slot_capacity,
            scratch,
        )));
        let root_ctx = default_context();
        {
            let mut s = state.lock();
            for (k, param) in &self.graph.inputs {
                if inputs.contains_key(k) {
                    continue;
                }
                if let Some(lit) = &param.literal {
                    s.set(self.graph_key(), k, &root_ctx, lit.clone());
                } else if let Some(def) = &param.default {
                    s.set(self.graph_key(), k, &root_ctx, def.clone());
                }
            }
            for (k, v) in &inputs {
                s.set(self.graph_key(), k, &root_ctx, v.clone());
            }
        }
        self.run_once(state.clone(), root_ctx.clone(), &sender, &cancel)
            .await?;
        let Some(loop_cfg) = &self.graph.loop_config else {
            return Ok(());
        };
        let max_iters = loop_cfg.max_iterations.unwrap_or(1000).max(1);
        let mut current_ctx = root_ctx;
        let mut n_iters: u32 = 0;
        while !self.loop_should_stop(loop_cfg, state.clone(), &current_ctx)? {
            // The initial pass above counts as the first of the `max_iters` passes.
            if n_iters >= max_iters - 1 {
                break;
            }
            n_iters += 1;
            let next_ctx = next_loop_ctx(&current_ctx, n_iters);
            let carry = self.collect_graph_outputs(state.clone(), &current_ctx);
            {
                let mut s = state.lock();
                for (var, val) in carry {
                    s.set(self.graph_key(), &var, &next_ctx, val);
                }
            }
            self.run_once(state.clone(), next_ctx.clone(), &sender.silent(), &cancel)
                .await?;
            current_ctx = next_ctx;
        }
        if n_iters > 0 {
            let final_map = self.collect_graph_outputs(state.clone(), &current_ctx);
            if !final_map.is_empty() {
                sender
                    .send(FrameEvent {
                        op: self.graph.name.clone(),
                        context: current_ctx,
                        data: final_map,
                    })
                    .await?;
            }
        }
        Ok(())
    }
}
impl GraphScheduler {
async fn run_once(
&self,
state: Arc<Mutex<RuntimeState>>,
ctx: ContextId,
sender: &FrameSender,
cancel: &CancellationToken,
) -> Result<(), OperonError> {
let mut ready: HashMap<ContextId, HashMap<String, i32>> = HashMap::new();
ready.insert(ctx.clone(), self.initial_ready_hm.clone());
let mut inflight: i32 = 0;
let mut seq_queues: HashMap<(String, String), VecDeque<ContextId>> =
HashMap::with_capacity(self.seq_edge_capacity);
let mut seq_active: HashMap<(String, String), bool> =
HashMap::with_capacity(self.seq_edge_capacity);
let mut seq_origins: HashMap<(String, ContextId), (String, String)> = HashMap::new();
let mut collect_bufs: HashMap<(String, String), Vec<(ContextId, Map<String, Value>)>> =
HashMap::with_capacity(self.collect_edge_capacity);
let sem = Arc::new(Semaphore::new(
self.graph.max_stream_concurrent.max(1) as usize
));
let (tx, mut rx) = mpsc::channel::<SchedulerEvent>(8192);
let tasks_by_ctx: Arc<
Mutex<HashMap<ContextId, HashMap<String, tokio::task::JoinHandle<()>>>>,
> = Arc::new(Mutex::new(HashMap::new()));
for entry in &self.graph.entries {
inflight += 1;
self.spawn_op(
entry.clone(),
ctx.clone(),
state.clone(),
tx.clone(),
sem.clone(),
cancel.clone(),
tasks_by_ctx.clone(),
)
.await?;
}
while inflight > 0 {
tokio::select! {
_ = cancel.cancelled() => {
return Err(OperonError::Runtime("workflow cancelled".into()));
}
maybe_ev = rx.recv() => {
let ev = match maybe_ev {
Some(ev) => ev,
None => break,
};
match ev {
SchedulerEvent::Frame { op, ctx: frame_ctx, result } => {
self.on_frame(
&op,
&frame_ctx,
&result,
&mut ready,
&mut seq_queues,
&mut seq_active,
&mut seq_origins,
&mut collect_bufs,
&mut inflight,
state.clone(),
tx.clone(),
sem.clone(),
cancel,
sender,
tasks_by_ctx.clone(),
)
.await?;
}
SchedulerEvent::Eof { op, ctx: eof_ctx } => {
inflight -= 1;
self.on_eof(
&op,
&eof_ctx,
&mut seq_queues,
&mut seq_active,
&mut seq_origins,
&mut collect_bufs,
&mut inflight,
state.clone(),
tx.clone(),
sem.clone(),
cancel,
tasks_by_ctx.clone(),
)
.await?;
}
SchedulerEvent::Interrupt {
op: emit_op,
ctx: emit_ctx,
ctx_to_cancel,
reason,
} => {
let mut to_dispatch: Vec<(String, ContextId)> = Vec::new();
sweep_ctx(
&ctx_to_cancel,
&(emit_op.clone(), emit_ctx.clone()),
&mut rx,
tx.clone(),
tasks_by_ctx.clone(),
&mut inflight,
&mut ready,
&mut seq_origins,
&mut seq_queues,
&mut seq_active,
&mut collect_bufs,
&mut to_dispatch,
)
.await;
for (dst, next_ctx) in to_dispatch {
self.spawn_op(
dst,
next_ctx,
state.clone(),
tx.clone(),
sem.clone(),
cancel.clone(),
tasks_by_ctx.clone(),
)
.await?;
}
let mut payload = Map::new();
let mut irq = Map::new();
irq.insert(
"ctx_to_cancel".into(),
Value::Array(
ctx_to_cancel
.iter()
.map(|s| Value::String(s.clone()))
.collect(),
),
);
irq.insert("reason".into(), Value::String(reason.clone()));
irq.insert("op".into(), Value::String(emit_op.clone()));
payload.insert("__interrupt__".into(), Value::Object(irq));
sender
.send(crate::core::engine::FrameEvent {
op: "__interrupt__".to_string(),
context: emit_ctx,
data: payload,
})
.await;
}
}
}
}
}
Ok(())
}
async fn spawn_op(
&self,
op_name: String,
ctx: ContextId,
state: Arc<Mutex<RuntimeState>>,
tx: mpsc::Sender<SchedulerEvent>,
sem: Arc<Semaphore>,
cancel: CancellationToken,
tasks_by_ctx: Arc<Mutex<HashMap<ContextId, HashMap<String, tokio::task::JoinHandle<()>>>>>,
) -> Result<(), OperonError> {
let op_cfg = self
.graph
.ops
.get(&op_name)
.ok_or_else(|| OperonError::Config(format!("op '{}' not in graph", op_name)))?
.clone();
let registry = self.registry.clone();
let child_schedulers = self.child_schedulers.clone();
let plan_slice: &[InputSlot] = self
.input_plans
.get(&op_name)
.map(|v| v.as_slice())
.unwrap_or(&[]);
if matches!(op_cfg.bound, OpBound::Sync) {
if cancel.is_cancelled() {
return Ok(());
}
let frames: Vec<(ContextId, Map<String, Value>)> =
if matches!(op_cfg.kind, OpType::Branch) {
let single = match evaluate_branch(&op_cfg, &self.branches, &ctx, &state) {
Ok(target) => {
let mut m = Map::new();
m.insert("__branch_target__".into(), Value::from(target.clone()));
{
let mut s = state.lock();
s.set(
&op_cfg.full_name,
"__branch_target__",
&ctx,
Value::from(target),
);
}
m
}
Err(e) => {
{
let mut s = state.lock();
s.set(&op_cfg.full_name, "error", &ctx, Value::from(e.to_string()));
}
error_frame(&e)
}
};
vec![(ctx.clone(), single)]
} else {
match resolve_inputs(&op_cfg, plan_slice, &ctx, &state) {
Err(e) => vec![(ctx.clone(), error_frame(&e))],
Ok(inputs) => {
match execute_op(&op_cfg, ®istry, inputs, &self.child_schedulers)
.await
{
Ok(value) => {
if let Some((ctx_to_cancel, reason)) = parse_interrupt(&value) {
let irq = SchedulerEvent::Interrupt {
op: op_name.clone(),
ctx: ctx.clone(),
ctx_to_cancel,
reason,
};
if let Err(tokio::sync::mpsc::error::TrySendError::Full(
ev,
)) = tx.try_send(irq)
{
let _ = tx.send(ev).await;
}
let eof = SchedulerEvent::Eof {
op: op_name,
ctx: ctx.clone(),
};
if let Err(tokio::sync::mpsc::error::TrySendError::Full(
ev,
)) = tx.try_send(eof)
{
let _ = tx.send(ev).await;
}
return Ok(());
}
fan_out_value(&op_cfg, &ctx, value, &state)
}
Err(e) => {
{
let mut s = state.lock();
s.set(
&op_cfg.full_name,
"error",
&ctx,
Value::from(e.to_string()),
);
}
vec![(ctx.clone(), error_frame(&e))]
}
}
}
}
};
for (frame_ctx, frame_map) in frames {
let ev = SchedulerEvent::Frame {
op: op_name.clone(),
ctx: frame_ctx,
result: frame_map,
};
if let Err(tokio::sync::mpsc::error::TrySendError::Full(ev)) = tx.try_send(ev) {
let _ = tx.send(ev).await;
}
}
let eof = SchedulerEvent::Eof {
op: op_name,
ctx: ctx.clone(),
};
if let Err(tokio::sync::mpsc::error::TrySendError::Full(ev)) = tx.try_send(eof) {
let _ = tx.send(ev).await;
}
return Ok(());
}
let owned_plan: Vec<InputSlot> = plan_slice.to_vec();
let registry_op_name = op_name.clone();
let registry_ctx = ctx.clone();
let task = tokio::spawn(async move {
let _permit = match sem.acquire_owned().await {
Ok(p) => p,
Err(_) => return, };
if cancel.is_cancelled() {
return;
}
let inputs = match resolve_inputs(&op_cfg, &owned_plan, &ctx, &state) {
Ok(m) => m,
Err(e) => {
let _ = tx
.send(SchedulerEvent::Frame {
op: op_name.clone(),
ctx: ctx.clone(),
result: error_frame(&e),
})
.await;
let _ = tx.send(SchedulerEvent::Eof { op: op_name, ctx }).await;
return;
}
};
let exec_result = execute_op(&op_cfg, ®istry, inputs, &child_schedulers).await;
match exec_result {
Ok(value) => {
if let Some((ctx_to_cancel, reason)) = parse_interrupt(&value) {
let _ = tx
.send(SchedulerEvent::Interrupt {
op: op_name.clone(),
ctx: ctx.clone(),
ctx_to_cancel,
reason,
})
.await;
let _ = tx.send(SchedulerEvent::Eof { op: op_name, ctx }).await;
return;
}
let frames = fan_out_value(&op_cfg, &ctx, value, &state);
for (frame_ctx, frame_map) in frames {
let _ = tx
.send(SchedulerEvent::Frame {
op: op_name.clone(),
ctx: frame_ctx,
result: frame_map,
})
.await;
}
let _ = tx.send(SchedulerEvent::Eof { op: op_name, ctx }).await;
}
Err(e) => {
{
let mut s = state.lock();
s.set(&op_cfg.full_name, "error", &ctx, Value::from(e.to_string()));
}
let _ = tx
.send(SchedulerEvent::Frame {
op: op_name.clone(),
ctx: ctx.clone(),
result: error_frame(&e),
})
.await;
let _ = tx.send(SchedulerEvent::Eof { op: op_name, ctx }).await;
}
}
});
tasks_by_ctx
.lock()
.entry(registry_ctx)
.or_default()
.insert(registry_op_name, task);
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn on_frame(
&self,
op: &str,
ctx: &ContextId,
result: &Map<String, Value>,
ready: &mut HashMap<ContextId, HashMap<String, i32>>,
seq_queues: &mut HashMap<(String, String), VecDeque<ContextId>>,
seq_active: &mut HashMap<(String, String), bool>,
seq_origins: &mut HashMap<(String, ContextId), (String, String)>,
collect_bufs: &mut HashMap<(String, String), Vec<(ContextId, Map<String, Value>)>>,
inflight: &mut i32,
state: Arc<Mutex<RuntimeState>>,
tx: mpsc::Sender<SchedulerEvent>,
sem: Arc<Semaphore>,
cancel: &CancellationToken,
sender: &FrameSender,
tasks_by_ctx: Arc<Mutex<HashMap<ContextId, HashMap<String, tokio::task::JoinHandle<()>>>>>,
) -> Result<(), OperonError> {
if !ready.contains_key(ctx) {
ready.insert(ctx.clone(), self.initial_ready_hm.clone());
}
if let Some(mapped) = self.out_vars.get(op) {
let mut filtered = Map::new();
let graph_key = self.graph_key().to_string();
{
let mut s = state.lock();
for (src_var, dst_var) in mapped {
if let Some(v) = result.get(src_var) {
s.set(&graph_key, dst_var, ctx, v.clone());
filtered.insert(dst_var.clone(), v.clone());
}
}
}
if !filtered.is_empty() {
sender
.send(FrameEvent {
op: op.to_string(),
context: ctx.clone(),
data: filtered,
})
.await?;
}
}
let branch_target = result
.get("__branch_target__")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let Some(adj) = self.graph.compiled_adj.get(op) else {
return Ok(());
};
let adj = adj.clone();
for link in adj {
if let Some(target) = &branch_target {
if &link.dst != target {
continue;
}
}
self.route_edge_async(
op,
&link,
ctx,
result,
ready,
seq_queues,
seq_active,
seq_origins,
collect_bufs,
inflight,
state.clone(),
tx.clone(),
sem.clone(),
cancel,
tasks_by_ctx.clone(),
)
.await?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn route_edge_async(
&self,
src: &str,
link: &CompiledLink,
ctx: &ContextId,
result: &Map<String, Value>,
ready: &mut HashMap<ContextId, HashMap<String, i32>>,
seq_queues: &mut HashMap<(String, String), VecDeque<ContextId>>,
seq_active: &mut HashMap<(String, String), bool>,
seq_origins: &mut HashMap<(String, ContextId), (String, String)>,
collect_bufs: &mut HashMap<(String, String), Vec<(ContextId, Map<String, Value>)>>,
inflight: &mut i32,
state: Arc<Mutex<RuntimeState>>,
tx: mpsc::Sender<SchedulerEvent>,
sem: Arc<Semaphore>,
cancel: &CancellationToken,
tasks_by_ctx: Arc<Mutex<HashMap<ContextId, HashMap<String, tokio::task::JoinHandle<()>>>>>,
) -> Result<(), OperonError> {
let rc = ready.get_mut(ctx).expect("ready entry seeded earlier");
let Some(count) = rc.get_mut(&link.dst) else {
return Ok(());
};
if link.soft && *count <= 0 {
return Ok(());
}
*count -= 1;
if *count != 0 {
return Ok(());
}
let policy = self
.edge_policies
.get(&(src.to_string(), link.dst.clone()))
.copied();
if let Some(p) = &policy {
if p.collect {
collect_bufs
.entry((src.to_string(), link.dst.clone()))
.or_default()
.push((ctx.clone(), result.clone()));
return Ok(());
}
if p.parallel {
*inflight += 1;
self.spawn_op(
link.dst.clone(),
ctx.clone(),
state,
tx,
sem,
cancel.clone(),
tasks_by_ctx,
)
.await?;
return Ok(());
}
}
let key = (src.to_string(), link.dst.clone());
if !*seq_active.entry(key.clone()).or_insert(false) {
*seq_active.get_mut(&key).unwrap() = true;
seq_origins.insert((link.dst.clone(), ctx.clone()), key.clone());
*inflight += 1;
self.spawn_op(
link.dst.clone(),
ctx.clone(),
state,
tx,
sem,
cancel.clone(),
tasks_by_ctx,
)
.await?;
} else {
seq_queues.entry(key).or_default().push_back(ctx.clone());
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn on_eof(
&self,
op: &str,
ctx: &ContextId,
seq_queues: &mut HashMap<(String, String), VecDeque<ContextId>>,
seq_active: &mut HashMap<(String, String), bool>,
seq_origins: &mut HashMap<(String, ContextId), (String, String)>,
collect_bufs: &mut HashMap<(String, String), Vec<(ContextId, Map<String, Value>)>>,
inflight: &mut i32,
state: Arc<Mutex<RuntimeState>>,
tx: mpsc::Sender<SchedulerEvent>,
sem: Arc<Semaphore>,
cancel: &CancellationToken,
tasks_by_ctx: Arc<Mutex<HashMap<ContextId, HashMap<String, tokio::task::JoinHandle<()>>>>>,
) -> Result<(), OperonError> {
let keys: Vec<(String, String)> = collect_bufs
.keys()
.filter(|(src, _dst)| src == op)
.cloned()
.collect();
for key in keys {
let buf = collect_bufs.remove(&key).unwrap_or_default();
if buf.is_empty() {
continue;
}
let mut merged: Map<String, Value> = Map::new();
for (_c, r) in &buf {
for (k, v) in r {
let entry = merged
.entry(k.clone())
.or_insert_with(|| Value::Array(Vec::new()));
if let Value::Array(arr) = entry {
arr.push(v.clone());
}
}
}
let mut collect_ctx = ctx.clone();
collect_ctx.push("__collect__".to_string());
let src_full = self
.graph
.ops
.get(&key.0)
.map(|o| o.full_name.clone())
.unwrap_or_else(|| key.0.clone());
{
let mut s = state.lock();
for (k, v) in &merged {
s.set(&src_full, k, &collect_ctx, v.clone());
}
}
*inflight += 1;
self.spawn_op(
key.1.clone(),
collect_ctx,
state.clone(),
tx.clone(),
sem.clone(),
cancel.clone(),
tasks_by_ctx.clone(),
)
.await?;
}
if let Some(key) = seq_origins.remove(&(op.to_string(), ctx.clone())) {
if let Some(q) = seq_queues.get_mut(&key) {
if let Some(next_ctx) = q.pop_front() {
seq_origins.insert((key.1.clone(), next_ctx.clone()), key.clone());
*inflight += 1;
self.spawn_op(
key.1.clone(),
next_ctx,
state,
tx,
sem,
cancel.clone(),
tasks_by_ctx,
)
.await?;
} else {
seq_active.insert(key, false);
}
} else {
seq_active.insert(key, false);
}
}
Ok(())
}
fn collect_graph_outputs(
&self,
state: Arc<Mutex<RuntimeState>>,
ctx: &ContextId,
) -> Map<String, Value> {
let mut out = Map::new();
let s = state.lock();
for var in self.graph.outputs.keys() {
if let Some(v) = s.get(self.graph_key(), var, ctx) {
out.insert(var.clone(), v.clone());
}
}
out
}
fn loop_should_stop(
&self,
loop_cfg: &LoopConfig,
state: Arc<Mutex<RuntimeState>>,
ctx: &ContextId,
) -> Result<bool, OperonError> {
let Some(expr) = loop_cfg.until.as_deref() else {
return Ok(false);
};
let outputs = self.collect_graph_outputs(state, ctx);
eval_until(expr, &outputs)
}
}
/// A ref resolved at graph-compile time: read `(source, var)` from the runtime state, then
/// apply `chain` left to right.
#[derive(Debug, Clone)]
struct CompiledRef {
/// Slot source (op full name or graph key; `PARENT` is already rewritten to the graph key).
source: String,
/// Variable name within `source`.
var: String,
/// Transforms applied in order to the looked-up value.
chain: Vec<CompiledOp>,
}
/// One compiled ref transform together with its arguments (only the first is used by
/// `eval_op`).
#[derive(Debug, Clone)]
struct CompiledOp {
kind: TransformKind,
args: Vec<CompiledArg>,
}
/// Argument of a compiled transform: a literal, or a nested ref evaluated in the same context.
#[derive(Debug, Clone)]
enum CompiledArg {
Lit(Value),
Ref(Box<CompiledRef>),
}
/// Ref transform operations, parsed from transform names by `compile_op`.
///
/// `R*` variants are the reflected forms: `eval_op` swaps the operands, e.g. `RSub` computes
/// `arg - value`. Unrecognized names are kept in `Unknown` and rejected at evaluation time.
#[derive(Debug, Clone)]
enum TransformKind {
Eq,
Ne,
Lt,
Le,
Gt,
Ge,
Contains,
GetItem,
GetAttr,
Apply,
Call,
MatMul,
RMatMul,
And,
RAnd,
Or,
ROr,
Not,
Add,
RAdd,
Sub,
RSub,
Mul,
RMul,
TrueDiv,
RTrueDiv,
FloorDiv,
RFloorDiv,
Mod,
RMod,
Pow,
RPow,
Neg,
Pos,
Abs,
Unknown(String),
}
/// Compiles a ref config against the graph stored under `graph_key`.
///
/// A `PARENT` source is rewritten to `graph_key`, which is the key the scheduler stores the
/// graph's own variables under. Every transform is compiled in order.
fn compile_ref(rc: &RefConfig, graph_key: &str) -> CompiledRef {
    let points_at_parent = rc.source == PARENT;
    let mut chain = Vec::new();
    for transform in rc.transforms.iter() {
        chain.push(compile_op(transform, graph_key));
    }
    CompiledRef {
        source: if points_at_parent {
            graph_key.to_string()
        } else {
            rc.source.clone()
        },
        var: rc.var.clone(),
        chain,
    }
}
fn compile_op(t: &RefTransform, graph_key: &str) -> CompiledOp {
let kind = match t.name.as_str() {
"eq" => TransformKind::Eq,
"ne" => TransformKind::Ne,
"lt" => TransformKind::Lt,
"le" => TransformKind::Le,
"gt" => TransformKind::Gt,
"ge" => TransformKind::Ge,
"contains" => TransformKind::Contains,
"getitem" => TransformKind::GetItem,
"getattr" => TransformKind::GetAttr,
"apply" => TransformKind::Apply,
"call" => TransformKind::Call,
"matmul" => TransformKind::MatMul,
"rmatmul" => TransformKind::RMatMul,
"and_" => TransformKind::And,
"rand_" => TransformKind::RAnd,
"or_" => TransformKind::Or,
"ror_" => TransformKind::ROr,
"not_" => TransformKind::Not,
"add" => TransformKind::Add,
"radd" => TransformKind::RAdd,
"sub" => TransformKind::Sub,
"rsub" => TransformKind::RSub,
"mul" => TransformKind::Mul,
"rmul" => TransformKind::RMul,
"truediv" => TransformKind::TrueDiv,
"rtruediv" => TransformKind::RTrueDiv,
"floordiv" => TransformKind::FloorDiv,
"rfloordiv" => TransformKind::RFloorDiv,
"mod" => TransformKind::Mod,
"rmod" => TransformKind::RMod,
"pow" => TransformKind::Pow,
"rpow" => TransformKind::RPow,
"neg" => TransformKind::Neg,
"pos" => TransformKind::Pos,
"abs" => TransformKind::Abs,
other => TransformKind::Unknown(other.to_string()),
};
let args = t.args.iter().map(|a| compile_arg(a, graph_key)).collect();
CompiledOp { kind, args }
}
/// Compiles a transform argument: nested refs are compiled recursively, literals are copied.
fn compile_arg(a: &RefArg, graph_key: &str) -> CompiledArg {
    match a {
        RefArg::NestedRef(nested) => {
            let compiled = compile_ref(nested, graph_key);
            CompiledArg::Ref(Box::new(compiled))
        }
        RefArg::Literal(literal) => CompiledArg::Lit(literal.clone()),
    }
}
/// Evaluates a compiled ref in context `ctx`.
///
/// The base value `(source, var)` is looked up with ancestor-context fallback and then
/// threaded through every transform of the chain.
///
/// # Errors
/// `OperonError::State` if no value is visible; otherwise the first transform error.
fn eval_ref(
    cref: &CompiledRef,
    ctx: &ContextId,
    state: &Mutex<RuntimeState>,
) -> Result<Value, OperonError> {
    // Copy the base value out and release the lock before applying transforms: nested refs
    // inside the chain lock `state` again.
    let looked_up = state.lock().get(&cref.source, &cref.var, ctx).cloned();
    let base = looked_up.ok_or_else(|| {
        OperonError::State(format!(
            "ref resolution: no value for ({}, {}) at context {:?}",
            cref.source, cref.var, ctx
        ))
    })?;
    cref.chain
        .iter()
        .try_fold(base, |acc, cop| eval_op(acc, cop, ctx, state))
}
/// Evaluates a transform argument: a literal is cloned, a nested ref is resolved in `ctx`.
fn eval_arg(
    a: &CompiledArg,
    ctx: &ContextId,
    state: &Mutex<RuntimeState>,
) -> Result<Value, OperonError> {
    let value = match a {
        CompiledArg::Ref(nested) => return eval_ref(nested, ctx, state),
        CompiledArg::Lit(literal) => literal.clone(),
    };
    Ok(value)
}
/// Applies one compiled transform to `value`. The transform's first argument, if any, is
/// evaluated lazily against `ctx`/`state`; it is `Null` when there is none.
///
/// Names and semantics follow Python's `operator` module, which these transforms mirror:
/// `and_`/`or_` return one of their operands, as Python's `and`/`or` do, and `mod` takes the
/// sign of the divisor.
///
/// # Errors
/// `OperonError::Runtime` for `apply`, `call`, `matmul`, `rmatmul` and unknown names. Errors
/// from argument evaluation and the value helpers are propagated.
fn eval_op(
    value: Value,
    cop: &CompiledOp,
    ctx: &ContextId,
    state: &Mutex<RuntimeState>,
) -> Result<Value, OperonError> {
    use TransformKind::*;
    let arg0 = || -> Result<Value, OperonError> {
        match cop.args.first() {
            Some(a) => eval_arg(a, ctx, state),
            None => Ok(Value::Null),
        }
    };
    match &cop.kind {
        Eq => Ok(Value::Bool(values_equal(&value, &arg0()?))),
        Ne => Ok(Value::Bool(!values_equal(&value, &arg0()?))),
        Lt => cmp_op(&value, &arg0()?, |o| o.is_lt()),
        Le => cmp_op(&value, &arg0()?, |o| o.is_le()),
        Gt => cmp_op(&value, &arg0()?, |o| o.is_gt()),
        Ge => cmp_op(&value, &arg0()?, |o| o.is_ge()),
        Contains => Ok(Value::Bool(value_contains(&value, &arg0()?))),
        GetItem => Ok(value_getitem(&value, &arg0()?)),
        GetAttr => value_getattr(&value, &arg0()?),
        Apply => Err(OperonError::Runtime(
            "ref transform 'apply' is not supported in the Rust runtime — \
Python callables cannot cross the JSON wire format. \
Replace `Ref.apply(...)` with `@op(rust='...')` that does the same logic."
                .into(),
        )),
        Call => Err(OperonError::Runtime(
            "ref transform 'call' is not supported in the Rust runtime — \
callable values cannot be JSON-serialized. \
Replace `ref(...)` with a dedicated `@op(rust='...')`."
                .into(),
        )),
        MatMul => Err(OperonError::Runtime(
            "ref transform 'matmul' is not supported in the Rust runtime — \
matrix multiplication requires ndarray-shaped values not present \
in the JSON `Value` model."
                .into(),
        )),
        RMatMul => Err(OperonError::Runtime(
            "ref transform 'rmatmul' is not supported in the Rust runtime — \
matrix multiplication requires ndarray-shaped values not present \
in the JSON `Value` model."
                .into(),
        )),
        // Python `a and b`: `a` if falsy, else `b`. The reflected form swaps the operands.
        And => {
            if !value_truthy(&value) {
                Ok(value)
            } else {
                Ok(arg0()?)
            }
        }
        RAnd => {
            let lhs = arg0()?;
            if !value_truthy(&lhs) {
                Ok(lhs)
            } else {
                Ok(value)
            }
        }
        // Python `a or b`: `a` if truthy, else `b`.
        Or => {
            if value_truthy(&value) {
                Ok(value)
            } else {
                Ok(arg0()?)
            }
        }
        ROr => {
            let lhs = arg0()?;
            if value_truthy(&lhs) {
                Ok(lhs)
            } else {
                Ok(value)
            }
        }
        Not => Ok(Value::Bool(!value_truthy(&value))),
        Add => arith(&value, &arg0()?, |l, r| l + r),
        RAdd => arith(&arg0()?, &value, |l, r| l + r),
        Sub => arith(&value, &arg0()?, |l, r| l - r),
        RSub => arith(&arg0()?, &value, |l, r| l - r),
        Mul => arith(&value, &arg0()?, |l, r| l * r),
        RMul => arith(&arg0()?, &value, |l, r| l * r),
        TrueDiv => arith(&value, &arg0()?, |l, r| l / r),
        RTrueDiv => arith(&arg0()?, &value, |l, r| l / r),
        FloorDiv => arith(&value, &arg0()?, |l, r| (l / r).floor()),
        RFloorDiv => arith(&arg0()?, &value, |l, r| (l / r).floor()),
        // Python `%`: a non-zero result takes the sign of the divisor. Rust's `%` truncates
        // (sign of the dividend), and `rem_euclid` is always non-negative, which disagrees
        // with Python whenever the divisor is negative (Python: `7 % -3 == -2`).
        Mod => arith(&value, &arg0()?, |l, r| {
            let m = l % r;
            if m != 0.0 && (m < 0.0) != (r < 0.0) {
                m + r
            } else {
                m
            }
        }),
        RMod => arith(&arg0()?, &value, |l, r| {
            let m = l % r;
            if m != 0.0 && (m < 0.0) != (r < 0.0) {
                m + r
            } else {
                m
            }
        }),
        Pow => arith(&value, &arg0()?, |l, r| l.powf(r)),
        RPow => arith(&arg0()?, &value, |l, r| l.powf(r)),
        Neg => unary(&value, |v| -v),
        Pos => unary(&value, |v| v),
        Abs => unary(&value, |v| v.abs()),
        Unknown(name) => Err(OperonError::Runtime(format!(
            "ref transform '{}' not implemented in Rust runtime",
            name
        ))),
    }
}
/// How one op input is obtained at run time (chosen by `compile_input_plans`).
#[derive(Debug, Clone)]
enum InputResolver {
/// Evaluate a compiled ref against the runtime state.
Ref(CompiledRef),
/// Read this key from the shared scratch map (`Null` if absent).
Scratch(String),
/// Use a literal value.
Lit(Value),
/// Use the declared default value.
Default(Value),
/// Required input with no source: resolution fails with an op error.
RequiredMissing,
/// Optional input with no source: resolves to `Null`.
Null,
}
/// One entry of an op's input plan: the input variable name and how to resolve it.
#[derive(Debug, Clone)]
struct InputSlot {
var: String,
plan: InputResolver,
}
/// Precompiles, for every op of `graph`, how each of its inputs is resolved at run time.
///
/// For each input the first available source wins:
/// 1. a ref, compiled against `graph_key`;
/// 2. a scratch key;
/// 3. a literal;
/// 4. a default.
///
/// An input with none of these becomes `RequiredMissing` when it is required and `Null`
/// otherwise.
fn compile_input_plans(graph: &OpConfig, graph_key: &str) -> HashMap<String, Vec<InputSlot>> {
    let mut out: HashMap<String, Vec<InputSlot>> = HashMap::with_capacity(graph.ops.len());
    for (op_name, op_cfg) in &graph.ops {
        let mut slots = Vec::with_capacity(op_cfg.inputs.len());
        for (var, param) in &op_cfg.inputs {
            let plan = if let Some(rc) = &param.ref_config {
                InputResolver::Ref(compile_ref(rc, graph_key))
            } else if let Some(sr) = &param.scratch {
                InputResolver::Scratch(sr.key.clone())
            } else if let Some(lit) = &param.literal {
                InputResolver::Lit(lit.clone())
            } else if let Some(def) = &param.default {
                InputResolver::Default(def.clone())
            } else if param.required {
                InputResolver::RequiredMissing
            } else {
                InputResolver::Null
            };
            slots.push(InputSlot {
                var: var.clone(),
                plan,
            });
        }
        out.insert(op_name.clone(), slots);
    }
    out
}
/// Precompiles the case table of every branch op in `graph`.
///
/// Each entry maps the branch op's name to its `(condition ref, target)` cases, in declared
/// order, plus its optional default target.
fn compile_branches(
    graph: &OpConfig,
    graph_key: &str,
) -> HashMap<String, (Vec<(CompiledRef, String)>, Option<String>)> {
    graph
        .ops
        .iter()
        .filter(|(_, op_cfg)| matches!(op_cfg.kind, OpType::Branch))
        .map(|(op_name, op_cfg)| {
            let cases: Vec<(CompiledRef, String)> = op_cfg
                .cases
                .iter()
                .map(|case| (compile_ref(&case.condition, graph_key), case.target.clone()))
                .collect();
            (op_name.clone(), (cases, op_cfg.default.clone()))
        })
        .collect()
}
/// Maps each op to the `(op output var, graph var)` pairs of its outputs whose ref targets the
/// enclosing graph: `PARENT`, the graph's key (full name, or name if that is empty), or the
/// graph's short name.
fn compute_out_vars(graph: &OpConfig) -> HashMap<String, Vec<(String, String)>> {
    let mut map: HashMap<String, Vec<(String, String)>> = HashMap::new();
    let graph_key: &str = if graph.full_name.is_empty() {
        &graph.name
    } else {
        &graph.full_name
    };
    for (op_name, op_cfg) in &graph.ops {
        for (src_var, param) in &op_cfg.outputs {
            let Some(ref_cfg) = &param.ref_config else {
                continue;
            };
            let targets_parent = ref_cfg.source == PARENT
                || ref_cfg.source == graph_key
                || ref_cfg.source == graph.name;
            if targets_parent {
                map.entry(op_name.clone())
                    .or_default()
                    .push((src_var.clone(), ref_cfg.var.clone()));
            }
        }
    }
    map
}
fn resolve_edge_policy(graph: &OpConfig, src: &str, dst: &str) -> Option<StreamPolicy> {
let dst_op = graph.ops.get(dst)?;
let src_full = graph
.ops
.get(src)
.map(|o| o.full_name.as_str())
.unwrap_or(src);
for (_var, param) in &dst_op.inputs {
let Some(ref_cfg) = ¶m.ref_config else {
continue;
};
if ref_cfg.source == src || ref_cfg.source == src_full {
if let Some(p) = ref_cfg.stream_policy {
return Some(p);
}
}
}
None
}
/// Resolves every slot of `plan` for context `ctx`, producing the input map
/// handed to the op.
///
/// Slots are resolved in plan order:
/// - references are evaluated with `eval_ref`;
/// - scratch keys are read from the shared scratch map;
/// - literals and defaults are cloned;
/// - unset optional inputs become `Null`.
///
/// # Errors
/// Propagates any `eval_ref` error. Returns `OperonError::Op`, naming the op
/// and variable, when a required input has no source.
fn resolve_inputs(
    op_cfg: &OpConfig,
    plan: &[InputSlot],
    ctx: &ContextId,
    state: &Mutex<RuntimeState>,
) -> Result<Map<String, Value>, OperonError> {
    plan.iter()
        .map(|slot| -> Result<(String, Value), OperonError> {
            let value = match &slot.plan {
                InputResolver::Null => Value::Null,
                InputResolver::Lit(v) | InputResolver::Default(v) => v.clone(),
                InputResolver::Scratch(key) => state.lock().scratch_get(key),
                InputResolver::Ref(cref) => eval_ref(cref, ctx, state)?,
                InputResolver::RequiredMissing => {
                    return Err(OperonError::Op(OpError::code_msg(
                        format!(
                            "op '{}': required input '{}' not provided",
                            op_cfg.full_name, slot.var
                        ),
                        op_cfg.full_name.clone(),
                    )));
                }
            };
            Ok((slot.var.clone(), value))
        })
        .collect()
}
/// Chooses the target of branch op `op_cfg` for context `ctx`.
///
/// Evaluates the pre-compiled case conditions in order. Returns the target of
/// the first one whose value is truthy (per `value_truthy`), otherwise the
/// default target.
///
/// # Errors
/// Returns `OperonError::Runtime` if `branches` has no table for the op, or
/// if no case matches and there is no default. Propagates `eval_ref` errors.
fn evaluate_branch(
    op_cfg: &OpConfig,
    branches: &HashMap<String, (Vec<(CompiledRef, String)>, Option<String>)>,
    ctx: &ContextId,
    state: &Mutex<RuntimeState>,
) -> Result<String, OperonError> {
    let Some((cases, default)) = branches.get(&op_cfg.name) else {
        return Err(OperonError::Runtime(format!(
            "branch '{}' missing pre-compiled case table",
            op_cfg.full_name
        )));
    };
    for (condition, target) in cases {
        if value_truthy(&eval_ref(condition, ctx, state)?) {
            return Ok(target.clone());
        }
    }
    default.clone().ok_or_else(|| {
        OperonError::Runtime(format!(
            "branch '{}' has no matching case and no default",
            op_cfg.full_name
        ))
    })
}
/// Python-style equality for JSON values.
///
/// Number rules:
/// - Numbers compare by numeric value regardless of representation, so `1`
///   equals `1.0`.
/// - When both sides are integers they are compared exactly as `i64`/`u64`.
///   Going through `f64` would make distinct integers above 2^53 compare
///   equal.
///
/// Arrays and objects are compared element-wise with these same rules, so
/// `[1]` equals `[1.0]`. Every other pairing falls back to structural `==`.
fn values_equal(a: &Value, b: &Value) -> bool {
    match (a, b) {
        (Value::Number(an), Value::Number(bn)) => {
            if let (Some(x), Some(y)) = (an.as_i64(), bn.as_i64()) {
                x == y
            } else if let (Some(x), Some(y)) = (an.as_u64(), bn.as_u64()) {
                x == y
            } else {
                match (an.as_f64(), bn.as_f64()) {
                    (Some(af), Some(bf)) => af == bf,
                    _ => an == bn,
                }
            }
        }
        (Value::Array(xs), Value::Array(ys)) => {
            xs.len() == ys.len() && xs.iter().zip(ys).all(|(x, y)| values_equal(x, y))
        }
        (Value::Object(xm), Value::Object(ym)) => {
            xm.len() == ym.len()
                && xm
                    .iter()
                    .all(|(k, x)| matches!(ym.get(k), Some(y) if values_equal(x, y)))
        }
        _ => a == b,
    }
}
/// Orders two JSON values and applies `pred` to the resulting ordering,
/// returning the outcome as a JSON bool.
///
/// Ordering rules:
/// - Numbers order by value. Two integers are compared exactly as
///   `i64`/`u64`, since `f64` cannot distinguish integers above 2^53.
///   Mixed or float operands go through `f64`.
/// - Strings order lexicographically.
/// - Booleans order `false < true`.
///
/// # Errors
/// Any other pairing (e.g. a number against a string, or `null`) yields
/// `OperonError::Runtime`.
fn cmp_op(
    a: &Value,
    b: &Value,
    pred: impl Fn(std::cmp::Ordering) -> bool,
) -> Result<Value, OperonError> {
    use std::cmp::Ordering;
    let ord = match (a, b) {
        (Value::Number(an), Value::Number(bn)) => {
            if let (Some(x), Some(y)) = (an.as_i64(), bn.as_i64()) {
                x.cmp(&y)
            } else if let (Some(x), Some(y)) = (an.as_u64(), bn.as_u64()) {
                x.cmp(&y)
            } else {
                match (an.as_f64(), bn.as_f64()) {
                    // serde_json rejects non-finite floats, so `partial_cmp`
                    // failing here is only a defensive fallback.
                    (Some(af), Some(bf)) => af.partial_cmp(&bf).unwrap_or(Ordering::Equal),
                    _ => Ordering::Equal,
                }
            }
        }
        (Value::String(a), Value::String(b)) => a.cmp(b),
        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
        _ => {
            return Err(OperonError::Runtime(format!(
                "ref comparison: cannot compare {:?} and {:?}",
                a, b
            )));
        }
    };
    Ok(Value::Bool(pred(ord)))
}
/// Python-style membership test (`needle in haystack`).
///
/// - array: true if any element is `values_equal` to `needle`;
/// - object: true if `needle` is a string naming one of its keys;
/// - string: true if `needle` is a string contained as a substring.
///
/// Every other combination is `false`.
fn value_contains(haystack: &Value, needle: &Value) -> bool {
    match (haystack, needle) {
        (Value::Array(items), _) => items.iter().any(|item| values_equal(item, needle)),
        (Value::Object(fields), Value::String(key)) => fields.contains_key(key),
        (Value::String(text), Value::String(sub)) => text.contains(sub.as_str()),
        _ => false,
    }
}
/// Python-style `getattr(value, key)`.
///
/// Reads field `key` of an object, returning `Null` when the field is absent.
///
/// # Errors
/// Fails if `key` is not a string, or if `value` is not an object. The
/// second message mimics Python's `AttributeError`.
fn value_getattr(value: &Value, key: &Value) -> Result<Value, OperonError> {
    let Some(name) = key.as_str() else {
        return Err(OperonError::Runtime(format!(
            "getattr expects str name, got {:?}",
            key
        )));
    };
    if let Value::Object(fields) = value {
        return Ok(fields.get(name).cloned().unwrap_or(Value::Null));
    }
    Err(OperonError::Runtime(format!(
        "AttributeError: '{}' object has no attribute '{}'",
        json_type_name(value),
        name
    )))
}
/// Python-flavoured type name of a JSON value, used in error messages.
///
/// All numbers report the generic name `"number"`.
fn json_type_name(v: &Value) -> &'static str {
    match v {
        Value::Object(..) => "dict",
        Value::Array(..) => "list",
        Value::String(..) => "str",
        Value::Number(..) => "number",
        Value::Bool(..) => "bool",
        Value::Null => "NoneType",
    }
}
/// Python-style `value[key]` that yields `Null` instead of raising.
///
/// - object with a string key: the field, or `Null` if absent;
/// - array with an integer key: the element at that index. Negative indices
///   count from the end (`-1` is the last element). Out-of-range indices
///   and non-integer numbers give `Null`;
/// - any other combination gives `Null`.
fn value_getitem(value: &Value, key: &Value) -> Value {
    use std::convert::TryFrom;
    match (value, key) {
        (Value::Object(map), Value::String(k)) => map.get(k).cloned().unwrap_or(Value::Null),
        (Value::Array(items), Value::Number(n)) => {
            // Map the (possibly negative) index onto `0..len` with checked
            // conversions. Lossy `as` casts could wrap an out-of-range index
            // onto a valid one where `usize` is narrower than `i64`.
            let position = n.as_i64().and_then(|i| {
                if i < 0 {
                    let from_end = usize::try_from(i.unsigned_abs()).ok()?;
                    items.len().checked_sub(from_end)
                } else {
                    usize::try_from(i).ok()
                }
            });
            position
                .and_then(|pos| items.get(pos))
                .cloned()
                .unwrap_or(Value::Null)
        }
        _ => Value::Null,
    }
}
/// Python-style truthiness of a JSON value.
///
/// `null`, `false`, numeric zero, and empty strings/arrays/objects are falsy;
/// everything else is truthy. A number with no `f64` view counts as truthy.
fn value_truthy(v: &Value) -> bool {
    match v {
        Value::Bool(flag) => *flag,
        Value::Number(num) => !matches!(num.as_f64(), Some(f) if f == 0.0),
        Value::String(text) => !text.is_empty(),
        Value::Array(items) => !items.is_empty(),
        Value::Object(fields) => !fields.is_empty(),
        Value::Null => false,
    }
}
/// Applies a binary numeric operator `op` to two JSON numbers.
///
/// Both operands are converted to `f64` before `op` runs. The result is
/// stored via `Number::from_f64`, so it is always a float-backed number, even
/// when both inputs were integers.
///
/// # Errors
/// Fails if either operand is not numeric. Also fails if the result is not
/// finite (e.g. an `op` that divides by zero), because a JSON number cannot
/// hold NaN or infinity.
fn arith(a: &Value, b: &Value, op: impl Fn(f64, f64) -> f64) -> Result<Value, OperonError> {
    let Some((lhs, rhs)) = a.as_f64().zip(b.as_f64()) else {
        return Err(OperonError::Runtime(format!(
            "ref arithmetic: non-numeric operands {:?} and {:?}",
            a, b
        )));
    };
    let result = op(lhs, rhs);
    match serde_json::Number::from_f64(result) {
        Some(num) => Ok(Value::Number(num)),
        None => Err(OperonError::Runtime(format!(
            "ref arithmetic produced non-finite: {}",
            result
        ))),
    }
}
/// Applies a unary numeric operator `op` to a JSON number.
///
/// The operand is converted to `f64`. The result is stored via
/// `Number::from_f64`, so it is always float-backed.
///
/// # Errors
/// Fails if the operand is not numeric or the result is not finite.
fn unary(a: &Value, op: impl Fn(f64) -> f64) -> Result<Value, OperonError> {
    let Some(operand) = a.as_f64() else {
        return Err(OperonError::Runtime(format!(
            "ref unary op: non-numeric operand {:?}",
            a
        )));
    };
    let result = op(operand);
    match serde_json::Number::from_f64(result) {
        Some(num) => Ok(Value::Number(num)),
        None => Err(OperonError::Runtime(format!(
            "ref unary op produced non-finite: {}",
            result
        ))),
    }
}
/// Runs one op with already-resolved `inputs` and returns its raw result.
///
/// Dispatch:
/// - provider kinds (per `is_provider_kind`) go to `execute_provider_op`;
/// - `Code`/`Lambda` call the registry function named by `func_name`. If that
///   name is not registered, the segment after the last `.` is tried;
/// - `Graph` runs the pre-built child scheduler keyed by the op's `full_name`;
/// - `Parser` runs the built-in parser op.
///
/// # Errors
/// - `OperonError::Config` when a code op has no `func_name`.
/// - `OperonError::Runtime` when the function or child scheduler cannot be
///   found, or the op type is unsupported here.
/// - Otherwise, whatever the executed op returns.
async fn execute_op(
    op_cfg: &OpConfig,
    registry: &Arc<dyn OpRegistry>,
    inputs: Map<String, Value>,
    child_schedulers: &Arc<HashMap<String, Arc<GraphScheduler>>>,
) -> Result<Value, OperonError> {
    use crate::providers::ops::{execute_provider_op, is_provider_kind};
    if is_provider_kind(op_cfg.kind) {
        return execute_provider_op(op_cfg, inputs).await;
    }
    match op_cfg.kind {
        OpType::Code | OpType::Lambda => {
            let Some(func_name) = op_cfg.func_name.as_deref() else {
                return Err(OperonError::Config(format!(
                    "code op '{}' missing func_name",
                    op_cfg.full_name
                )));
            };
            // Prefer the fully qualified name; fall back to the short name
            // after the last '.' (e.g. "pkg.mod.f" -> "f").
            let found = registry.lookup(func_name).or_else(|| {
                let (_, short) = func_name.rsplit_once('.')?;
                registry.lookup(short)
            });
            let Some(func) = found else {
                return Err(OperonError::Runtime(format!(
                    "no registered function named '{}' (register via OperonBuilder::op or the #[op] macro)",
                    func_name
                )));
            };
            func(inputs).await
        }
        OpType::Graph => match child_schedulers.get(&op_cfg.full_name) {
            Some(child) => child.run_collect(inputs).await,
            None => Err(OperonError::Runtime(format!(
                "nested graph '{}' not in parent scheduler's child map; \
                 this should have been built at parent construction time",
                op_cfg.full_name
            ))),
        },
        OpType::Parser => crate::core::ops::transform::parser_op::execute(inputs).await,
        other => Err(OperonError::Runtime(format!(
            "op type {:?} not yet implemented for {}",
            other, op_cfg.full_name
        ))),
    }
}
fn error_frame(e: &OperonError) -> Map<String, Value> {
let mut m = Map::new();
m.insert("error".into(), Value::from(e.to_string()));
m
}
/// Normalises an op result into a frame map.
///
/// Objects are returned as-is; any other value is wrapped as
/// `{"result": value}`.
fn value_to_map(value: Value) -> Map<String, Value> {
    match value {
        Value::Object(fields) => fields,
        scalar => std::iter::once(("result".to_string(), scalar)).collect(),
    }
}
/// Records an op's result in runtime state and splits it into output frames.
///
/// Generator ops:
/// - An array result yields one frame per element; a non-array result is
///   treated as a single element.
/// - Element `i` is placed in child context `parent_ctx + "yield_<i>"`.
///
/// Non-generator ops produce one frame in `parent_ctx`.
///
/// Before being returned as `(context, frame)` pairs, every frame field is
/// written to `state` under the op's `full_name` and the frame's context.
fn fan_out_value(
    op_cfg: &OpConfig,
    parent_ctx: &ContextId,
    value: Value,
    state: &Arc<Mutex<RuntimeState>>,
) -> Vec<(ContextId, Map<String, Value>)> {
    // Stores every field of `frame` for (op, ctx), taking the lock once per frame.
    let record = |ctx: &ContextId, frame: &Map<String, Value>| {
        let mut guard = state.lock();
        for (var, val) in frame {
            guard.set(&op_cfg.full_name, var, ctx, val.clone());
        }
    };
    if !op_cfg.is_generator {
        let frame = value_to_map(value);
        record(parent_ctx, &frame);
        return vec![(parent_ctx.clone(), frame)];
    }
    let elements = match value {
        Value::Array(elements) => elements,
        single => vec![single],
    };
    elements
        .into_iter()
        .enumerate()
        .map(|(i, element)| {
            let mut yield_ctx = parent_ctx.clone();
            yield_ctx.push(format!("yield_{i}"));
            let frame = value_to_map(element);
            record(&yield_ctx, &frame);
            (yield_ctx, frame)
        })
        .collect()
}
/// Derives the context for loop iteration `n_iters`.
///
/// - Iteration 1 appends a `loop_1` segment to `current`.
/// - Any other iteration replaces the last segment of `current` (if any)
///   with `loop_<n_iters>`, so the context depth stays constant.
///
/// The replaced segment is presumably the previous `loop_<n>` label; this is
/// not checked here.
fn next_loop_ctx(current: &ContextId, n_iters: u32) -> ContextId {
    let mut next = current.clone();
    if n_iters != 1 && !next.is_empty() {
        next.pop();
    }
    next.push(format!("loop_{}", n_iters));
    next
}
/// Evaluates a loop `until` condition against the latest iteration outputs.
///
/// Supported forms:
/// - a single comparison `lhs OP rhs`, with `OP` one of `==`, `!=`, `>=`,
///   `<=`, `>`, `<`;
/// - a bare operand, tested for truthiness.
///
/// Operands are resolved by `lookup_operand`: numeric literals, boolean and
/// `None`/`null` literals, quoted strings, or output variable names.
///
/// The operator is the leftmost one that appears outside `'...'` / `"..."`
/// quotes. Quoted literals may therefore contain operator characters, e.g.
/// `status != "x==y"`.
///
/// # Errors
/// Propagates errors from `compare` (ordering non-numeric values).
fn eval_until(expr: &str, outputs: &Map<String, Value>) -> Result<bool, OperonError> {
    /// Returns the byte offset and text of the leftmost comparison operator
    /// in `expr` that is not inside a quoted literal.
    fn find_operator(expr: &str) -> Option<(usize, &'static str)> {
        let bytes = expr.as_bytes();
        let mut open_quote: Option<u8> = None;
        for (i, &b) in bytes.iter().enumerate() {
            if let Some(q) = open_quote {
                if b == q {
                    open_quote = None;
                }
                continue;
            }
            let followed_by_eq = bytes.get(i + 1) == Some(&b'=');
            let op = match (b, followed_by_eq) {
                (b'"' | b'\'', _) => {
                    open_quote = Some(b);
                    continue;
                }
                (b'=', true) => "==",
                (b'!', true) => "!=",
                (b'>', true) => ">=",
                (b'<', true) => "<=",
                (b'>', false) => ">",
                (b'<', false) => "<",
                _ => continue,
            };
            return Some((i, op));
        }
        None
    }

    let expr = expr.trim();
    match find_operator(expr) {
        // Operators are ASCII, so `idx` and `idx + op.len()` are char boundaries.
        Some((idx, op)) => {
            let lhs_val = lookup_operand(expr[..idx].trim(), outputs);
            let rhs_val = lookup_operand(expr[idx + op.len()..].trim(), outputs);
            compare(op, &lhs_val, &rhs_val)
        }
        None => Ok(is_truthy(&lookup_operand(expr, outputs))),
    }
}
/// Resolves one operand token of a loop `until` expression.
///
/// The trimmed token is tried, in order, as:
/// 1. an `i64` literal;
/// 2. a finite `f64` literal;
/// 3. `true`/`True`, `false`/`False`, or `None`/`null`;
/// 4. a string literal wrapped in matching `"` or `'` quotes (no escape
///    processing);
/// 5. otherwise the name of an output variable, giving `Null` when absent.
fn lookup_operand(token: &str, outputs: &Map<String, Value>) -> Value {
    let token = token.trim();
    if let Ok(n) = token.parse::<i64>() {
        return Value::from(n);
    }
    // `f64::from_str` also accepts "inf"/"nan"/"infinity", which a JSON number
    // cannot hold. Let such tokens fall through to variable lookup instead of
    // silently becoming Null.
    if let Some(num) = token
        .parse::<f64>()
        .ok()
        .and_then(serde_json::Number::from_f64)
    {
        return Value::Number(num);
    }
    match token {
        "true" | "True" => return Value::Bool(true),
        "false" | "False" => return Value::Bool(false),
        "None" | "null" => return Value::Null,
        _ => {}
    }
    // Require two or more bytes so a lone quote is not taken as a literal;
    // slicing `[1..0]` would panic.
    let quoted = token.len() >= 2
        && ((token.starts_with('"') && token.ends_with('"'))
            || (token.starts_with('\'') && token.ends_with('\'')));
    if quoted {
        // Quotes are ASCII, so both slice ends are char boundaries.
        return Value::from(&token[1..token.len() - 1]);
    }
    outputs.get(token).cloned().unwrap_or(Value::Null)
}
/// Applies comparison operator `op` to two resolved `until` operands.
///
/// When both operands have an `f64` view, all six operators are compared
/// numerically. Otherwise only `==` / `!=` are supported, using structural
/// JSON equality.
///
/// # Errors
/// Ordering operators on non-numeric operands yield `OperonError::Runtime`.
///
/// # Panics
/// Panics on an unknown `op` when both operands are numeric. Callers pass
/// only the six operators listed above.
fn compare(op: &str, lhs: &Value, rhs: &Value) -> Result<bool, OperonError> {
    if let Some((x, y)) = lhs.as_f64().zip(rhs.as_f64()) {
        let outcome = match op {
            "==" => x == y,
            "!=" => x != y,
            ">=" => x >= y,
            "<=" => x <= y,
            ">" => x > y,
            "<" => x < y,
            _ => unreachable!(),
        };
        return Ok(outcome);
    }
    match op {
        "==" => Ok(lhs == rhs),
        "!=" => Ok(lhs != rhs),
        other => Err(OperonError::Runtime(format!(
            "loop until: cannot compare non-numeric values with {}",
            other
        ))),
    }
}
fn is_truthy(v: &Value) -> bool {
match v {
Value::Null => false,
Value::Bool(b) => *b,
Value::Number(n) => n.as_f64().map(|x| x != 0.0).unwrap_or(false),
Value::String(s) => !s.is_empty(),
Value::Array(a) => !a.is_empty(),
Value::Object(o) => !o.is_empty(),
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an outputs map holding a single `key -> value` entry.
    fn single_output(key: &str, value: Value) -> Map<String, Value> {
        let mut outputs = Map::new();
        outputs.insert(key.to_string(), value);
        outputs
    }

    #[test]
    fn eval_until_numeric() {
        let outputs = single_output("count", Value::from(5));
        let cases = [
            ("count >= 5", true),
            ("count >= 6", false),
            ("count == 5", true),
            ("count < 10", true),
        ];
        for (expr, expected) in cases {
            assert_eq!(eval_until(expr, &outputs).unwrap(), expected, "expr: {expr}");
        }
    }

    #[test]
    fn eval_until_bool_var() {
        let mut outputs = single_output("done", Value::from(true));
        assert!(eval_until("done", &outputs).unwrap());
        outputs.insert("done".to_string(), Value::from(false));
        assert!(!eval_until("done", &outputs).unwrap());
    }

    #[test]
    fn next_loop_ctx_progression() {
        let root = default_context();
        let first = next_loop_ctx(&root, 1);
        assert_eq!(first.last().map(|seg| seg.as_str()), Some("loop_1"));
        let second = next_loop_ctx(&first, 2);
        assert_eq!(second.last().map(|seg| seg.as_str()), Some("loop_2"));
        // Later iterations replace the loop segment rather than nesting deeper.
        assert_eq!(second.len(), first.len());
    }

    #[test]
    fn runtime_state_parent_walk_on_read() {
        let scratch: SharedScratch = Arc::new(Mutex::new(HashMap::new()));
        let mut state = RuntimeState::with_capacity(0, scratch);
        let root = default_context();
        state.set("op", "v", &root, Value::from(1));
        // A read from a deeper context falls back to the ancestor's slot.
        let mut child = root.clone();
        child.push("[0]".into());
        assert_eq!(state.get("op", "v", &child), Some(&Value::from(1)));
    }
}