Skip to main content

claude_api/managed_agents/
threads.rs

//! Multi-agent session threads.
//!
//! When an agent has `callable_agents` configured, the coordinator can
//! delegate work to those sub-agents at runtime. Each delegation runs
//! in its own **thread**: a context-isolated event stream with its own
//! conversation history. The session-level event stream is the
//! "primary thread" and shows aggregated activity; per-thread streams
//! drill into one sub-agent's reasoning and tool calls.
//!
//! Threads are a Research Preview feature; the full multi-agent
//! workflow is gated on the same `managed-agents-2026-04-01` beta
//! header as the rest of Managed Agents.
13
14use serde::{Deserialize, Serialize};
15
16use crate::client::Client;
17use crate::error::Result;
18use crate::pagination::Paginated;
19
20use super::betas;
21use super::events::SessionEvent;
22
23/// One thread inside a multi-agent session.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[non_exhaustive]
26pub struct Thread {
27    /// Stable identifier (`sthr_...`).
28    pub id: String,
29    /// Wire type tag (`"session_thread"`).
30    #[serde(rename = "type", default, skip_serializing_if = "Option::is_none")]
31    pub ty: Option<String>,
32    /// Name of the agent driving this thread.
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub agent_name: Option<String>,
35    /// Lifecycle status. Same enum shape as the parent session.
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub status: Option<super::sessions::SessionStatus>,
38    /// Model the thread runs against.
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub model: Option<crate::types::ModelId>,
41    /// Creation timestamp (RFC3339).
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub created_at: Option<String>,
44}
45
46/// Namespace handle for thread operations on a single session.
47///
48/// Obtained via [`Sessions::threads`](super::sessions::Sessions::threads).
49pub struct Threads<'a> {
50    pub(crate) client: &'a Client,
51    pub(crate) session_id: String,
52    /// Inherited from
53    /// [`Sessions::with_research_preview`](super::sessions::Sessions::with_research_preview).
54    pub(crate) research_preview: bool,
55}
56
57impl Threads<'_> {
58    /// `GET /v1/sessions/{session_id}/threads`. List the threads in a
59    /// session, including the primary thread (if exposed by the
60    /// server) and any spawned sub-agent threads.
61    pub async fn list(&self) -> Result<Paginated<Thread>> {
62        let path = format!("/v1/sessions/{}/threads", self.session_id);
63        self.client
64            .execute_with_retry(
65                || self.client.request_builder(reqwest::Method::GET, &path),
66                betas(self.research_preview),
67            )
68            .await
69    }
70
71    /// Sub-namespace for events on a single thread.
72    #[must_use]
73    pub fn events(&self, thread_id: impl Into<String>) -> ThreadEvents<'_> {
74        ThreadEvents {
75            client: self.client,
76            session_id: self.session_id.clone(),
77            thread_id: thread_id.into(),
78            research_preview: self.research_preview,
79        }
80    }
81}
82
83/// Namespace handle for events on a specific thread.
84pub struct ThreadEvents<'a> {
85    client: &'a Client,
86    session_id: String,
87    thread_id: String,
88    research_preview: bool,
89}
90
91impl ThreadEvents<'_> {
92    /// `GET /v1/sessions/{session_id}/threads/{thread_id}/events`.
93    pub async fn list(&self) -> Result<Paginated<SessionEvent>> {
94        let path = format!(
95            "/v1/sessions/{}/threads/{}/events",
96            self.session_id, self.thread_id
97        );
98        self.client
99            .execute_with_retry(
100                || self.client.request_builder(reqwest::Method::GET, &path),
101                betas(self.research_preview),
102            )
103            .await
104    }
105
106    /// `GET /v1/sessions/{session_id}/threads/{thread_id}/stream`.
107    /// Returns an
108    /// [`EventStream`](crate::managed_agents::events::EventStream)
109    /// scoped to a single thread. Events fired before the stream
110    /// connects are not delivered; pair with [`Self::list`] to seed a
111    /// set of seen IDs.
112    #[cfg(feature = "streaming")]
113    #[cfg_attr(docsrs, doc(cfg(feature = "streaming")))]
114    pub async fn stream(&self) -> Result<crate::managed_agents::events::EventStream> {
115        let path = format!(
116            "/v1/sessions/{}/threads/{}/stream",
117            self.session_id, self.thread_id
118        );
119        let response = self
120            .client
121            .execute_streaming(
122                self.client
123                    .request_builder(reqwest::Method::GET, &path)
124                    .header("accept", "text/event-stream"),
125                betas(self.research_preview),
126            )
127            .await?;
128        Ok(crate::managed_agents::events::EventStream::from_response(
129            response,
130        ))
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds a [`Client`] whose base URL is the mock server's URI, so
    /// every request in a test lands on `mock`.
    fn client_for(mock: &MockServer) -> Client {
        Client::builder()
            .api_key("sk-ant-test")
            .base_url(mock.uri())
            .build()
            .unwrap()
    }

    /// `Threads::list` hits the session's thread endpoint and decodes
    /// the page into typed [`Thread`] records.
    #[tokio::test]
    async fn list_threads_returns_typed_records() {
        let mock = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/v1/sessions/sesn_x/threads"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "data": [
                    {
                        "id": "sthr_a",
                        "type": "session_thread",
                        "agent_name": "Reviewer",
                        "status": "running",
                        "model": "claude-opus-4-7"
                    }
                ],
                "has_more": false
            })))
            .mount(&mock)
            .await;

        let client = client_for(&mock);
        let page = client
            .managed_agents()
            .sessions()
            .threads("sesn_x")
            .list()
            .await
            .unwrap();
        assert_eq!(page.data.len(), 1);

        // Check the identifier and the renamed `type` field as well,
        // not only `agent_name`, so a broken field mapping is caught.
        let thread = &page.data[0];
        assert_eq!(thread.id.as_str(), "sthr_a");
        assert_eq!(thread.ty.as_deref(), Some("session_thread"));
        assert_eq!(thread.agent_name.as_deref(), Some("Reviewer"));
    }

    /// `ThreadEvents::list` hits the per-thread events endpoint and
    /// decodes the page into session events.
    #[tokio::test]
    async fn list_thread_events_returns_typed_session_events() {
        let mock = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/v1/sessions/sesn_x/threads/sthr_a/events"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "data": [
                    {"type": "agent.message", "id": "sevt_1", "content": [{"type": "text", "text": "hi"}]}
                ],
                "has_more": false
            })))
            .mount(&mock)
            .await;

        let client = client_for(&mock);
        let page = client
            .managed_agents()
            .sessions()
            .threads("sesn_x")
            .events("sthr_a")
            .list()
            .await
            .unwrap();
        assert_eq!(page.data.len(), 1);
    }

    /// `ThreadEvents::stream` sends the managed-agents beta header and
    /// yields the SSE frames in order as typed events.
    #[cfg(feature = "streaming")]
    #[tokio::test]
    async fn stream_thread_yields_typed_events() {
        use futures_util::StreamExt;
        use wiremock::matchers::header;
        let sse = concat!(
            "event: message\n",
            "data: {\"type\":\"agent.message\",\"id\":\"sevt_1\",\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}\n",
            "\n",
            "event: message\n",
            "data: {\"type\":\"session.thread_idle\",\"id\":\"sevt_2\"}\n",
            "\n",
        );
        let mock = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/v1/sessions/sesn_x/threads/sthr_a/stream"))
            .and(header("anthropic-beta", "managed-agents-2026-04-01"))
            .respond_with(
                ResponseTemplate::new(200)
                    .insert_header("content-type", "text/event-stream")
                    .set_body_string(sse),
            )
            .mount(&mock)
            .await;

        let client = client_for(&mock);
        let mut stream = client
            .managed_agents()
            .sessions()
            .threads("sesn_x")
            .events("sthr_a")
            .stream()
            .await
            .unwrap();
        let first = stream.next().await.unwrap().unwrap();
        let second = stream.next().await.unwrap().unwrap();
        // `assert_eq!` rather than `assert!(a == b)` so a failure
        // prints the tag that was actually observed.
        assert_eq!(first.type_tag().as_deref(), Some("agent.message"));
        assert_eq!(second.type_tag().as_deref(), Some("session.thread_idle"));
    }
}