temporal_agent_rs/
workflow.rs1use std::time::Duration;
21
22use temporalio_macros::{workflow, workflow_methods};
23use temporalio_sdk::{
24 ActivityOptions, ContinueAsNewOptions, SyncWorkflowContext, WorkflowContext,
25 WorkflowContextView, WorkflowResult,
26};
27
28use crate::activities::AgentActivities;
29use crate::state::{
30 AgentInput, AgentOutput, AgentState, LlmChatInput, LlmResponse, Message, StopReason, compact,
31};
32
33pub const CONTINUE_AS_NEW_THRESHOLD: usize = 200;
37
38pub const COMPACT_KEEP_RECENT: usize = 20;
40
41#[workflow]
48#[derive(Default)]
49pub struct AgentWorkflow {
50 state: AgentState,
51}
52
53#[workflow_methods]
54impl AgentWorkflow {
55 #[run]
58 pub async fn run(
59 ctx: &mut WorkflowContext<Self>,
60 input: AgentInput,
61 ) -> WorkflowResult<AgentOutput> {
62 ctx.state_mut(|s| {
63 s.state = AgentState::new(input);
64 });
65
66 let llm_opts = ActivityOptions::with_start_to_close_timeout(Duration::from_mins(2))
67 .heartbeat_timeout(Duration::from_secs(30))
68 .build();
69 let tool_opts =
74 ActivityOptions::with_start_to_close_timeout(Duration::from_hours(1)).build();
75
76 loop {
77 ctx.state_mut(|s| {
79 let pending = std::mem::take(&mut s.state.pending_user_messages);
80 for msg in pending {
81 s.state.history.push(Message::user(msg));
82 }
83 });
84
85 let (turn, max_turns, history_len) =
86 ctx.state(|s| (s.state.turn, s.state.input.max_turns, s.state.history.len()));
87
88 if turn >= max_turns {
89 let out = ctx.state(|s| {
90 build_output(&s.state, StopReason::MaxTurnsReached, "[max turns reached]")
91 });
92 return Ok(out);
93 }
94
95 if history_len > CONTINUE_AS_NEW_THRESHOLD {
96 let next_input = ctx.state(|s| compact(&s.state, COMPACT_KEEP_RECENT));
97 tracing::info!(history_len, "compacting and continuing as new");
98 ctx.continue_as_new(&next_input, ContinueAsNewOptions::default())?;
99 unreachable!(); }
101
102 let chat_input = ctx.state(|s| LlmChatInput {
103 messages: s.state.history.clone(),
104 tools: crate::builder::WORKER_TOOL_CATALOG
105 .get()
106 .cloned()
107 .unwrap_or_default(),
108 });
109
110 let response: LlmResponse = ctx
111 .start_activity(AgentActivities::llm_chat, chat_input, llm_opts.clone())
112 .await?;
113
114 match response {
115 LlmResponse::Final { answer } => {
116 ctx.state_mut(|s| {
117 s.state.history.push(Message::assistant_text(&answer));
118 });
119 let out =
120 ctx.state(|s| build_output(&s.state, StopReason::FinalAnswer, &answer));
121 return Ok(out);
122 }
123 LlmResponse::UseTools { calls } => {
124 ctx.state_mut(|s| {
125 s.state
126 .history
127 .push(Message::assistant_with_tools(calls.clone()));
128 });
129
130 for call in calls {
131 let result = ctx
132 .start_activity(AgentActivities::execute_tool, call, tool_opts.clone())
133 .await?;
134 ctx.state_mut(|s| {
135 s.state.history.push(Message::tool_result(&result));
136 s.state.tool_calls_executed += 1;
137 });
138 }
139 ctx.state_mut(|s| s.state.turn += 1);
140 }
141 }
142 }
143 }
144
145 #[signal]
150 pub fn add_user_message(&mut self, _ctx: &mut SyncWorkflowContext<Self>, msg: String) {
151 self.state.pending_user_messages.push(msg);
152 }
153
154 #[query]
156 pub fn get_state(&self, _ctx: &WorkflowContextView) -> AgentState {
157 self.state.clone()
158 }
159
160 #[query]
162 pub fn turn_count(&self, _ctx: &WorkflowContextView) -> u32 {
163 self.state.turn
164 }
165}
166
167fn build_output(state: &AgentState, stop_reason: StopReason, answer: &str) -> AgentOutput {
168 AgentOutput {
169 final_answer: answer.to_string(),
170 stop_reason,
171 turns_used: state.turn,
172 tool_calls: state.tool_calls_executed,
173 }
174}