Skip to main content

claude_api/managed_agents/
threads.rs

1//! Multi-agent session threads.
2//!
3//! When an agent has `callable_agents` configured, the coordinator can
4//! delegate work to those sub-agents at runtime. Each delegation runs
5//! in its own **thread**: a context-isolated event stream with its own
6//! conversation history. The session-level event stream is the
7//! "primary thread" and shows aggregated activity; per-thread streams
8//! drill into one sub-agent's reasoning and tool calls.
9//!
10//! Threads are a Research Preview feature; the full multi-agent
11//! workflow is gated on the same `managed-agents-2026-04-01` beta
12//! header as the rest of Managed Agents.
13
14use serde::{Deserialize, Serialize};
15
16use crate::client::Client;
17use crate::error::Result;
18use crate::pagination::Paginated;
19
20use super::MANAGED_AGENTS_BETA;
21use super::events::SessionEvent;
22
23/// One thread inside a multi-agent session.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[non_exhaustive]
26pub struct Thread {
27    /// Stable identifier (`sthr_...`).
28    pub id: String,
29    /// Wire type tag (`"session_thread"`).
30    #[serde(rename = "type", default, skip_serializing_if = "Option::is_none")]
31    pub ty: Option<String>,
32    /// Name of the agent driving this thread.
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub agent_name: Option<String>,
35    /// Lifecycle status. Same enum shape as the parent session.
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub status: Option<super::sessions::SessionStatus>,
38    /// Model the thread runs against.
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub model: Option<crate::types::ModelId>,
41    /// Creation timestamp (RFC3339).
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub created_at: Option<String>,
44}
45
46/// Namespace handle for thread operations on a single session.
47///
48/// Obtained via [`Sessions::threads`](super::sessions::Sessions::threads).
49pub struct Threads<'a> {
50    pub(crate) client: &'a Client,
51    pub(crate) session_id: String,
52}
53
54impl Threads<'_> {
55    /// `GET /v1/sessions/{session_id}/threads`. List the threads in a
56    /// session, including the primary thread (if exposed by the
57    /// server) and any spawned sub-agent threads.
58    pub async fn list(&self) -> Result<Paginated<Thread>> {
59        let path = format!("/v1/sessions/{}/threads", self.session_id);
60        self.client
61            .execute_with_retry(
62                || self.client.request_builder(reqwest::Method::GET, &path),
63                &[MANAGED_AGENTS_BETA],
64            )
65            .await
66    }
67
68    /// Sub-namespace for events on a single thread.
69    #[must_use]
70    pub fn events(&self, thread_id: impl Into<String>) -> ThreadEvents<'_> {
71        ThreadEvents {
72            client: self.client,
73            session_id: self.session_id.clone(),
74            thread_id: thread_id.into(),
75        }
76    }
77}
78
79/// Namespace handle for events on a specific thread.
80pub struct ThreadEvents<'a> {
81    client: &'a Client,
82    session_id: String,
83    thread_id: String,
84}
85
86impl ThreadEvents<'_> {
87    /// `GET /v1/sessions/{session_id}/threads/{thread_id}/events`.
88    pub async fn list(&self) -> Result<Paginated<SessionEvent>> {
89        let path = format!(
90            "/v1/sessions/{}/threads/{}/events",
91            self.session_id, self.thread_id
92        );
93        self.client
94            .execute_with_retry(
95                || self.client.request_builder(reqwest::Method::GET, &path),
96                &[MANAGED_AGENTS_BETA],
97            )
98            .await
99    }
100
101    /// `GET /v1/sessions/{session_id}/threads/{thread_id}/stream`.
102    /// Returns an
103    /// [`EventStream`](crate::managed_agents::events::EventStream)
104    /// scoped to a single thread. Events fired before the stream
105    /// connects are not delivered; pair with [`Self::list`] to seed a
106    /// set of seen IDs.
107    #[cfg(feature = "streaming")]
108    #[cfg_attr(docsrs, doc(cfg(feature = "streaming")))]
109    pub async fn stream(&self) -> Result<crate::managed_agents::events::EventStream> {
110        let path = format!(
111            "/v1/sessions/{}/threads/{}/stream",
112            self.session_id, self.thread_id
113        );
114        let response = self
115            .client
116            .execute_streaming(
117                self.client
118                    .request_builder(reqwest::Method::GET, &path)
119                    .header("accept", "text/event-stream"),
120                &[MANAGED_AGENTS_BETA],
121            )
122            .await?;
123        Ok(crate::managed_agents::events::EventStream::from_response(
124            response,
125        ))
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Build a client whose base URL points at the given mock server.
    fn client_for(mock: &MockServer) -> Client {
        Client::builder()
            .api_key("sk-ant-test")
            .base_url(mock.uri())
            .build()
            .unwrap()
    }

    /// Listing threads deserializes the page into typed [`Thread`]
    /// records, including the renamed `type` key.
    #[tokio::test]
    async fn list_threads_returns_typed_records() {
        let mock = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/v1/sessions/sesn_x/threads"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "data": [
                    {
                        "id": "sthr_a",
                        "type": "session_thread",
                        "agent_name": "Reviewer",
                        "status": "running",
                        "model": "claude-opus-4-7"
                    }
                ],
                "has_more": false
            })))
            .mount(&mock)
            .await;

        let client = client_for(&mock);
        let page = client
            .managed_agents()
            .sessions()
            .threads("sesn_x")
            .list()
            .await
            .unwrap();
        assert_eq!(page.data.len(), 1);

        let thread = &page.data[0];
        assert_eq!(thread.id.as_str(), "sthr_a");
        assert_eq!(thread.ty.as_deref(), Some("session_thread"));
        assert_eq!(thread.agent_name.as_deref(), Some("Reviewer"));
        // `created_at` is absent from the payload, so the serde default
        // must leave it unset.
        assert_eq!(thread.created_at.as_deref(), None);
    }

    /// Listing a thread's events deserializes the page into typed
    /// session events.
    #[tokio::test]
    async fn list_thread_events_returns_typed_session_events() {
        let mock = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/v1/sessions/sesn_x/threads/sthr_a/events"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "data": [
                    {"type": "agent.message", "id": "sevt_1", "content": [{"type": "text", "text": "hi"}]}
                ],
                "has_more": false
            })))
            .mount(&mock)
            .await;

        let client = client_for(&mock);
        let page = client
            .managed_agents()
            .sessions()
            .threads("sesn_x")
            .events("sthr_a")
            .list()
            .await
            .unwrap();
        assert_eq!(page.data.len(), 1);
    }

    /// Streaming a thread sends the beta header and yields the SSE
    /// payloads as typed events, in order.
    #[cfg(feature = "streaming")]
    #[tokio::test]
    async fn stream_thread_yields_typed_events() {
        use futures_util::StreamExt;
        use wiremock::matchers::header;
        let sse = concat!(
            "event: message\n",
            "data: {\"type\":\"agent.message\",\"id\":\"sevt_1\",\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}\n",
            "\n",
            "event: message\n",
            "data: {\"type\":\"session.thread_idle\",\"id\":\"sevt_2\"}\n",
            "\n",
        );
        let mock = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/v1/sessions/sesn_x/threads/sthr_a/stream"))
            .and(header("anthropic-beta", "managed-agents-2026-04-01"))
            .respond_with(
                ResponseTemplate::new(200)
                    .insert_header("content-type", "text/event-stream")
                    .set_body_string(sse),
            )
            .mount(&mock)
            .await;

        let client = client_for(&mock);
        let mut stream = client
            .managed_agents()
            .sessions()
            .threads("sesn_x")
            .events("sthr_a")
            .stream()
            .await
            .unwrap();
        let first = stream.next().await.unwrap().unwrap();
        let second = stream.next().await.unwrap().unwrap();
        // `assert_eq!` prints both sides on failure; the previous
        // `assert!(a == b)` form only reported that the check failed.
        assert_eq!(first.type_tag().as_deref(), Some("agent.message"));
        assert_eq!(second.type_tag().as_deref(), Some("session.thread_idle"));
    }
}