// temporal_agent_rs/workflow.rs
1//! `AgentWorkflow` — the durable agent loop.
2//!
3//! # Determinism contract
4//!
5//! Every line in this module must be deterministic across replay:
6//!
7//! - **No wall-clock**, **no random**, **no I/O** — all of that lives in
8//!   activities ([`crate::activities`]).
9//! - **Never call a [`ToolT`]** directly. The workflow holds tools by name,
10//!   not by reference, and dispatches them via the `execute_tool` activity.
11//! - **Never call the [`LLMProvider`]** directly. Use the `llm_chat`
12//!   activity.
13//!
14//! Re-entering the workflow function from history must produce identical
15//! commands to the original run.
16//!
17//! [`ToolT`]: autoagents_core::tool::ToolT
18//! [`LLMProvider`]: autoagents_llm::LLMProvider
19
20use std::time::Duration;
21
22use temporalio_macros::{workflow, workflow_methods};
23use temporalio_sdk::{
24    ActivityOptions, ContinueAsNewOptions, SyncWorkflowContext, WorkflowContext,
25    WorkflowContextView, WorkflowResult,
26};
27
28use crate::activities::AgentActivities;
29use crate::state::{
30    AgentInput, AgentOutput, AgentState, LlmChatInput, LlmResponse, Message, StopReason, compact,
31};
32
/// Hard cap on history length before we `continue_as_new` to keep the event
/// history small. Picked conservatively; tune via the worker config in a
/// future release.
///
/// Measured in agent [`Message`]s (`AgentState::history.len()`), which is a
/// proxy for Temporal event-history growth — not a count of raw Temporal
/// events.
pub const CONTINUE_AS_NEW_THRESHOLD: usize = 200;

/// How many recent messages [`compact`] keeps when rotating to a new run.
/// Passed as the second argument to [`compact`] when
/// [`CONTINUE_AS_NEW_THRESHOLD`] is exceeded.
pub const COMPACT_KEEP_RECENT: usize = 20;
40
/// Durable AI agent workflow.
///
/// Each invocation runs a ReAct loop until the model emits a final answer or
/// `max_turns` is reached. Every LLM call and every tool call is a separate
/// Temporal activity; crashes resume from the last completed activity without
/// re-paying for prior LLM turns.
#[workflow]
#[derive(Default)]
pub struct AgentWorkflow {
    // The full durable state: conversation history, turn / tool-call
    // counters, and user messages buffered by the `add_user_message` signal.
    // Mutated only through `ctx.state_mut` in `run` and the signal handler,
    // so all writes are part of the deterministic replay record.
    state: AgentState,
}
52
#[workflow_methods]
impl AgentWorkflow {
    /// Entry point. Initializes state from `input`, then loops until the LLM
    /// emits a final answer or `max_turns` is reached.
    ///
    /// # Errors
    ///
    /// Propagates failures from the `llm_chat` / `execute_tool` activities,
    /// and the sentinel `Err` produced by `continue_as_new` (which is how the
    /// run hands off to its successor).
    #[run]
    pub async fn run(
        ctx: &mut WorkflowContext<Self>,
        input: AgentInput,
    ) -> WorkflowResult<AgentOutput> {
        // Reset durable state from the input. On a continue-as-new rotation,
        // `input` is the compacted output of `compact` from the previous run.
        ctx.state_mut(|s| {
            s.state = AgentState::new(input);
        });

        // LLM calls: 2-minute start-to-close with a 30s heartbeat so a dead
        // worker is detected quickly.
        let llm_opts = ActivityOptions::with_start_to_close_timeout(Duration::from_mins(2))
            .heartbeat_timeout(Duration::from_secs(30))
            .build();
        // Tool activities use a generous default so that long-running tools —
        // notably human-in-the-loop tools that block on the user — have time
        // to complete. Override per-deployment by writing tools that
        // self-throttle / heartbeat, or fork this constant.
        let tool_opts =
            ActivityOptions::with_start_to_close_timeout(Duration::from_hours(1)).build();

        loop {
            // Drain signal-injected user messages into history. The signal
            // handler only appends to `pending_user_messages`, so this is the
            // single point where signals touch history — mutations stay
            // serialized with the loop and deterministic on replay.
            ctx.state_mut(|s| {
                let pending = std::mem::take(&mut s.state.pending_user_messages);
                for msg in pending {
                    s.state.history.push(Message::user(msg));
                }
            });

            // Snapshot loop-control values in one consistent state read.
            let (turn, max_turns, history_len) =
                ctx.state(|s| (s.state.turn, s.state.input.max_turns, s.state.history.len()));

            if turn >= max_turns {
                let out = ctx.state(|s| {
                    build_output(&s.state, StopReason::MaxTurnsReached, "[max turns reached]")
                });
                return Ok(out);
            }

            if history_len > CONTINUE_AS_NEW_THRESHOLD {
                // Rotate to a fresh run with a compacted transcript so the
                // Temporal event history stays bounded.
                let next_input = ctx.state(|s| compact(&s.state, COMPACT_KEEP_RECENT));
                tracing::info!(history_len, "compacting and continuing as new");
                ctx.continue_as_new(&next_input, ContinueAsNewOptions::default())?;
                unreachable!(); // continue_as_new always returns Err
            }

            // Build the LLM request from the full history plus the tool
            // catalog; falls back to an empty catalog when unset.
            // NOTE(review): `WORKER_TOOL_CATALOG` is a process global
            // (presumably set by `crate::builder` at worker startup). Reading
            // it here is only replay-safe if every worker that may replay
            // this workflow registers an identical catalog — confirm.
            let chat_input = ctx.state(|s| LlmChatInput {
                messages: s.state.history.clone(),
                tools: crate::builder::WORKER_TOOL_CATALOG
                    .get()
                    .cloned()
                    .unwrap_or_default(),
            });

            // One LLM turn as a durable activity: a completed call is
            // replayed from history, not re-executed (and not re-billed).
            let response: LlmResponse = ctx
                .start_activity(AgentActivities::llm_chat, chat_input, llm_opts.clone())
                .await?;

            match response {
                LlmResponse::Final { answer } => {
                    // Record the final assistant message, then finish.
                    ctx.state_mut(|s| {
                        s.state.history.push(Message::assistant_text(&answer));
                    });
                    let out =
                        ctx.state(|s| build_output(&s.state, StopReason::FinalAnswer, &answer));
                    return Ok(out);
                }
                LlmResponse::UseTools { calls } => {
                    // Record the assistant's tool-call request *before*
                    // executing any tool, so a crash mid-batch resumes with
                    // the request already in history.
                    ctx.state_mut(|s| {
                        s.state
                            .history
                            .push(Message::assistant_with_tools(calls.clone()));
                    });

                    // Execute tools sequentially, in the order the model
                    // emitted them — a fixed order keeps the activity
                    // schedule identical across replays.
                    for call in calls {
                        let result = ctx
                            .start_activity(AgentActivities::execute_tool, call, tool_opts.clone())
                            .await?;
                        ctx.state_mut(|s| {
                            s.state.history.push(Message::tool_result(&result));
                            s.state.tool_calls_executed += 1;
                        });
                    }
                    // One turn = one LLM call plus its tool batch.
                    ctx.state_mut(|s| s.state.turn += 1);
                }
            }
        }
    }

    /// Inject a new user message mid-conversation.
    ///
    /// Buffered until the start of the next loop iteration so we never mutate
    /// history concurrently with an in-flight LLM activity.
    #[signal]
    pub fn add_user_message(&mut self, _ctx: &mut SyncWorkflowContext<Self>, msg: String) {
        self.state.pending_user_messages.push(msg);
    }

    /// Read the full live state.
    ///
    /// Queries are read-only; this clones the whole state (history included),
    /// so prefer [`Self::turn_count`] for cheap polling.
    #[query]
    pub fn get_state(&self, _ctx: &WorkflowContextView) -> AgentState {
        self.state.clone()
    }

    /// Cheap turn counter for monitoring.
    #[query]
    pub fn turn_count(&self, _ctx: &WorkflowContextView) -> u32 {
        self.state.turn
    }
}
166
167fn build_output(state: &AgentState, stop_reason: StopReason, answer: &str) -> AgentOutput {
168    AgentOutput {
169        final_answer: answer.to_string(),
170        stop_reason,
171        turns_used: state.turn,
172        tool_calls: state.tool_calls_executed,
173    }
174}