1use crate::executor::{ExecutionResult, NodeExecutor};
2use crate::heartbeat::spawn_heartbeat;
3use jamjet_agents::AgentRegistry;
4use jamjet_core::node::NodeKind;
5use jamjet_core::workflow::ExecutionId;
6use jamjet_ir::workflow::{NodeDef, WorkflowIr};
7use jamjet_policy::autonomy::{AutonomyContext, AutonomyDecision, AutonomyEnforcer};
8use jamjet_policy::engine::node_kind_tag;
9use jamjet_policy::{EvaluationContext, PolicyDecision, PolicyEvaluator};
10use jamjet_state::backend::{StateBackend, WorkItem};
11use jamjet_state::budget::BudgetState;
12use jamjet_state::event::EventKind;
13use jamjet_telemetry::{gen_ai_attrs, record_gen_ai_usage};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{info, instrument, warn};
18
19pub struct Worker {
23 pub worker_id: String,
24 backend: Arc<dyn StateBackend>,
25 agents: Option<Arc<dyn AgentRegistry>>,
27 queue_types: Vec<String>,
28 poll_interval: Duration,
29 executors: HashMap<String, Arc<dyn NodeExecutor>>,
30}
31
32impl Worker {
33 pub fn new(
34 worker_id: String,
35 backend: Arc<dyn StateBackend>,
36 queue_types: Vec<String>,
37 ) -> Self {
38 Self {
39 worker_id,
40 backend,
41 agents: None,
42 queue_types,
43 poll_interval: Duration::from_millis(500),
44 executors: HashMap::new(),
45 }
46 }
47
48 pub fn with_agents(mut self, agents: Arc<dyn AgentRegistry>) -> Self {
50 self.agents = Some(agents);
51 self
52 }
53
54 pub fn register_executor(
55 mut self,
56 kind: impl Into<String>,
57 executor: Arc<dyn NodeExecutor>,
58 ) -> Self {
59 self.executors.insert(kind.into(), executor);
60 self
61 }
62
63 pub async fn run(&self) {
64 info!(
65 worker_id = %self.worker_id,
66 queues = ?self.queue_types,
67 "Worker started"
68 );
69 loop {
70 match self.poll_and_execute().await {
71 Ok(true) => {}
72 Ok(false) => tokio::time::sleep(self.poll_interval).await,
73 Err(e) => {
74 warn!(worker_id = %self.worker_id, "Worker error: {e}");
75 tokio::time::sleep(self.poll_interval).await;
76 }
77 }
78 }
79 }
80
81 #[instrument(skip(self), fields(worker_id = %self.worker_id))]
82 async fn poll_and_execute(&self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
83 let queue_refs: Vec<&str> = self.queue_types.iter().map(|s| s.as_str()).collect();
84 let item = self
85 .backend
86 .claim_work_item(&self.worker_id, &queue_refs)
87 .await?;
88 let Some(item) = item else { return Ok(false) };
89 info!(
90 worker_id = %self.worker_id,
91 execution_id = %item.execution_id,
92 node_id = %item.node_id,
93 attempt = item.attempt,
94 "Claimed work item"
95 );
96 self.execute_item(item).await?;
97 Ok(true)
98 }
99
100 async fn execute_item(
101 &self,
102 item: WorkItem,
103 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
104 let start = std::time::Instant::now();
105 let execution_id = item.execution_id.clone();
106 let node_id = item.node_id.clone();
107 let tenant_id = item.tenant_id.clone();
108 let attempt = item.attempt;
109 let item_id = item.id;
110
111 let node_span = tracing::info_span!(
112 "jamjet.node",
113 "jamjet.execution.id" = %execution_id,
114 "jamjet.node.id" = %node_id,
115 "jamjet.attempt" = attempt,
116 "jamjet.worker.id" = %self.worker_id,
117 "jamjet.workflow.id" = tracing::field::Empty,
118 "jamjet.workflow.version" = tracing::field::Empty,
119 "jamjet.node.kind" = tracing::field::Empty,
120 "gen_ai.system" = tracing::field::Empty,
121 "gen_ai.request.model" = tracing::field::Empty,
122 "gen_ai.usage.input_tokens" = tracing::field::Empty,
123 "gen_ai.usage.output_tokens" = tracing::field::Empty,
124 "gen_ai.response.finish_reasons" = tracing::field::Empty,
125 );
126 let _span_guard = node_span.enter();
127
128 let seq = self.backend.latest_sequence(&execution_id).await? + 1;
130 self.backend
131 .append_event(jamjet_state::Event::new(
132 execution_id.clone(),
133 seq,
134 EventKind::NodeStarted {
135 node_id: node_id.clone(),
136 worker_id: self.worker_id.clone(),
137 attempt,
138 },
139 ))
140 .await?;
141
142 let heartbeat = spawn_heartbeat(
144 Arc::clone(&self.backend),
145 item_id,
146 self.worker_id.clone(),
147 Duration::from_secs(15),
148 );
149
150 let (workflow_id, workflow_version) = parse_payload(&item.payload);
152 node_span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id.as_str());
153 node_span.record(
154 gen_ai_attrs::JAMJET_WORKFLOW_VERSION,
155 workflow_version.as_str(),
156 );
157
158 let result: Result<ExecutionResult, String> = match self
159 .load_ir(&workflow_id, &workflow_version)
160 .await
161 {
162 Err(e) => Err(format!("failed to load IR: {e}")),
163 Ok(ir) => match ir.node(&node_id) {
164 None => Err(format!("node {node_id} not found in IR")),
165 Some(node_def) => {
166 let kind = &node_def.kind;
167 let kind_tag = node_kind_tag(kind);
168 node_span.record(gen_ai_attrs::JAMJET_NODE_KIND, kind_tag.as_str());
169
170 let budget = self.load_budget_state(&execution_id).await;
172
173 if let Some(r) = self
175 .check_policy(&execution_id, &node_id, &tenant_id, kind, node_def, &ir)
176 .await
177 {
178 heartbeat.abort();
179 return r;
180 }
181
182 if let Some(r) = self
184 .check_autonomy(&execution_id, &node_id, kind, &budget)
185 .await
186 {
187 heartbeat.abort();
188 return r;
189 }
190
191 let exec_result = match self.executors.get(&kind_tag) {
193 Some(executor) if kind_tag == "agent_tool" => {
194 let (tx, mut rx) = tokio::sync::mpsc::channel::<serde_json::Value>(64);
195 let backend = Arc::clone(&self.backend);
196 let eid = execution_id.clone();
197 let receiver_handle = tokio::spawn(async move {
198 while let Some(event) = rx.recv().await {
199 backend
200 .patch_append_array(&eid, "agent_tool_events", event)
201 .await
202 .map_err(|e| format!("patch_append_array failed: {e}"))?;
203 }
204 Ok::<(), String>(())
205 });
206 let result = executor.execute_streaming(&item, tx).await;
207 match receiver_handle.await {
208 Ok(Err(e)) => Err(e),
209 Err(e) => Err(format!("Receiver task panicked: {e}")),
210 Ok(Ok(())) => result,
211 }
212 }
213 Some(executor) => executor.execute(&item).await,
214 None => {
215 info!(node_id = %node_id, kind = %kind_tag, "No executor; using stub");
216 Ok(ExecutionResult {
217 output: serde_json::json!({}),
218 state_patch: serde_json::json!({}),
219 duration_ms: start.elapsed().as_millis() as u64,
220 gen_ai_system: None,
221 gen_ai_model: None,
222 input_tokens: None,
223 output_tokens: None,
224 finish_reason: None,
225 })
226 }
227 };
228
229 match exec_result {
231 Ok(mut result) => {
232 let mut budget = budget;
233 if let Some(r) = self
234 .check_budget_after_execution(
235 &execution_id,
236 &node_id,
237 &ir,
238 &mut budget,
239 result.input_tokens,
240 result.output_tokens,
241 None,
242 &mut result.state_patch,
243 )
244 .await
245 {
246 heartbeat.abort();
247 return r;
248 }
249 Ok(result)
250 }
251 Err(e) => Err(e),
252 }
253 }
254 },
255 };
256
257 let duration_ms = start.elapsed().as_millis() as u64;
258 heartbeat.abort();
259
260 match result {
261 Ok(exec_result) => {
262 if let (Some(system), Some(model), Some(input), Some(output)) = (
263 &exec_result.gen_ai_system,
264 &exec_result.gen_ai_model,
265 exec_result.input_tokens,
266 exec_result.output_tokens,
267 ) {
268 record_gen_ai_usage(&node_span, system, model, input, output);
269 }
270 if let Some(finish_reason) = &exec_result.finish_reason {
271 node_span.record(
272 gen_ai_attrs::RESPONSE_FINISH_REASONS,
273 finish_reason.as_str(),
274 );
275 }
276
277 self.backend.complete_work_item(item_id).await?;
278
279 let seq = self.backend.latest_sequence(&execution_id).await? + 1;
280 self.backend
281 .append_event(jamjet_state::Event::new(
282 execution_id.clone(),
283 seq,
284 EventKind::NodeCompleted {
285 node_id: node_id.clone(),
286 output: exec_result.output,
287 state_patch: exec_result.state_patch,
288 duration_ms,
289 gen_ai_system: exec_result.gen_ai_system,
290 gen_ai_model: exec_result.gen_ai_model,
291 input_tokens: exec_result.input_tokens,
292 output_tokens: exec_result.output_tokens,
293 finish_reason: exec_result.finish_reason,
294 cost_usd: None,
295 provenance: None,
296 },
297 ))
298 .await?;
299
300 info!(execution_id = %execution_id, node_id = %node_id, duration_ms, "Node completed");
301 }
302 Err(error) => {
303 self.backend.fail_work_item(item_id, &error).await?;
304 let seq = self.backend.latest_sequence(&execution_id).await? + 1;
305 self.backend
306 .append_event(jamjet_state::Event::new(
307 execution_id.clone(),
308 seq,
309 EventKind::NodeFailed {
310 node_id: node_id.clone(),
311 error: error.clone(),
312 attempt,
313 retryable: false,
314 },
315 ))
316 .await?;
317 warn!(execution_id = %execution_id, node_id = %node_id, attempt, %error, "Node failed");
318 }
319 }
320 Ok(())
321 }
322
323 async fn check_policy(
326 &self,
327 execution_id: &ExecutionId,
328 node_id: &str,
329 tenant_id: &str,
330 kind: &NodeKind,
331 node_def: &NodeDef,
332 ir: &WorkflowIr,
333 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
334 let ctx = EvaluationContext::from_node_kind(node_id, kind);
335
336 let tenant_policy_set = self
338 .backend
339 .get_tenant(&jamjet_state::TenantId::from(tenant_id))
340 .await
341 .ok()
342 .flatten()
343 .and_then(|t| t.policy_set());
344
345 let mut sets: Vec<&jamjet_ir::workflow::PolicySetIr> = Vec::new();
348 if let Some(ref tp) = tenant_policy_set {
349 sets.push(tp);
350 }
351 if let Some(p) = &ir.policy {
352 sets.push(p);
353 }
354 if let Some(p) = &node_def.policy {
355 sets.push(p);
356 }
357 if sets.is_empty() {
358 return None;
359 }
360
361 let decision = PolicyEvaluator.evaluate(&ctx, &sets);
362
363 match decision {
364 PolicyDecision::Allow => None,
365
366 PolicyDecision::Block { reason } => {
367 let policy_scope = self.identify_policy_scope(
368 &ctx,
369 tenant_policy_set.as_ref(),
370 ir.policy.as_ref(),
371 node_def.policy.as_ref(),
372 );
373 warn!(execution_id = %execution_id, node_id, %reason, %policy_scope, "Policy blocked node");
374 if let Ok(latest) = self.backend.latest_sequence(execution_id).await {
377 let _ = self
378 .backend
379 .append_event(jamjet_state::Event::new(
380 execution_id.clone(),
381 latest + 1,
382 EventKind::PolicyViolation {
383 node_id: node_id.to_string(),
384 rule: reason.clone(),
385 decision: "blocked".to_string(),
386 policy_scope,
387 },
388 ))
389 .await;
390 }
391 Some(Err(format!("policy blocked: {reason}").into()))
392 }
393
394 PolicyDecision::RequireApproval { approver } => {
395 info!(execution_id = %execution_id, node_id, %approver, "Node requires approval");
396 let tool_name = ctx.tool_name.unwrap_or_else(|| node_id.to_string());
397 let seq = match self.backend.latest_sequence(execution_id).await {
400 Ok(s) => s + 1,
401 Err(e) => {
402 return Some(Err(format!(
403 "approval required but could not be recorded: {e}"
404 )
405 .into()))
406 }
407 };
408 let _ = self
409 .backend
410 .append_event(jamjet_state::Event::new(
411 execution_id.clone(),
412 seq,
413 EventKind::ToolApprovalRequired {
414 node_id: node_id.to_string(),
415 tool_name,
416 approver,
417 context: serde_json::json!({ "node_id": node_id }),
418 },
419 ))
420 .await;
421 Some(Ok(()))
422 }
423 }
424 }
425
426 fn identify_policy_scope(
432 &self,
433 ctx: &EvaluationContext,
434 tenant_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
435 workflow_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
436 node_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
437 ) -> String {
438 if let Some(p) = node_policy {
440 if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
441 return "node".to_string();
442 }
443 }
444 if let Some(p) = workflow_policy {
445 if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
446 return "workflow".to_string();
447 }
448 }
449 if let Some(p) = tenant_policy {
450 if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
451 return "tenant".to_string();
452 }
453 }
454 "unknown".to_string()
455 }
456
457 async fn check_autonomy(
460 &self,
461 execution_id: &ExecutionId,
462 node_id: &str,
463 kind: &NodeKind,
464 budget: &BudgetState,
465 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
466 let kind_tag = node_kind_tag(kind);
467 if kind_tag != "agent" {
468 return None;
469 }
470 let agents = self.agents.as_ref()?;
471
472 let agent_ref = serde_json::to_value(kind)
473 .ok()
474 .and_then(|v| {
475 v.get("agent_ref")
476 .and_then(|a| a.as_str())
477 .map(|s| s.to_string())
478 })
479 .unwrap_or_else(|| node_id.to_string());
480
481 let agent = match agents.get_by_uri(&agent_ref).await {
482 Ok(Some(a)) => a,
483 _ => return None,
484 };
485 let card = agent.card;
486
487 let ctx = AutonomyContext {
488 agent_ref: agent_ref.clone(),
489 autonomy_level: card.autonomy.clone(),
490 constraints: card.constraints.clone(),
491 current_iterations: budget.iteration_count,
492 current_tool_calls: budget.tool_call_count,
493 current_cost_usd: budget.total_cost_usd,
494 current_tokens: budget.total_tokens(),
495 consecutive_errors: budget.consecutive_error_count,
496 circuit_breaker_threshold: 3,
497 };
498
499 let decision = AutonomyEnforcer.check(&ctx, None);
500
501 match decision {
502 AutonomyDecision::Proceed => None,
503
504 AutonomyDecision::RequireToolApproval { tool_name } => {
505 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
506 let _ = self
507 .backend
508 .append_event(jamjet_state::Event::new(
509 execution_id.clone(),
510 seq,
511 EventKind::ToolApprovalRequired {
512 node_id: node_id.to_string(),
513 tool_name,
514 approver: "human".to_string(),
515 context: serde_json::json!({ "agent_ref": agent_ref }),
516 },
517 ))
518 .await;
519 Some(Ok(()))
520 }
521
522 AutonomyDecision::EscalateLimit {
523 limit_type,
524 limit_value,
525 actual_value,
526 escalation_target,
527 } => {
528 warn!(execution_id = %execution_id, node_id, %limit_type, "Autonomy limit reached");
529 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
530 let _ = self
531 .backend
532 .append_event(jamjet_state::Event::new(
533 execution_id.clone(),
534 seq,
535 EventKind::AutonomyLimitReached {
536 node_id: node_id.to_string(),
537 agent_ref: agent_ref.clone(),
538 limit_type: limit_type.clone(),
539 limit_value,
540 actual_value,
541 },
542 ))
543 .await;
544 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
545 let _ = self
546 .backend
547 .append_event(jamjet_state::Event::new(
548 execution_id.clone(),
549 seq,
550 EventKind::EscalationRequired {
551 node_id: node_id.to_string(),
552 agent_ref,
553 reason: format!("autonomy_limit:{limit_type}"),
554 escalation_target: escalation_target.as_str(),
555 },
556 ))
557 .await;
558 Some(Err(format!("autonomy limit: {limit_type}").into()))
559 }
560
561 AutonomyDecision::TripCircuitBreaker {
562 consecutive_errors,
563 threshold,
564 escalation_target,
565 } => {
566 warn!(execution_id = %execution_id, node_id, consecutive_errors, "Circuit breaker tripped");
567 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
568 let _ = self
569 .backend
570 .append_event(jamjet_state::Event::new(
571 execution_id.clone(),
572 seq,
573 EventKind::CircuitBreakerTripped {
574 node_id: node_id.to_string(),
575 agent_ref: agent_ref.clone(),
576 consecutive_errors,
577 threshold,
578 },
579 ))
580 .await;
581 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
582 let _ = self
583 .backend
584 .append_event(jamjet_state::Event::new(
585 execution_id.clone(),
586 seq,
587 EventKind::EscalationRequired {
588 node_id: node_id.to_string(),
589 agent_ref,
590 reason: "circuit_breaker".to_string(),
591 escalation_target: escalation_target.as_str(),
592 },
593 ))
594 .await;
595 Some(Err("circuit breaker tripped".into()))
596 }
597 }
598 }
599
600 async fn load_budget_state(&self, execution_id: &ExecutionId) -> BudgetState {
603 self.backend
604 .latest_snapshot(execution_id)
605 .await
606 .ok()
607 .flatten()
608 .map(|s| BudgetState::from_snapshot_state(&s.state))
609 .unwrap_or_default()
610 }
611
612 #[allow(clippy::too_many_arguments)]
613 async fn check_budget_after_execution(
614 &self,
615 execution_id: &ExecutionId,
616 node_id: &str,
617 ir: &WorkflowIr,
618 budget: &mut BudgetState,
619 input_tokens: Option<u64>,
620 output_tokens: Option<u64>,
621 cost_usd: Option<f64>,
622 state_patch: &mut serde_json::Value,
623 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
624 budget.accumulate(input_tokens, output_tokens, cost_usd);
625 budget.patch_into_snapshot_state(state_patch);
626
627 if let Some(tb) = &ir.token_budget {
628 if let Some(limit) = tb.total_tokens {
629 let limit = limit as u64;
630 let current = budget.total_tokens();
631 if current > limit {
632 return self
633 .emit_token_budget_exceeded(
634 execution_id,
635 node_id,
636 "total_tokens",
637 limit,
638 current,
639 )
640 .await;
641 }
642 }
643 if let Some(limit) = tb.input_tokens {
644 let limit = limit as u64;
645 if budget.total_input_tokens > limit {
646 return self
647 .emit_token_budget_exceeded(
648 execution_id,
649 node_id,
650 "input_tokens",
651 limit,
652 budget.total_input_tokens,
653 )
654 .await;
655 }
656 }
657 if let Some(limit) = tb.output_tokens {
658 let limit = limit as u64;
659 if budget.total_output_tokens > limit {
660 return self
661 .emit_token_budget_exceeded(
662 execution_id,
663 node_id,
664 "output_tokens",
665 limit,
666 budget.total_output_tokens,
667 )
668 .await;
669 }
670 }
671 }
672
673 if let Some(cost_limit) = ir.cost_budget_usd {
674 if budget.total_cost_usd > cost_limit {
675 warn!(
676 execution_id = %execution_id,
677 node_id,
678 cost_limit,
679 current = budget.total_cost_usd,
680 "Cost budget exceeded"
681 );
682 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
683 let _ = self
684 .backend
685 .append_event(jamjet_state::Event::new(
686 execution_id.clone(),
687 seq,
688 EventKind::CostBudgetExceeded {
689 node_id: node_id.to_string(),
690 limit_usd: cost_limit,
691 current_usd: budget.total_cost_usd,
692 },
693 ))
694 .await;
695 return Some(Err(format!(
696 "cost budget exceeded: ${:.4} > ${:.4}",
697 budget.total_cost_usd, cost_limit
698 )
699 .into()));
700 }
701 }
702
703 None
704 }
705
706 async fn emit_token_budget_exceeded(
707 &self,
708 execution_id: &ExecutionId,
709 node_id: &str,
710 kind: &str,
711 limit: u64,
712 current: u64,
713 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
714 warn!(execution_id = %execution_id, node_id, kind, limit, current, "Token budget exceeded");
715 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
716 let _ = self
717 .backend
718 .append_event(jamjet_state::Event::new(
719 execution_id.clone(),
720 seq,
721 EventKind::TokenBudgetExceeded {
722 node_id: node_id.to_string(),
723 kind: kind.to_string(),
724 limit,
725 current,
726 },
727 ))
728 .await;
729 Some(Err(format!(
730 "token budget exceeded: {kind} {current} > {limit}"
731 )
732 .into()))
733 }
734
735 async fn load_ir(
738 &self,
739 workflow_id: &str,
740 workflow_version: &str,
741 ) -> Result<WorkflowIr, String> {
742 let def = self
743 .backend
744 .get_workflow(workflow_id, workflow_version)
745 .await
746 .map_err(|e| e.to_string())?
747 .ok_or_else(|| format!("workflow {workflow_id} v{workflow_version} not found"))?;
748 serde_json::from_value(def.ir).map_err(|e| e.to_string())
749 }
750}
751
752fn parse_payload(payload: &serde_json::Value) -> (String, String) {
753 let workflow_id = payload
754 .get("workflow_id")
755 .and_then(|v| v.as_str())
756 .unwrap_or("unknown")
757 .to_string();
758 let workflow_version = payload
759 .get("workflow_version")
760 .and_then(|v| v.as_str())
761 .unwrap_or("1.0.0")
762 .to_string();
763 (workflow_id, workflow_version)
764}