Skip to main content

temporal_agent_rs/
activities.rs

1//! Temporal activities backing the agent loop.
2//!
3//! The workflow itself stays deterministic; everything that talks to the
4//! outside world — calling the LLM, executing tools — happens here. These
5//! activities are stateless across invocations: all required context flows
6//! in via their inputs.
7
8use std::sync::Arc;
9
10use autoagents_llm::LLMProvider;
11use serde_json::Value;
12use temporalio_macros::activities;
13use temporalio_sdk::activities::{ActivityContext, ActivityError};
14
15use crate::error::AgentError;
16use crate::llm;
17use crate::state::{LlmChatInput, LlmResponse, ToolCall, ToolResult};
18use crate::tool::ToolRegistry;
19
20/// Shared state for the activity worker. One instance per worker process.
21///
22/// `Arc<dyn LLMProvider>` and the `ToolRegistry` are both cheap to clone and
23/// safe to share across concurrent activity executions.
24#[derive(Clone)]
25pub struct AgentActivities {
26    pub llm: Arc<dyn LLMProvider>,
27    pub tools: ToolRegistry,
28}
29
30impl AgentActivities {
31    pub fn new(llm: Arc<dyn LLMProvider>, tools: ToolRegistry) -> Self {
32        Self { llm, tools }
33    }
34}
35
36#[activities]
37impl AgentActivities {
38    /// One LLM "reasoning step": given the running conversation and the
39    /// catalog of tools, ask the model what to do next.
40    ///
41    /// Returns either a final answer or a list of tool calls to execute.
42    #[activity]
43    pub async fn llm_chat(
44        self: Arc<Self>,
45        _ctx: ActivityContext,
46        input: LlmChatInput,
47    ) -> Result<LlmResponse, ActivityError> {
48        tracing::debug!(
49            messages = input.messages.len(),
50            tools = input.tools.len(),
51            "llm_chat: invoking LLM"
52        );
53        let response = llm::chat(&self.llm, &input.messages, &input.tools)
54            .await
55            .map_err(agent_err_to_activity_err)?;
56        Ok(response)
57    }
58
59    /// Execute a single tool call.
60    ///
61    /// Tool-side failures are returned as `Ok(ToolResult { error: Some(..) })`
62    /// — they are observed by the LLM, not retried by Temporal. Only
63    /// infrastructure errors (missing tool registration, serde failure)
64    /// surface as `Err`.
65    #[activity]
66    pub async fn execute_tool(
67        self: Arc<Self>,
68        _ctx: ActivityContext,
69        call: ToolCall,
70    ) -> Result<ToolResult, ActivityError> {
71        let tool = self.tools.get(&call.name).ok_or_else(|| {
72            agent_err_to_activity_err(AgentError::ToolNotFound(call.name.clone()))
73        })?;
74
75        tracing::debug!(name = %call.name, id = %call.id, "execute_tool: dispatching");
76
77        match tool.execute(call.args.clone()).await {
78            Ok(output) => Ok(ToolResult {
79                call_id: call.id,
80                output,
81                error: None,
82            }),
83            Err(e) => Ok(ToolResult {
84                call_id: call.id,
85                output: Value::Null,
86                error: Some(e.to_string()),
87            }),
88        }
89    }
90}
91
92fn agent_err_to_activity_err(e: AgentError) -> ActivityError {
93    // ActivityError has a blanket From<E: Into<anyhow::Error>>; AgentError
94    // implements Error via thiserror so it converts cleanly.
95    ActivityError::from(anyhow::Error::from(e))
96}