1use crate::executor::{ExecutionResult, NodeExecutor};
2use crate::heartbeat::spawn_heartbeat;
3use jamjet_agents::AgentRegistry;
4use jamjet_core::node::NodeKind;
5use jamjet_core::workflow::ExecutionId;
6use jamjet_ir::workflow::{NodeDef, WorkflowIr};
7use jamjet_policy::autonomy::{AutonomyContext, AutonomyDecision, AutonomyEnforcer};
8use jamjet_policy::engine::node_kind_tag;
9use jamjet_policy::{EvaluationContext, PolicyDecision, PolicyEvaluator};
10use jamjet_state::backend::{StateBackend, WorkItem};
11use jamjet_state::budget::BudgetState;
12use jamjet_state::event::EventKind;
13use jamjet_telemetry::{gen_ai_attrs, record_gen_ai_usage};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{info, instrument, warn};
18
19pub struct Worker {
23 pub worker_id: String,
24 backend: Arc<dyn StateBackend>,
25 agents: Option<Arc<dyn AgentRegistry>>,
27 queue_types: Vec<String>,
28 poll_interval: Duration,
29 executors: HashMap<String, Arc<dyn NodeExecutor>>,
30}
31
32impl Worker {
33 pub fn new(
34 worker_id: String,
35 backend: Arc<dyn StateBackend>,
36 queue_types: Vec<String>,
37 ) -> Self {
38 Self {
39 worker_id,
40 backend,
41 agents: None,
42 queue_types,
43 poll_interval: Duration::from_millis(500),
44 executors: HashMap::new(),
45 }
46 }
47
48 pub fn with_agents(mut self, agents: Arc<dyn AgentRegistry>) -> Self {
50 self.agents = Some(agents);
51 self
52 }
53
54 pub fn register_executor(
55 mut self,
56 kind: impl Into<String>,
57 executor: Arc<dyn NodeExecutor>,
58 ) -> Self {
59 self.executors.insert(kind.into(), executor);
60 self
61 }
62
63 pub async fn run(&self) {
64 info!(
65 worker_id = %self.worker_id,
66 queues = ?self.queue_types,
67 "Worker started"
68 );
69 loop {
70 match self.poll_and_execute().await {
71 Ok(true) => {}
72 Ok(false) => tokio::time::sleep(self.poll_interval).await,
73 Err(e) => {
74 warn!(worker_id = %self.worker_id, "Worker error: {e}");
75 tokio::time::sleep(self.poll_interval).await;
76 }
77 }
78 }
79 }
80
81 #[instrument(skip(self), fields(worker_id = %self.worker_id))]
82 async fn poll_and_execute(&self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
83 let queue_refs: Vec<&str> = self.queue_types.iter().map(|s| s.as_str()).collect();
84 let item = self
85 .backend
86 .claim_work_item(&self.worker_id, &queue_refs)
87 .await?;
88 let Some(item) = item else { return Ok(false) };
89 info!(
90 worker_id = %self.worker_id,
91 execution_id = %item.execution_id,
92 node_id = %item.node_id,
93 attempt = item.attempt,
94 "Claimed work item"
95 );
96 self.execute_item(item).await?;
97 Ok(true)
98 }
99
100 async fn execute_item(
101 &self,
102 item: WorkItem,
103 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
104 let start = std::time::Instant::now();
105 let execution_id = item.execution_id.clone();
106 let node_id = item.node_id.clone();
107 let tenant_id = item.tenant_id.clone();
108 let attempt = item.attempt;
109 let item_id = item.id;
110
111 let node_span = tracing::info_span!(
112 "jamjet.node",
113 "jamjet.execution.id" = %execution_id,
114 "jamjet.node.id" = %node_id,
115 "jamjet.attempt" = attempt,
116 "jamjet.worker.id" = %self.worker_id,
117 "jamjet.workflow.id" = tracing::field::Empty,
118 "jamjet.workflow.version" = tracing::field::Empty,
119 "jamjet.node.kind" = tracing::field::Empty,
120 "gen_ai.system" = tracing::field::Empty,
121 "gen_ai.request.model" = tracing::field::Empty,
122 "gen_ai.usage.input_tokens" = tracing::field::Empty,
123 "gen_ai.usage.output_tokens" = tracing::field::Empty,
124 "gen_ai.response.finish_reasons" = tracing::field::Empty,
125 );
126 let _span_guard = node_span.enter();
127
128 let seq = self.backend.latest_sequence(&execution_id).await? + 1;
130 self.backend
131 .append_event(jamjet_state::Event::new(
132 execution_id.clone(),
133 seq,
134 EventKind::NodeStarted {
135 node_id: node_id.clone(),
136 worker_id: self.worker_id.clone(),
137 attempt,
138 },
139 ))
140 .await?;
141
142 let heartbeat = spawn_heartbeat(
144 Arc::clone(&self.backend),
145 item_id,
146 self.worker_id.clone(),
147 Duration::from_secs(15),
148 );
149
150 let (workflow_id, workflow_version) = parse_payload(&item.payload);
152 node_span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id.as_str());
153 node_span.record(
154 gen_ai_attrs::JAMJET_WORKFLOW_VERSION,
155 workflow_version.as_str(),
156 );
157
158 let result: Result<ExecutionResult, String> = match self
159 .load_ir(&workflow_id, &workflow_version)
160 .await
161 {
162 Err(e) => Err(format!("failed to load IR: {e}")),
163 Ok(ir) => match ir.node(&node_id) {
164 None => Err(format!("node {node_id} not found in IR")),
165 Some(node_def) => {
166 let kind = &node_def.kind;
167 let kind_tag = node_kind_tag(kind);
168 node_span.record(gen_ai_attrs::JAMJET_NODE_KIND, kind_tag.as_str());
169
170 let budget = self.load_budget_state(&execution_id).await;
172
173 if let Some(r) = self
175 .check_policy(&execution_id, &node_id, &tenant_id, kind, node_def, &ir)
176 .await
177 {
178 heartbeat.abort();
179 return r;
180 }
181
182 if let Some(r) = self
184 .check_autonomy(&execution_id, &node_id, kind, &budget)
185 .await
186 {
187 heartbeat.abort();
188 return r;
189 }
190
191 let exec_result = match self.executors.get(&kind_tag) {
193 Some(executor) if kind_tag == "agent_tool" => {
194 let (tx, mut rx) = tokio::sync::mpsc::channel::<serde_json::Value>(64);
195 let backend = Arc::clone(&self.backend);
196 let eid = execution_id.clone();
197 let receiver_handle = tokio::spawn(async move {
198 while let Some(event) = rx.recv().await {
199 backend
200 .patch_append_array(&eid, "agent_tool_events", event)
201 .await
202 .map_err(|e| format!("patch_append_array failed: {e}"))?;
203 }
204 Ok::<(), String>(())
205 });
206 let result = executor.execute_streaming(&item, tx).await;
207 match receiver_handle.await {
208 Ok(Err(e)) => Err(e),
209 Err(e) => Err(format!("Receiver task panicked: {e}")),
210 Ok(Ok(())) => result,
211 }
212 }
213 Some(executor) => executor.execute(&item).await,
214 None => {
215 info!(node_id = %node_id, kind = %kind_tag, "No executor; using stub");
216 Ok(ExecutionResult {
217 output: serde_json::json!({}),
218 state_patch: serde_json::json!({}),
219 duration_ms: start.elapsed().as_millis() as u64,
220 gen_ai_system: None,
221 gen_ai_model: None,
222 input_tokens: None,
223 output_tokens: None,
224 finish_reason: None,
225 })
226 }
227 };
228
229 match exec_result {
231 Ok(mut result) => {
232 let mut budget = budget;
233 if let Some(r) = self
234 .check_budget_after_execution(
235 &execution_id,
236 &node_id,
237 &ir,
238 &mut budget,
239 result.input_tokens,
240 result.output_tokens,
241 None,
242 &mut result.state_patch,
243 )
244 .await
245 {
246 heartbeat.abort();
247 return r;
248 }
249 Ok(result)
250 }
251 Err(e) => Err(e),
252 }
253 }
254 },
255 };
256
257 let duration_ms = start.elapsed().as_millis() as u64;
258 heartbeat.abort();
259
260 match result {
261 Ok(exec_result) => {
262 if let (Some(system), Some(model), Some(input), Some(output)) = (
263 &exec_result.gen_ai_system,
264 &exec_result.gen_ai_model,
265 exec_result.input_tokens,
266 exec_result.output_tokens,
267 ) {
268 record_gen_ai_usage(&node_span, system, model, input, output);
269 }
270 if let Some(finish_reason) = &exec_result.finish_reason {
271 node_span.record(
272 gen_ai_attrs::RESPONSE_FINISH_REASONS,
273 finish_reason.as_str(),
274 );
275 }
276
277 self.backend.complete_work_item(item_id).await?;
278
279 let seq = self.backend.latest_sequence(&execution_id).await? + 1;
280 self.backend
281 .append_event(jamjet_state::Event::new(
282 execution_id.clone(),
283 seq,
284 EventKind::NodeCompleted {
285 node_id: node_id.clone(),
286 output: exec_result.output,
287 state_patch: exec_result.state_patch,
288 duration_ms,
289 gen_ai_system: exec_result.gen_ai_system,
290 gen_ai_model: exec_result.gen_ai_model,
291 input_tokens: exec_result.input_tokens,
292 output_tokens: exec_result.output_tokens,
293 finish_reason: exec_result.finish_reason,
294 cost_usd: None,
295 provenance: None,
296 },
297 ))
298 .await?;
299
300 info!(execution_id = %execution_id, node_id = %node_id, duration_ms, "Node completed");
301 }
302 Err(error) => {
303 self.backend.fail_work_item(item_id, &error).await?;
304 let seq = self.backend.latest_sequence(&execution_id).await? + 1;
305 self.backend
306 .append_event(jamjet_state::Event::new(
307 execution_id.clone(),
308 seq,
309 EventKind::NodeFailed {
310 node_id: node_id.clone(),
311 error: error.clone(),
312 attempt,
313 retryable: false,
314 },
315 ))
316 .await?;
317 warn!(execution_id = %execution_id, node_id = %node_id, attempt, %error, "Node failed");
318 }
319 }
320 Ok(())
321 }
322
323 async fn check_policy(
326 &self,
327 execution_id: &ExecutionId,
328 node_id: &str,
329 tenant_id: &str,
330 kind: &NodeKind,
331 node_def: &NodeDef,
332 ir: &WorkflowIr,
333 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
334 let ctx = EvaluationContext::from_node_kind(node_id, kind);
335
336 let tenant_policy_set = self
338 .backend
339 .get_tenant(&jamjet_state::TenantId::from(tenant_id))
340 .await
341 .ok()
342 .flatten()
343 .and_then(|t| t.policy_set());
344
345 let mut sets: Vec<&jamjet_ir::workflow::PolicySetIr> = Vec::new();
348 if let Some(ref tp) = tenant_policy_set {
349 sets.push(tp);
350 }
351 if let Some(p) = &ir.policy {
352 sets.push(p);
353 }
354 if let Some(p) = &node_def.policy {
355 sets.push(p);
356 }
357 if sets.is_empty() {
358 return None;
359 }
360
361 let decision = PolicyEvaluator.evaluate(&ctx, &sets);
362
363 match decision {
364 PolicyDecision::Allow => None,
365
366 PolicyDecision::Block { reason } => {
367 let policy_scope = self.identify_policy_scope(
368 &ctx,
369 tenant_policy_set.as_ref(),
370 ir.policy.as_ref(),
371 node_def.policy.as_ref(),
372 );
373 warn!(execution_id = %execution_id, node_id, %reason, %policy_scope, "Policy blocked node");
374 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
375 let _ = self
376 .backend
377 .append_event(jamjet_state::Event::new(
378 execution_id.clone(),
379 seq,
380 EventKind::PolicyViolation {
381 node_id: node_id.to_string(),
382 rule: reason.clone(),
383 decision: "blocked".to_string(),
384 policy_scope,
385 },
386 ))
387 .await;
388 Some(Err(format!("policy blocked: {reason}").into()))
389 }
390
391 PolicyDecision::RequireApproval { approver } => {
392 info!(execution_id = %execution_id, node_id, %approver, "Node requires approval");
393 let tool_name = ctx.tool_name.unwrap_or_else(|| node_id.to_string());
394 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
395 let _ = self
396 .backend
397 .append_event(jamjet_state::Event::new(
398 execution_id.clone(),
399 seq,
400 EventKind::ToolApprovalRequired {
401 node_id: node_id.to_string(),
402 tool_name,
403 approver,
404 context: serde_json::json!({ "node_id": node_id }),
405 },
406 ))
407 .await;
408 Some(Ok(()))
409 }
410 }
411 }
412
413 fn identify_policy_scope(
419 &self,
420 ctx: &EvaluationContext,
421 tenant_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
422 workflow_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
423 node_policy: Option<&jamjet_ir::workflow::PolicySetIr>,
424 ) -> String {
425 if let Some(p) = node_policy {
427 if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
428 return "node".to_string();
429 }
430 }
431 if let Some(p) = workflow_policy {
432 if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
433 return "workflow".to_string();
434 }
435 }
436 if let Some(p) = tenant_policy {
437 if !matches!(PolicyEvaluator.evaluate(ctx, &[p]), PolicyDecision::Allow) {
438 return "tenant".to_string();
439 }
440 }
441 "unknown".to_string()
442 }
443
444 async fn check_autonomy(
447 &self,
448 execution_id: &ExecutionId,
449 node_id: &str,
450 kind: &NodeKind,
451 budget: &BudgetState,
452 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
453 let kind_tag = node_kind_tag(kind);
454 if kind_tag != "agent" {
455 return None;
456 }
457 let agents = self.agents.as_ref()?;
458
459 let agent_ref = serde_json::to_value(kind)
460 .ok()
461 .and_then(|v| {
462 v.get("agent_ref")
463 .and_then(|a| a.as_str())
464 .map(|s| s.to_string())
465 })
466 .unwrap_or_else(|| node_id.to_string());
467
468 let agent = match agents.get_by_uri(&agent_ref).await {
469 Ok(Some(a)) => a,
470 _ => return None,
471 };
472 let card = agent.card;
473
474 let ctx = AutonomyContext {
475 agent_ref: agent_ref.clone(),
476 autonomy_level: card.autonomy.clone(),
477 constraints: card.constraints.clone(),
478 current_iterations: budget.iteration_count,
479 current_tool_calls: budget.tool_call_count,
480 current_cost_usd: budget.total_cost_usd,
481 current_tokens: budget.total_tokens(),
482 consecutive_errors: budget.consecutive_error_count,
483 circuit_breaker_threshold: 3,
484 };
485
486 let decision = AutonomyEnforcer.check(&ctx, None);
487
488 match decision {
489 AutonomyDecision::Proceed => None,
490
491 AutonomyDecision::RequireToolApproval { tool_name } => {
492 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
493 let _ = self
494 .backend
495 .append_event(jamjet_state::Event::new(
496 execution_id.clone(),
497 seq,
498 EventKind::ToolApprovalRequired {
499 node_id: node_id.to_string(),
500 tool_name,
501 approver: "human".to_string(),
502 context: serde_json::json!({ "agent_ref": agent_ref }),
503 },
504 ))
505 .await;
506 Some(Ok(()))
507 }
508
509 AutonomyDecision::EscalateLimit {
510 limit_type,
511 limit_value,
512 actual_value,
513 escalation_target,
514 } => {
515 warn!(execution_id = %execution_id, node_id, %limit_type, "Autonomy limit reached");
516 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
517 let _ = self
518 .backend
519 .append_event(jamjet_state::Event::new(
520 execution_id.clone(),
521 seq,
522 EventKind::AutonomyLimitReached {
523 node_id: node_id.to_string(),
524 agent_ref: agent_ref.clone(),
525 limit_type: limit_type.clone(),
526 limit_value,
527 actual_value,
528 },
529 ))
530 .await;
531 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
532 let _ = self
533 .backend
534 .append_event(jamjet_state::Event::new(
535 execution_id.clone(),
536 seq,
537 EventKind::EscalationRequired {
538 node_id: node_id.to_string(),
539 agent_ref,
540 reason: format!("autonomy_limit:{limit_type}"),
541 escalation_target: escalation_target.as_str(),
542 },
543 ))
544 .await;
545 Some(Err(format!("autonomy limit: {limit_type}").into()))
546 }
547
548 AutonomyDecision::TripCircuitBreaker {
549 consecutive_errors,
550 threshold,
551 escalation_target,
552 } => {
553 warn!(execution_id = %execution_id, node_id, consecutive_errors, "Circuit breaker tripped");
554 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
555 let _ = self
556 .backend
557 .append_event(jamjet_state::Event::new(
558 execution_id.clone(),
559 seq,
560 EventKind::CircuitBreakerTripped {
561 node_id: node_id.to_string(),
562 agent_ref: agent_ref.clone(),
563 consecutive_errors,
564 threshold,
565 },
566 ))
567 .await;
568 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
569 let _ = self
570 .backend
571 .append_event(jamjet_state::Event::new(
572 execution_id.clone(),
573 seq,
574 EventKind::EscalationRequired {
575 node_id: node_id.to_string(),
576 agent_ref,
577 reason: "circuit_breaker".to_string(),
578 escalation_target: escalation_target.as_str(),
579 },
580 ))
581 .await;
582 Some(Err("circuit breaker tripped".into()))
583 }
584 }
585 }
586
587 async fn load_budget_state(&self, execution_id: &ExecutionId) -> BudgetState {
590 self.backend
591 .latest_snapshot(execution_id)
592 .await
593 .ok()
594 .flatten()
595 .map(|s| BudgetState::from_snapshot_state(&s.state))
596 .unwrap_or_default()
597 }
598
599 #[allow(clippy::too_many_arguments)]
600 async fn check_budget_after_execution(
601 &self,
602 execution_id: &ExecutionId,
603 node_id: &str,
604 ir: &WorkflowIr,
605 budget: &mut BudgetState,
606 input_tokens: Option<u64>,
607 output_tokens: Option<u64>,
608 cost_usd: Option<f64>,
609 state_patch: &mut serde_json::Value,
610 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
611 budget.accumulate(input_tokens, output_tokens, cost_usd);
612 budget.patch_into_snapshot_state(state_patch);
613
614 if let Some(tb) = &ir.token_budget {
615 if let Some(limit) = tb.total_tokens {
616 let limit = limit as u64;
617 let current = budget.total_tokens();
618 if current > limit {
619 return self
620 .emit_token_budget_exceeded(
621 execution_id,
622 node_id,
623 "total_tokens",
624 limit,
625 current,
626 )
627 .await;
628 }
629 }
630 if let Some(limit) = tb.input_tokens {
631 let limit = limit as u64;
632 if budget.total_input_tokens > limit {
633 return self
634 .emit_token_budget_exceeded(
635 execution_id,
636 node_id,
637 "input_tokens",
638 limit,
639 budget.total_input_tokens,
640 )
641 .await;
642 }
643 }
644 if let Some(limit) = tb.output_tokens {
645 let limit = limit as u64;
646 if budget.total_output_tokens > limit {
647 return self
648 .emit_token_budget_exceeded(
649 execution_id,
650 node_id,
651 "output_tokens",
652 limit,
653 budget.total_output_tokens,
654 )
655 .await;
656 }
657 }
658 }
659
660 if let Some(cost_limit) = ir.cost_budget_usd {
661 if budget.total_cost_usd > cost_limit {
662 warn!(
663 execution_id = %execution_id,
664 node_id,
665 cost_limit,
666 current = budget.total_cost_usd,
667 "Cost budget exceeded"
668 );
669 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
670 let _ = self
671 .backend
672 .append_event(jamjet_state::Event::new(
673 execution_id.clone(),
674 seq,
675 EventKind::CostBudgetExceeded {
676 node_id: node_id.to_string(),
677 limit_usd: cost_limit,
678 current_usd: budget.total_cost_usd,
679 },
680 ))
681 .await;
682 return Some(Err(format!(
683 "cost budget exceeded: ${:.4} > ${:.4}",
684 budget.total_cost_usd, cost_limit
685 )
686 .into()));
687 }
688 }
689
690 None
691 }
692
693 async fn emit_token_budget_exceeded(
694 &self,
695 execution_id: &ExecutionId,
696 node_id: &str,
697 kind: &str,
698 limit: u64,
699 current: u64,
700 ) -> Option<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
701 warn!(execution_id = %execution_id, node_id, kind, limit, current, "Token budget exceeded");
702 let seq = self.backend.latest_sequence(execution_id).await.ok()? + 1;
703 let _ = self
704 .backend
705 .append_event(jamjet_state::Event::new(
706 execution_id.clone(),
707 seq,
708 EventKind::TokenBudgetExceeded {
709 node_id: node_id.to_string(),
710 kind: kind.to_string(),
711 limit,
712 current,
713 },
714 ))
715 .await;
716 Some(Err(format!(
717 "token budget exceeded: {kind} {current} > {limit}"
718 )
719 .into()))
720 }
721
722 async fn load_ir(
725 &self,
726 workflow_id: &str,
727 workflow_version: &str,
728 ) -> Result<WorkflowIr, String> {
729 let def = self
730 .backend
731 .get_workflow(workflow_id, workflow_version)
732 .await
733 .map_err(|e| e.to_string())?
734 .ok_or_else(|| format!("workflow {workflow_id} v{workflow_version} not found"))?;
735 serde_json::from_value(def.ir).map_err(|e| e.to_string())
736 }
737}
738
739fn parse_payload(payload: &serde_json::Value) -> (String, String) {
740 let workflow_id = payload
741 .get("workflow_id")
742 .and_then(|v| v.as_str())
743 .unwrap_or("unknown")
744 .to_string();
745 let workflow_version = payload
746 .get("workflow_version")
747 .and_then(|v| v.as_str())
748 .unwrap_or("1.0.0")
749 .to_string();
750 (workflow_id, workflow_version)
751}