Skip to main content

jamjet_worker/
worker.rs

1use crate::executor::{ExecutionResult, NodeExecutor};
2use crate::heartbeat::spawn_heartbeat;
3use jamjet_agents::AgentRegistry;
4use jamjet_core::node::NodeKind;
5use jamjet_core::workflow::ExecutionId;
6use jamjet_ir::workflow::{NodeDef, WorkflowIr};
7use jamjet_policy::autonomy::{AutonomyContext, AutonomyDecision, AutonomyEnforcer};
8use jamjet_policy::engine::node_kind_tag;
9use jamjet_policy::{EvaluationContext, PolicyDecision, PolicyEvaluator};
10use jamjet_state::backend::{StateBackend, WorkItem};
11use jamjet_state::budget::BudgetState;
12use jamjet_state::event::EventKind;
13use jamjet_telemetry::{gen_ai_attrs, record_gen_ai_usage};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{info, instrument, warn};
18
19/// A JamJet worker process.
20///
21/// Workers pull work items from the queue, execute them, and report results.
22pub struct Worker {
23    pub worker_id: String,
24    backend: Arc<dyn StateBackend>,
25    /// Optional agent registry — required for autonomy enforcement.
26    agents: Option<Arc<dyn AgentRegistry>>,
27    queue_types: Vec<String>,
28    poll_interval: Duration,
29    executors: HashMap<String, Arc<dyn NodeExecutor>>,
30}
31
32impl Worker {
33    pub fn new(
34        worker_id: String,
35        backend: Arc<dyn StateBackend>,
36        queue_types: Vec<String>,
37    ) -> Self {
38        Self {
39            worker_id,
40            backend,
41            agents: None,
42            queue_types,
43            poll_interval: Duration::from_millis(500),
44            executors: HashMap::new(),
45        }
46    }
47
48    /// Attach an agent registry for autonomy enforcement.
49    pub fn with_agents(mut self, agents: Arc<dyn AgentRegistry>) -> Self {
50        self.agents = Some(agents);
51        self
52    }
53
54    pub fn register_executor(
55        mut self,
56        kind: impl Into<String>,
57        executor: Arc<dyn NodeExecutor>,
58    ) -> Self {
59        self.executors.insert(kind.into(), executor);
60        self
61    }
62
63    pub async fn run(&self) {
64        info!(
65            worker_id = %self.worker_id,
66            queues = ?self.queue_types,
67            "Worker started"
68        );
69        loop {
70            match self.poll_and_execute().await {
71                Ok(true) => {}
72                Ok(false) => tokio::time::sleep(self.poll_interval).await,
73                Err(e) => {
74                    warn!(worker_id = %self.worker_id, "Worker error: {e}");
75                    tokio::time::sleep(self.poll_interval).await;
76                }
77            }
78        }
79    }
80
81    #[instrument(skip(self), fields(worker_id = %self.worker_id))]
82    async fn poll_and_execute(&self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
83        let queue_refs: Vec<&str> = self.queue_types.iter().map(|s| s.as_str()).collect();
84        let item = self
85            .backend
86            .claim_work_item(&self.worker_id, &queue_refs)
87            .await?;
88        let Some(item) = item else { return Ok(false) };
89        info!(
90            worker_id = %self.worker_id,
91            execution_id = %item.execution_id,
92            node_id = %item.node_id,
93            attempt = item.attempt,
94            "Claimed work item"
95        );
96        self.execute_item(item).await?;
97        Ok(true)
98    }
99
100    async fn execute_item(
101        &self,
102        item: WorkItem,
103    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
104        let start = std::time::Instant::now();
105        let execution_id = item.execution_id.clone();
106        let node_id = item.node_id.clone();
107        let tenant_id = item.tenant_id.clone();
108        let attempt = item.attempt;
109        let item_id = item.id;
110
111        let node_span = tracing::info_span!(
112            "jamjet.node",
113            "jamjet.execution.id" = %execution_id,
114            "jamjet.node.id" = %node_id,
115            "jamjet.attempt" = attempt,
116            "jamjet.worker.id" = %self.worker_id,
117            "jamjet.workflow.id" = tracing::field::Empty,
118            "jamjet.workflow.version" = tracing::field::Empty,
119            "jamjet.node.kind" = tracing::field::Empty,
120            "gen_ai.system" = tracing::field::Empty,
121            "gen_ai.request.model" = tracing::field::Empty,
122            "gen_ai.usage.input_tokens" = tracing::field::Empty,
123            "gen_ai.usage.output_tokens" = tracing::field::Empty,
124            "gen_ai.response.finish_reasons" = tracing::field::Empty,
125        );
126        let _span_guard = node_span.enter();
127
128        // Emit NodeStarted.
129        let seq = self.backend.latest_sequence(&execution_id).await? + 1;
130        self.backend
131            .append_event(jamjet_state::Event::new(
132                execution_id.clone(),
133                seq,
134                EventKind::NodeStarted {
135                    node_id: node_id.clone(),
136                    worker_id: self.worker_id.clone(),
137                    attempt,
138                },
139            ))
140            .await?;
141
142        // Spawn heartbeat.
143        let heartbeat = spawn_heartbeat(
144            Arc::clone(&self.backend),
145            item_id,
146            self.worker_id.clone(),
147            Duration::from_secs(15),
148        );
149
150        // Load workflow IR.
151        let (workflow_id, workflow_version) = parse_payload(&item.payload);
152        node_span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id.as_str());
153        node_span.record(
154            gen_ai_attrs::JAMJET_WORKFLOW_VERSION,
155            workflow_version.as_str(),
156        );
157
158        let result: Result<ExecutionResult, String> = match self
159            .load_ir(&workflow_id, &workflow_version)
160            .await
161        {
162            Err(e) => Err(format!("failed to load IR: {e}")),
163            Ok(ir) => match ir.node(&node_id) {
164                None => Err(format!("node {node_id} not found in IR")),
165                Some(node_def) => {
166                    let kind = &node_def.kind;
167                    let kind_tag = node_kind_tag(kind);
168                    node_span.record(gen_ai_attrs::JAMJET_NODE_KIND, kind_tag.as_str());
169
170                    // Load budget state from latest snapshot.
171                    let budget = self.load_budget_state(&execution_id).await;
172
173                    // Policy check.
174                    if let Some(r) = self
175                        .check_policy(&execution_id, &node_id, &tenant_id, kind, node_def, &ir)
176                        .await
177                    {
178                        heartbeat.abort();
179                        return r;
180                    }
181
182                    // Autonomy check.
183                    if let Some(r) = self
184                        .check_autonomy(&execution_id, &node_id, kind, &budget)
185                        .await
186                    {
187                        heartbeat.abort();
188                        return r;
189                    }
190
191                    // Execute.
192                    let exec_result = match self.executors.get(&kind_tag) {
193                        Some(executor) if kind_tag == "agent_tool" => {
194                            let (tx, mut rx) = tokio::sync::mpsc::channel::<serde_json::Value>(64);
195                            let backend = Arc::clone(&self.backend);
196                            let eid = execution_id.clone();
197                            let receiver_handle = tokio::spawn(async move {
198                                while let Some(event) = rx.recv().await {
199                                    backend
200                                        .patch_append_array(&eid, "agent_tool_events", event)
201                                        .await
202                                        .map_err(|e| format!("patch_append_array failed: {e}"))?;
203                                }
204                                Ok::<(), String>(())
205                            });
206                            let result = executor.execute_streaming(&item, tx).await;
207                            match receiver_handle.await {
208                                Ok(Err(e)) => Err(e),
209                                Err(e) => Err(format!("Receiver task panicked: {e}")),
210                                Ok(Ok(())) => result,
211                            }
212                        }
213                        Some(executor) => executor.execute(&item).await,
214                        None => {
215                            info!(node_id = %node_id, kind = %kind_tag, "No executor; using stub");
216                            Ok(ExecutionResult {
217                                output: serde_json::json!({}),
218                                state_patch: serde_json::json!({}),
219                                duration_ms: start.elapsed().as_millis() as u64,
220                                gen_ai_system: None,
221                                gen_ai_model: None,
222                                input_tokens: None,
223                                output_tokens: None,
224                                finish_reason: None,
225                            })
226                        }
227                    };
228
229                    // Budget enforcement after execution.
230                    match exec_result {
231                        Ok(mut result) => {
232                            let mut budget = budget;
233                            if let Some(r) = self
234                                .check_budget_after_execution(
235                                    &execution_id,
236                                    &node_id,
237                                    &ir,
238                                    &mut budget,
239                                    result.input_tokens,
240                                    result.output_tokens,
241                                    None,
242                                    &mut result.state_patch,
243                                )
244                                .await
245                            {
246                                heartbeat.abort();
247                                return r;
248                            }
249                            Ok(result)
250                        }
251                        Err(e) => Err(e),
252                    }
253                }
254            },
255        };
256
257        let duration_ms = start.elapsed().as_millis() as u64;
258        heartbeat.abort();
259
260        match result {
261            Ok(exec_result) => {
262                if let (Some(system), Some(model), Some(input), Some(output)) = (
263                    &exec_result.gen_ai_system,
264                    &exec_result.gen_ai_model,
265                    exec_result.input_tokens,
266                    exec_result.output_tokens,
267                ) {
268                    record_gen_ai_usage(&node_span, system, model, input, output);
269                }
270                if let Some(finish_reason) = &exec_result.finish_reason {
271                    node_span.record(
272                        gen_ai_attrs::RESPONSE_FINISH_REASONS,
273                        finish_reason.as_str(),
274                    );
275                }
276
277                self.backend.complete_work_item(item_id).await?;
278
279                let seq = self.backend.latest_sequence(&execution_id).await? + 1;
280                self.backend
281                    .append_event(jamjet_state::Event::new(
282                        execution_id.clone(),
283                        seq,
284                        EventKind::NodeCompleted {
285                            node_id: node_id.clone(),
286                            output: exec_result.output,
287                            state_patch: exec_result.state_patch,
288                            duration_ms,
289                            gen_ai_system: exec_result.gen_ai_system,
290                            gen_ai_model: exec_result.gen_ai_model,
291                            input_tokens: exec_result.input_tokens,
292                            output_tokens: exec_result.output_tokens,
293                            finish_reason: exec_result.finish_reason,
294                            cost_usd: None,
295                            provenance: None,
296                        },
297                    ))
298                    .await?;
299
300                info!(execution_id = %execution_id, node_id = %node_id, duration_ms, "Node completed");
301            }
302            Err(error) => {
303                self.backend.fail_work_item(item_id, &error).await?;
304                let seq = self.backend.latest_sequence(&execution_id).await? + 1;
305                self.backend
306                    .append_event(jamjet_state::Event::new(
307                        execution_id.clone(),
308                        seq,
309                        EventKind::NodeFailed {
310                            node_id: node_id.clone(),
311                            error: error.clone(),
312                            attempt,
313                            retryable: false,
314                        },
315                    ))
316                    .await?;
317                warn!(execution_id = %execution_id, node_id = %node_id, attempt, %error, "Node failed");
318            }
319        }
320        Ok(())
321    }
322
323    // ── Policy check ──────────────────────────────────────────────────────────
324
325    async fn check_policy(
326        &self,
327        execution_id: &ExecutionId,
328        node_id: &str,
329        tenant_id: &str,
330        kind: &NodeKind,
331        node_def: &NodeDef,
332        ir: &WorkflowIr,
333    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
334        let ctx = EvaluationContext::from_node_kind(node_id, kind);
335
336        // Load tenant policy (sits between global and workflow in the chain).
337        let tenant_policy_set = self
338            .backend
339            .get_tenant(&jamjet_state::TenantId::from(tenant_id))
340            .await
341            .ok()
342            .flatten()
343            .and_then(|t| t.policy_set());
344
345        // Build policy chain: tenant -> workflow -> node (least-specific to most-specific).
346        // The evaluator iterates in reverse, so node rules take priority.
347        let mut sets: Vec<&jamjet_ir::workflow::PolicySetIr> = Vec::new();
348        if let Some(ref tp) = tenant_policy_set {
349            sets.push(tp);
350        }
351        if let Some(p) = &ir.policy {
352            sets.push(p);
353        }
354        if let Some(p) = &node_def.policy {
355            sets.push(p);
356        }
357        if sets.is_empty() {
358            return None;
359        }
360
361        let decision = PolicyEvaluator.evaluate(&ctx, &sets);
362
363        match decision {
364            PolicyDecision::Allow => None,
365
366            PolicyDecision::Block { reason } => {
367                let policy_scope = self.identify_policy_scope(
368                    &ctx,
369                    tenant_policy_set.as_ref(),
370                    ir.policy.as_ref(),
371                    node_def.policy.as_ref(),
372                );
373                warn!(execution_id = %execution_id, node_id, %reason, %policy_scope, "Policy blocked node");
374                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
375                let _ = self
376                    .backend
377                    .append_event(jamjet_state::Event::new(
378                        execution_id.clone(),
379                        seq,
380                        EventKind::PolicyViolation {
381                            node_id: node_id.to_string(),
382                            rule: reason.clone(),
383                            decision: "blocked".to_string(),
384                            policy_scope,
385                        },
386                    ))
387                    .await;
388                Some(Err(format!("policy blocked: {reason}").into()))
389            }
390
391            PolicyDecision::RequireApproval { approver } => {
392                info!(execution_id = %execution_id, node_id, %approver, "Node requires approval");
393                let tool_name = ctx.tool_name.unwrap_or_else(|| node_id.to_string());
394                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
395                let _ = self
396                    .backend
397                    .append_event(jamjet_state::Event::new(
398                        execution_id.clone(),
399                        seq,
400                        EventKind::ToolApprovalRequired {
401                            node_id: node_id.to_string(),
402                            tool_name,
403                            approver,
404                            context: serde_json::json!({ "node_id": node_id }),
405                        },
406                    ))
407                    .await;
408                Some(Ok(()))
409            }
410        }
411    }
412
413    /// Determine which policy scope triggered a non-Allow decision.
414    ///
415    /// Checks each scope individually from most-specific (node) to least-specific
416    /// (tenant), returning the name of the first scope that produces a non-Allow
417    /// decision.
418    fn identify_policy_scope(
419        &self,
420        ctx: &EvaluationContext,
421        tenant_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
422        workflow_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
423        node_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
424    ) -> String {
425        // Check most-specific first (same order as evaluator's reverse iteration).
426        if let Some(p) = node_policy {
427            if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
428                return "node".to_string();
429            }
430        }
431        if let Some(p) = workflow_policy {
432            if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
433                return "workflow".to_string();
434            }
435        }
436        if let Some(p) = tenant_policy {
437            if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
438                return "tenant".to_string();
439            }
440        }
441        "unknown".to_string()
442    }
443
444    // ── Autonomy check ────────────────────────────────────────────────────────
445
446    async fn check_autonomy(
447        &self,
448        execution_id: &ExecutionId,
449        node_id: &str,
450        kind: &NodeKind,
451        budget: &BudgetState,
452    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
453        let kind_tag = node_kind_tag(kind);
454        if kind_tag != "agent" {
455            return None;
456        }
457        let agents = self.agents.as_ref()?;
458
459        let agent_ref = serde_json::to_value(kind)
460            .ok()
461            .and_then(|v| {
462                v.get("agent_ref")
463                    .and_then(|a| a.as_str())
464                    .map(|s| s.to_string())
465            })
466            .unwrap_or_else(|| node_id.to_string());
467
468        let agent = match agents.get_by_uri(&agent_ref).await {
469            Ok(Some(a)) => a,
470            _ => return None,
471        };
472        let card = agent.card;
473
474        let ctx = AutonomyContext {
475            agent_ref: agent_ref.clone(),
476            autonomy_level: card.autonomy.clone(),
477            constraints: card.constraints.clone(),
478            current_iterations: budget.iteration_count,
479            current_tool_calls: budget.tool_call_count,
480            current_cost_usd: budget.total_cost_usd,
481            current_tokens: budget.total_tokens(),
482            consecutive_errors: budget.consecutive_error_count,
483            circuit_breaker_threshold: 3,
484        };
485
486        let decision = AutonomyEnforcer.check(&ctx, None);
487
488        match decision {
489            AutonomyDecision::Proceed => None,
490
491            AutonomyDecision::RequireToolApproval { tool_name } => {
492                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
493                let _ = self
494                    .backend
495                    .append_event(jamjet_state::Event::new(
496                        execution_id.clone(),
497                        seq,
498                        EventKind::ToolApprovalRequired {
499                            node_id: node_id.to_string(),
500                            tool_name,
501                            approver: "human".to_string(),
502                            context: serde_json::json!({ "agent_ref": agent_ref }),
503                        },
504                    ))
505                    .await;
506                Some(Ok(()))
507            }
508
509            AutonomyDecision::EscalateLimit {
510                limit_type,
511                limit_value,
512                actual_value,
513                escalation_target,
514            } => {
515                warn!(execution_id = %execution_id, node_id, %limit_type, "Autonomy limit reached");
516                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
517                let _ = self
518                    .backend
519                    .append_event(jamjet_state::Event::new(
520                        execution_id.clone(),
521                        seq,
522                        EventKind::AutonomyLimitReached {
523                            node_id: node_id.to_string(),
524                            agent_ref: agent_ref.clone(),
525                            limit_type: limit_type.clone(),
526                            limit_value,
527                            actual_value,
528                        },
529                    ))
530                    .await;
531                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
532                let _ = self
533                    .backend
534                    .append_event(jamjet_state::Event::new(
535                        execution_id.clone(),
536                        seq,
537                        EventKind::EscalationRequired {
538                            node_id: node_id.to_string(),
539                            agent_ref,
540                            reason: format!("autonomy_limit:{limit_type}"),
541                            escalation_target: escalation_target.as_str(),
542                        },
543                    ))
544                    .await;
545                Some(Err(format!("autonomy limit: {limit_type}").into()))
546            }
547
548            AutonomyDecision::TripCircuitBreaker {
549                consecutive_errors,
550                threshold,
551                escalation_target,
552            } => {
553                warn!(execution_id = %execution_id, node_id, consecutive_errors, "Circuit breaker tripped");
554                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
555                let _ = self
556                    .backend
557                    .append_event(jamjet_state::Event::new(
558                        execution_id.clone(),
559                        seq,
560                        EventKind::CircuitBreakerTripped {
561                            node_id: node_id.to_string(),
562                            agent_ref: agent_ref.clone(),
563                            consecutive_errors,
564                            threshold,
565                        },
566                    ))
567                    .await;
568                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
569                let _ = self
570                    .backend
571                    .append_event(jamjet_state::Event::new(
572                        execution_id.clone(),
573                        seq,
574                        EventKind::EscalationRequired {
575                            node_id: node_id.to_string(),
576                            agent_ref,
577                            reason: "circuit_breaker".to_string(),
578                            escalation_target: escalation_target.as_str(),
579                        },
580                    ))
581                    .await;
582                Some(Err("circuit breaker tripped".into()))
583            }
584        }
585    }
586
587    // ── Budget enforcement ────────────────────────────────────────────────────
588
589    async fn load_budget_state(&self, execution_id: &ExecutionId) -> BudgetState {
590        self.backend
591            .latest_snapshot(execution_id)
592            .await
593            .ok()
594            .flatten()
595            .map(|s| BudgetState::from_snapshot_state(&s.state))
596            .unwrap_or_default()
597    }
598
599    #[allow(clippy::too_many_arguments)]
600    async fn check_budget_after_execution(
601        &self,
602        execution_id: &ExecutionId,
603        node_id: &str,
604        ir: &WorkflowIr,
605        budget: &mut BudgetState,
606        input_tokens: Option<u64>,
607        output_tokens: Option<u64>,
608        cost_usd: Option<f64>,
609        state_patch: &mut serde_json::Value,
610    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
611        budget.accumulate(input_tokens, output_tokens, cost_usd);
612        budget.patch_into_snapshot_state(state_patch);
613
614        if let Some(tb) = &ir.token_budget {
615            if let Some(limit) = tb.total_tokens {
616                let limit = limit as u64;
617                let current = budget.total_tokens();
618                if current > limit {
619                    return self
620                        .emit_token_budget_exceeded(
621                            execution_id,
622                            node_id,
623                            "total_tokens",
624                            limit,
625                            current,
626                        )
627                        .await;
628                }
629            }
630            if let Some(limit) = tb.input_tokens {
631                let limit = limit as u64;
632                if budget.total_input_tokens > limit {
633                    return self
634                        .emit_token_budget_exceeded(
635                            execution_id,
636                            node_id,
637                            "input_tokens",
638                            limit,
639                            budget.total_input_tokens,
640                        )
641                        .await;
642                }
643            }
644            if let Some(limit) = tb.output_tokens {
645                let limit = limit as u64;
646                if budget.total_output_tokens > limit {
647                    return self
648                        .emit_token_budget_exceeded(
649                            execution_id,
650                            node_id,
651                            "output_tokens",
652                            limit,
653                            budget.total_output_tokens,
654                        )
655                        .await;
656                }
657            }
658        }
659
660        if let Some(cost_limit) = ir.cost_budget_usd {
661            if budget.total_cost_usd > cost_limit {
662                warn!(
663                    execution_id = %execution_id,
664                    node_id,
665                    cost_limit,
666                    current = budget.total_cost_usd,
667                    "Cost budget exceeded"
668                );
669                let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
670                let _ = self
671                    .backend
672                    .append_event(jamjet_state::Event::new(
673                        execution_id.clone(),
674                        seq,
675                        EventKind::CostBudgetExceeded {
676                            node_id: node_id.to_string(),
677                            limit_usd: cost_limit,
678                            current_usd: budget.total_cost_usd,
679                        },
680                    ))
681                    .await;
682                return Some(Err(format!(
683                    "cost budget exceeded: ${:.4} > ${:.4}",
684                    budget.total_cost_usd, cost_limit
685                )
686                .into()));
687            }
688        }
689
690        None
691    }
692
693    async fn emit_token_budget_exceeded(
694        &self,
695        execution_id: &ExecutionId,
696        node_id: &str,
697        kind: &str,
698        limit: u64,
699        current: u64,
700    ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
701        warn!(execution_id = %execution_id, node_id, kind, limit, current, "Token budget exceeded");
702        let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
703        let _ = self
704            .backend
705            .append_event(jamjet_state::Event::new(
706                execution_id.clone(),
707                seq,
708                EventKind::TokenBudgetExceeded {
709                    node_id: node_id.to_string(),
710                    kind: kind.to_string(),
711                    limit,
712                    current,
713                },
714            ))
715            .await;
716        Some(Err(format!(
717            "token budget exceeded: {kind} {current} > {limit}"
718        )
719        .into()))
720    }
721
722    // ── IR loading ────────────────────────────────────────────────────────────
723
724    async fn load_ir(
725        &self,
726        workflow_id: &str,
727        workflow_version: &str,
728    ) -> Result<WorkflowIr, String> {
729        let def = self
730            .backend
731            .get_workflow(workflow_id, workflow_version)
732            .await
733            .map_err(|e| e.to_string())?
734            .ok_or_else(|| format!("workflow {workflow_id} v{workflow_version} not found"))?;
735        serde_json::from_value(def.ir).map_err(|e| e.to_string())
736    }
737}
738
739fn parse_payload(payload: &serde_json::Value) -> (String, String) {
740    let workflow_id = payload
741        .get("workflow_id")
742        .and_then(|v| v.as_str())
743        .unwrap_or("unknown")
744        .to_string();
745    let workflow_version = payload
746        .get("workflow_version")
747        .and_then(|v| v.as_str())
748        .unwrap_or("1.0.0")
749        .to_string();
750    (workflow_id, workflow_version)
751}