Skip to main content

jamjet_worker/
worker.rs

1use crate::executor::{ExecutionResult, NodeExecutor};
2use crate::heartbeat::spawn_heartbeat;
3use jamjet_agents::AgentRegistry;
4use jamjet_core::node::NodeKind;
5use jamjet_core::workflow::ExecutionId;
6use jamjet_ir::workflow::{NodeDef, WorkflowIr};
7use jamjet_policy::autonomy::{AutonomyContext, AutonomyDecision, AutonomyEnforcer};
8use jamjet_policy::engine::node_kind_tag;
9use jamjet_policy::{EvaluationContext, PolicyDecision, PolicyEvaluator};
10use jamjet_state::backend::{StateBackend, WorkItem};
11use jamjet_state::budget::BudgetState;
12use jamjet_state::event::EventKind;
13use jamjet_telemetry::{gen_ai_attrs, record_gen_ai_usage};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{info, instrument, warn};
18
19/// A JamJet worker process.
20///
21/// Workers pull work items from the queue, execute them, and report results.
22pub struct Worker {
23    pub worker_id: String,
24    backend: Arc<dyn StateBackend>,
25    /// Optional agent registry — required for autonomy enforcement.
26    agents: Option<Arc<dyn AgentRegistry>>,
27    queue_types: Vec<String>,
28    poll_interval: Duration,
29    executors: HashMap<String, Arc<dyn NodeExecutor>>,
30}
31
32impl Worker {
33    pub fn new(
34        worker_id: String,
35        backend: Arc<dyn StateBackend>,
36        queue_types: Vec<String>,
37    ) -> Self {
38        Self {
39            worker_id,
40            backend,
41            agents: None,
42            queue_types,
43            poll_interval: Duration::from_millis(500),
44            executors: HashMap::new(),
45        }
46    }
47
48    /// Attach an agent registry for autonomy enforcement.
49    pub fn with_agents(mut self, agents: Arc<dyn AgentRegistry>) -> Self {
50        self.agents = Some(agents);
51        self
52    }
53
54    pub fn register_executor(
55        mut self,
56        kind: impl Into<String>,
57        executor: Arc<dyn NodeExecutor>,
58    ) -> Self {
59        self.executors.insert(kind.into(), executor);
60        self
61    }
62
63    pub async fn run(&self) {
64        info!(
65            worker_id = %self.worker_id,
66            queues = ?self.queue_types,
67            "Worker started"
68        );
69        loop {
70            match self.poll_and_execute().await {
71                Ok(true) => {}
72                Ok(false) => tokio::time::sleep(self.poll_interval).await,
73                Err(e) => {
74                    warn!(worker_id = %self.worker_id, "Worker error: {e}");
75                    tokio::time::sleep(self.poll_interval).await;
76                }
77            }
78        }
79    }
80
81    #[instrument(skip(self), fields(worker_id = %self.worker_id))]
82    async fn poll_and_execute(&self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
83        let queue_refs: Vec<&str> = self.queue_types.iter().map(|s| s.as_str()).collect();
84        let item = self
85            .backend
86            .claim_work_item(&self.worker_id, &queue_refs)
87            .await?;
88        let Some(item) = item else { return Ok(false) };
89        info!(
90            worker_id = %self.worker_id,
91            execution_id = %item.execution_id,
92            node_id = %item.node_id,
93            attempt = item.attempt,
94            "Claimed work item"
95        );
96        self.execute_item(item).await?;
97        Ok(true)
98    }
99
100    async fn execute_item(
101        &self,
102        item: WorkItem,
103    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
104        let start = std::time::Instant::now();
105        let execution_id = item.execution_id.clone();
106        let node_id = item.node_id.clone();
107        let tenant_id = item.tenant_id.clone();
108        let attempt = item.attempt;
109        let item_id = item.id;
110
111        let node_span = tracing::info_span!(
112            "jamjet.node",
113            "jamjet.execution.id" = %execution_id,
114            "jamjet.node.id" = %node_id,
115            "jamjet.attempt" = attempt,
116            "jamjet.worker.id" = %self.worker_id,
117            "jamjet.workflow.id" = tracing::field::Empty,
118            "jamjet.workflow.version" = tracing::field::Empty,
119            "jamjet.node.kind" = tracing::field::Empty,
120            "gen_ai.system" = tracing::field::Empty,
121            "gen_ai.request.model" = tracing::field::Empty,
122            "gen_ai.usage.input_tokens" = tracing::field::Empty,
123            "gen_ai.usage.output_tokens" = tracing::field::Empty,
124            "gen_ai.response.finish_reasons" = tracing::field::Empty,
125        );
126        let _span_guard = node_span.enter();
127
128        // Emit NodeStarted.
129        let seq = self.backend.latest_sequence(&execution_id).await? + 1;
130        self.backend
131            .append_event(jamjet_state::Event::new(
132                execution_id.clone(),
133                seq,
134                EventKind::NodeStarted {
135                    node_id: node_id.clone(),
136                    worker_id: self.worker_id.clone(),
137                    attempt,
138                },
139            ))
140            .await?;
141
142        // Spawn heartbeat.
143        let heartbeat = spawn_heartbeat(
144            Arc::clone(&self.backend),
145            item_id,
146            self.worker_id.clone(),
147            Duration::from_secs(15),
148        );
149
150        // Load workflow IR.
151        let (workflow_id, workflow_version) = parse_payload(&item.payload);
152        node_span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id.as_str());
153        node_span.record(
154            gen_ai_attrs::JAMJET_WORKFLOW_VERSION,
155            workflow_version.as_str(),
156        );
157
158        let result: Result<ExecutionResult, String> = match self
159            .load_ir(&workflow_id, &workflow_version)
160            .await
161        {
162            Err(e) => Err(format!("failed to load IR: {e}")),
163            Ok(ir) => match ir.node(&node_id) {
164                None => Err(format!("node {node_id} not found in IR")),
165                Some(node_def) => {
166                    let kind = &node_def.kind;
167                    let kind_tag = node_kind_tag(kind);
168                    node_span.record(gen_ai_attrs::JAMJET_NODE_KIND, kind_tag.as_str());
169
170                    // Load budget state from latest snapshot.
171                    let budget = self.load_budget_state(&execution_id).await;
172
173                    // Policy check.
174                    if let Some(r) = self
175                        .check_policy(&execution_id, &node_id, &tenant_id, kind, node_def, &ir)
176                        .await
177                    {
178                        heartbeat.abort();
179                        return r;
180                    }
181
182                    // Autonomy check.
183                    if let Some(r) = self
184                        .check_autonomy(&execution_id, &node_id, kind, &budget)
185                        .await
186                    {
187                        heartbeat.abort();
188                        return r;
189                    }
190
191                    // Execute.
192                    let exec_result = match self.executors.get(&kind_tag) {
193                        Some(executor) if kind_tag == "agent_tool" => {
194                            let (tx, mut rx) = tokio::sync::mpsc::channel::<serde_json::Value>(64);
195                            let backend = Arc::clone(&self.backend);
196                            let eid = execution_id.clone();
197                            let receiver_handle = tokio::spawn(async move {
198                                while let Some(event) = rx.recv().await {
199                                    backend
200                                        .patch_append_array(&eid, "agent_tool_events", event)
201                                        .await
202                                        .map_err(|e| format!("patch_append_array failed: {e}"))?;
203                                }
204                                Ok::<(), String>(())
205                            });
206                            let result = executor.execute_streaming(&item, tx).await;
207                            match receiver_handle.await {
208                                Ok(Err(e)) => Err(e),
209                                Err(e) => Err(format!("Receiver task panicked: {e}")),
210                                Ok(Ok(())) => result,
211                            }
212                        }
213                        Some(executor) => executor.execute(&item).await,
214                        None => {
215                            info!(node_id = %node_id, kind = %kind_tag, "No executor; using stub");
216                            Ok(ExecutionResult {
217                                output: serde_json::json!({}),
218                                state_patch: serde_json::json!({}),
219                                duration_ms: start.elapsed().as_millis() as u64,
220                                gen_ai_system: None,
221                                gen_ai_model: None,
222                                input_tokens: None,
223                                output_tokens: None,
224                                finish_reason: None,
225                            })
226                        }
227                    };
228
229                    // Budget enforcement after execution.
230                    match exec_result {
231                        Ok(mut result) => {
232                            let mut budget = budget;
233                            if let Some(r) = self
234                                .check_budget_after_execution(
235                                    &execution_id,
236                                    &node_id,
237                                    &ir,
238                                    &mut budget,
239                                    result.input_tokens,
240                                    result.output_tokens,
241                                    None,
242                                    &mut result.state_patch,
243                                )
244                                .await
245                            {
246                                heartbeat.abort();
247                                return r;
248                            }
249                            Ok(result)
250                        }
251                        Err(e) => Err(e),
252                    }
253                }
254            },
255        };
256
257        let duration_ms = start.elapsed().as_millis() as u64;
258        heartbeat.abort();
259
260        match result {
261            Ok(exec_result) => {
262                if let (Some(system), Some(model), Some(input), Some(output)) = (
263                    &exec_result.gen_ai_system,
264                    &exec_result.gen_ai_model,
265                    exec_result.input_tokens,
266                    exec_result.output_tokens,
267                ) {
268                    record_gen_ai_usage(&node_span, system, model, input, output);
269                }
270                if let Some(finish_reason) = &exec_result.finish_reason {
271                    node_span.record(
272                        gen_ai_attrs::RESPONSE_FINISH_REASONS,
273                        finish_reason.as_str(),
274                    );
275                }
276
277                self.backend.complete_work_item(item_id).await?;
278
279                let seq = self.backend.latest_sequence(&execution_id).await? + 1;
280                self.backend
281                    .append_event(jamjet_state::Event::new(
282                        execution_id.clone(),
283                        seq,
284                        EventKind::NodeCompleted {
285                            node_id: node_id.clone(),
286                            output: exec_result.output,
287                            state_patch: exec_result.state_patch,
288                            duration_ms,
289                            gen_ai_system: exec_result.gen_ai_system,
290                            gen_ai_model: exec_result.gen_ai_model,
291                            input_tokens: exec_result.input_tokens,
292                            output_tokens: exec_result.output_tokens,
293                            finish_reason: exec_result.finish_reason,
294                            cost_usd: None,
295                            provenance: None,
296                        },
297                    ))
298                    .await?;
299
300                info!(execution_id = %execution_id, node_id = %node_id, duration_ms, "Node completed");
301            }
302            Err(error) => {
303                self.backend.fail_work_item(item_id, &error).await?;
304                let seq = self.backend.latest_sequence(&execution_id).await? + 1;
305                self.backend
306                    .append_event(jamjet_state::Event::new(
307                        execution_id.clone(),
308                        seq,
309                        EventKind::NodeFailed {
310                            node_id: node_id.clone(),
311                            error: error.clone(),
312                            attempt,
313                            retryable: false,
314                        },
315                    ))
316                    .await?;
317                warn!(execution_id = %execution_id, node_id = %node_id, attempt, %error, "Node failed");
318            }
319        }
320        Ok(())
321    }
322
323    // ── Policy check ──────────────────────────────────────────────────────────
324
325    async fn check_policy(
326        &self,
327        execution_id: &ExecutionId,
328        node_id: &str,
329        tenant_id: &str,
330        kind: &NodeKind,
331        node_def: &NodeDef,
332        ir: &WorkflowIr,
333    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
334        let ctx = EvaluationContext::from_node_kind(node_id, kind);
335
336        // Load tenant policy (sits between global and workflow in the chain).
337        let tenant_policy_set = self
338            .backend
339            .get_tenant(&jamjet_state::TenantId::from(tenant_id))
340            .await
341            .ok()
342            .flatten()
343            .and_then(|t| t.policy_set());
344
345        // Build policy chain: tenant -> workflow -> node (least-specific to most-specific).
346        // The evaluator iterates in reverse, so node rules take priority.
347        let mut sets: Vec<&jamjet_ir::workflow::PolicySetIr> = Vec::new();
348        if let Some(ref tp) = tenant_policy_set {
349            sets.push(tp);
350        }
351        if let Some(p) = &ir.policy {
352            sets.push(p);
353        }
354        if let Some(p) = &node_def.policy {
355            sets.push(p);
356        }
357        if sets.is_empty() {
358            return None;
359        }
360
361        let decision = PolicyEvaluator.evaluate(&ctx, &sets);
362
363        match decision {
364            PolicyDecision::Allow => None,
365
366            PolicyDecision::Block { reason } => {
367                let policy_scope = self.identify_policy_scope(
368                    &ctx,
369                    tenant_policy_set.as_ref(),
370                    ir.policy.as_ref(),
371                    node_def.policy.as_ref(),
372                );
373                warn!(execution_id = %execution_id, node_id, %reason, %policy_scope, "Policy blocked node");
374                // Best-effort audit. The block is enforced regardless of whether
375                // recording the violation succeeds: fail closed, never open.
376                if let Ok(latest) = self.backend.latest_sequence(execution_id).await {
377                    let _ = self
378                        .backend
379                        .append_event(jamjet_state::Event::new(
380                            execution_id.clone(),
381                            latest + 1,
382                            EventKind::PolicyViolation {
383                                node_id: node_id.to_string(),
384                                rule: reason.clone(),
385                                decision: "blocked".to_string(),
386                                policy_scope,
387                            },
388                        ))
389                        .await;
390                }
391                Some(Err(format!("policy blocked: {reason}").into()))
392            }
393
394            PolicyDecision::RequireApproval { approver } => {
395                info!(execution_id = %execution_id, node_id, %approver, "Node requires approval");
396                let tool_name = ctx.tool_name.unwrap_or_else(|| node_id.to_string());
397                // If we cannot record the approval requirement, fail closed rather
398                // than letting the node run unapproved.
399                let seq = match self.backend.latest_sequence(execution_id).await {
400                    Ok(s) => s + 1,
401                    Err(e) => {
402                        return Some(Err(format!(
403                            "approval required but could not be recorded: {e}"
404                        )
405                        .into()))
406                    }
407                };
408                let _ = self
409                    .backend
410                    .append_event(jamjet_state::Event::new(
411                        execution_id.clone(),
412                        seq,
413                        EventKind::ToolApprovalRequired {
414                            node_id: node_id.to_string(),
415                            tool_name,
416                            approver,
417                            context: serde_json::json!({ "node_id": node_id }),
418                        },
419                    ))
420                    .await;
421                Some(Ok(()))
422            }
423        }
424    }
425
426    /// Determine which policy scope triggered a non-Allow decision.
427    ///
428    /// Checks each scope individually from most-specific (node) to least-specific
429    /// (tenant), returning the name of the first scope that produces a non-Allow
430    /// decision.
431    fn identify_policy_scope(
432        &self,
433        ctx: &EvaluationContext,
434        tenant_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
435        workflow_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
436        node_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
437    ) -> String {
438        // Check most-specific first (same order as evaluator's reverse iteration).
439        if let Some(p) = node_policy {
440            if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
441                return "node".to_string();
442            }
443        }
444        if let Some(p) = workflow_policy {
445            if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
446                return "workflow".to_string();
447            }
448        }
449        if let Some(p) = tenant_policy {
450            if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
451                return "tenant".to_string();
452            }
453        }
454        "unknown".to_string()
455    }
456
457    // ── Autonomy check ────────────────────────────────────────────────────────
458
459    async fn check_autonomy(
460        &self,
461        execution_id: &ExecutionId,
462        node_id: &str,
463        kind: &NodeKind,
464        budget: &BudgetState,
465    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
466        let kind_tag = node_kind_tag(kind);
467        if kind_tag != "agent" {
468            return None;
469        }
470        let agents = self.agents.as_ref()?;
471
472        let agent_ref = serde_json::to_value(kind)
473            .ok()
474            .and_then(|v| {
475                v.get("agent_ref")
476                    .and_then(|a| a.as_str())
477                    .map(|s| s.to_string())
478            })
479            .unwrap_or_else(|| node_id.to_string());
480
481        let agent = match agents.get_by_uri(&agent_ref).await {
482            Ok(Some(a)) => a,
483            _ => return None,
484        };
485        let card = agent.card;
486
487        let ctx = AutonomyContext {
488            agent_ref: agent_ref.clone(),
489            autonomy_level: card.autonomy.clone(),
490            constraints: card.constraints.clone(),
491            current_iterations: budget.iteration_count,
492            current_tool_calls: budget.tool_call_count,
493            current_cost_usd: budget.total_cost_usd,
494            current_tokens: budget.total_tokens(),
495            consecutive_errors: budget.consecutive_error_count,
496            circuit_breaker_threshold: 3,
497        };
498
499        let decision = AutonomyEnforcer.check(&ctx, None);
500
501        match decision {
502            AutonomyDecision::Proceed => None,
503
504            AutonomyDecision::RequireToolApproval { tool_name } => {
505                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
506                let _ = self
507                    .backend
508                    .append_event(jamjet_state::Event::new(
509                        execution_id.clone(),
510                        seq,
511                        EventKind::ToolApprovalRequired {
512                            node_id: node_id.to_string(),
513                            tool_name,
514                            approver: "human".to_string(),
515                            context: serde_json::json!({ "agent_ref": agent_ref }),
516                        },
517                    ))
518                    .await;
519                Some(Ok(()))
520            }
521
522            AutonomyDecision::EscalateLimit {
523                limit_type,
524                limit_value,
525                actual_value,
526                escalation_target,
527            } => {
528                warn!(execution_id = %execution_id, node_id, %limit_type, "Autonomy limit reached");
529                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
530                let _ = self
531                    .backend
532                    .append_event(jamjet_state::Event::new(
533                        execution_id.clone(),
534                        seq,
535                        EventKind::AutonomyLimitReached {
536                            node_id: node_id.to_string(),
537                            agent_ref: agent_ref.clone(),
538                            limit_type: limit_type.clone(),
539                            limit_value,
540                            actual_value,
541                        },
542                    ))
543                    .await;
544                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
545                let _ = self
546                    .backend
547                    .append_event(jamjet_state::Event::new(
548                        execution_id.clone(),
549                        seq,
550                        EventKind::EscalationRequired {
551                            node_id: node_id.to_string(),
552                            agent_ref,
553                            reason: format!("autonomy_limit:{limit_type}"),
554                            escalation_target: escalation_target.as_str(),
555                        },
556                    ))
557                    .await;
558                Some(Err(format!("autonomy limit: {limit_type}").into()))
559            }
560
561            AutonomyDecision::TripCircuitBreaker {
562                consecutive_errors,
563                threshold,
564                escalation_target,
565            } => {
566                warn!(execution_id = %execution_id, node_id, consecutive_errors, "Circuit breaker tripped");
567                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
568                let _ = self
569                    .backend
570                    .append_event(jamjet_state::Event::new(
571                        execution_id.clone(),
572                        seq,
573                        EventKind::CircuitBreakerTripped {
574                            node_id: node_id.to_string(),
575                            agent_ref: agent_ref.clone(),
576                            consecutive_errors,
577                            threshold,
578                        },
579                    ))
580                    .await;
581                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
582                let _ = self
583                    .backend
584                    .append_event(jamjet_state::Event::new(
585                        execution_id.clone(),
586                        seq,
587                        EventKind::EscalationRequired {
588                            node_id: node_id.to_string(),
589                            agent_ref,
590                            reason: "circuit_breaker".to_string(),
591                            escalation_target: escalation_target.as_str(),
592                        },
593                    ))
594                    .await;
595                Some(Err("circuit breaker tripped".into()))
596            }
597        }
598    }
599
600    // ── Budget enforcement ────────────────────────────────────────────────────
601
602    async fn load_budget_state(&self, execution_id: &ExecutionId) -> BudgetState {
603        self.backend
604            .latest_snapshot(execution_id)
605            .await
606            .ok()
607            .flatten()
608            .map(|s| BudgetState::from_snapshot_state(&s.state))
609            .unwrap_or_default()
610    }
611
612    #[allow(clippy::too_many_arguments)]
613    async fn check_budget_after_execution(
614        &self,
615        execution_id: &ExecutionId,
616        node_id: &str,
617        ir: &WorkflowIr,
618        budget: &mut BudgetState,
619        input_tokens: Option<u64>,
620        output_tokens: Option<u64>,
621        cost_usd: Option<f64>,
622        state_patch: &mut serde_json::Value,
623    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
624        budget.accumulate(input_tokens, output_tokens, cost_usd);
625        budget.patch_into_snapshot_state(state_patch);
626
627        if let Some(tb) = &ir.token_budget {
628            if let Some(limit) = tb.total_tokens {
629                let limit = limit as u64;
630                let current = budget.total_tokens();
631                if current > limit {
632                    return self
633                        .emit_token_budget_exceeded(
634                            execution_id,
635                            node_id,
636                            "total_tokens",
637                            limit,
638                            current,
639                        )
640                        .await;
641                }
642            }
643            if let Some(limit) = tb.input_tokens {
644                let limit = limit as u64;
645                if budget.total_input_tokens > limit {
646                    return self
647                        .emit_token_budget_exceeded(
648                            execution_id,
649                            node_id,
650                            "input_tokens",
651                            limit,
652                            budget.total_input_tokens,
653                        )
654                        .await;
655                }
656            }
657            if let Some(limit) = tb.output_tokens {
658                let limit = limit as u64;
659                if budget.total_output_tokens > limit {
660                    return self
661                        .emit_token_budget_exceeded(
662                            execution_id,
663                            node_id,
664                            "output_tokens",
665                            limit,
666                            budget.total_output_tokens,
667                        )
668                        .await;
669                }
670            }
671        }
672
673        if let Some(cost_limit) = ir.cost_budget_usd {
674            if budget.total_cost_usd > cost_limit {
675                warn!(
676                    execution_id = %execution_id,
677                    node_id,
678                    cost_limit,
679                    current = budget.total_cost_usd,
680                    "Cost budget exceeded"
681                );
682                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
683                let _ = self
684                    .backend
685                    .append_event(jamjet_state::Event::new(
686                        execution_id.clone(),
687                        seq,
688                        EventKind::CostBudgetExceeded {
689                            node_id: node_id.to_string(),
690                            limit_usd: cost_limit,
691                            current_usd: budget.total_cost_usd,
692                        },
693                    ))
694                    .await;
695                return Some(Err(format!(
696                    "cost budget exceeded: ${:.4} > ${:.4}",
697                    budget.total_cost_usd, cost_limit
698                )
699                .into()));
700            }
701        }
702
703        None
704    }
705
706    async fn emit_token_budget_exceeded(
707        &self,
708        execution_id: &ExecutionId,
709        node_id: &str,
710        kind: &str,
711        limit: u64,
712        current: u64,
713    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
714        warn!(execution_id = %execution_id, node_id, kind, limit, current, "Token budget exceeded");
715        let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
716        let _ = self
717            .backend
718            .append_event(jamjet_state::Event::new(
719                execution_id.clone(),
720                seq,
721                EventKind::TokenBudgetExceeded {
722                    node_id: node_id.to_string(),
723                    kind: kind.to_string(),
724                    limit,
725                    current,
726                },
727            ))
728            .await;
729        Some(Err(format!(
730            "token budget exceeded: {kind} {current} > {limit}"
731        )
732        .into()))
733    }
734
735    // ── IR loading ────────────────────────────────────────────────────────────
736
737    async fn load_ir(
738        &self,
739        workflow_id: &str,
740        workflow_version: &str,
741    ) -> Result<WorkflowIr, String> {
742        let def = self
743            .backend
744            .get_workflow(workflow_id, workflow_version)
745            .await
746            .map_err(|e| e.to_string())?
747            .ok_or_else(|| format!("workflow {workflow_id} v{workflow_version} not found"))?;
748        serde_json::from_value(def.ir).map_err(|e| e.to_string())
749    }
750}
751
752fn parse_payload(payload: &serde_json::Value) -> (String, String) {
753    let workflow_id = payload
754        .get("workflow_id")
755        .and_then(|v| v.as_str())
756        .unwrap_or("unknown")
757        .to_string();
758    let workflow_version = payload
759        .get("workflow_version")
760        .and_then(|v| v.as_str())
761        .unwrap_or("1.0.0")
762        .to_string();
763    (workflow_id, workflow_version)
764}