claude_api/managed_agents/
threads.rs1use serde::{Deserialize, Serialize};
15
16use crate::client::Client;
17use crate::error::Result;
18use crate::pagination::Paginated;
19
20use super::betas;
21use super::events::SessionEvent;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[non_exhaustive]
26pub struct Thread {
27 pub id: String,
29 #[serde(rename = "type", default, skip_serializing_if = "Option::is_none")]
31 pub ty: Option<String>,
32 #[serde(default, skip_serializing_if = "Option::is_none")]
34 pub agent_name: Option<String>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub status: Option<super::sessions::SessionStatus>,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub model: Option<crate::types::ModelId>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub created_at: Option<String>,
44}
45
46pub struct Threads<'a> {
50 pub(crate) client: &'a Client,
51 pub(crate) session_id: String,
52 pub(crate) research_preview: bool,
55}
56
57impl Threads<'_> {
58 pub async fn list(&self) -> Result<Paginated<Thread>> {
62 let path = format!("/v1/sessions/{}/threads", self.session_id);
63 self.client
64 .execute_with_retry(
65 || self.client.request_builder(reqwest::Method::GET, &path),
66 betas(self.research_preview),
67 )
68 .await
69 }
70
71 #[must_use]
73 pub fn events(&self, thread_id: impl Into<String>) -> ThreadEvents<'_> {
74 ThreadEvents {
75 client: self.client,
76 session_id: self.session_id.clone(),
77 thread_id: thread_id.into(),
78 research_preview: self.research_preview,
79 }
80 }
81}
82
83pub struct ThreadEvents<'a> {
85 client: &'a Client,
86 session_id: String,
87 thread_id: String,
88 research_preview: bool,
89}
90
91impl ThreadEvents<'_> {
92 pub async fn list(&self) -> Result<Paginated<SessionEvent>> {
94 let path = format!(
95 "/v1/sessions/{}/threads/{}/events",
96 self.session_id, self.thread_id
97 );
98 self.client
99 .execute_with_retry(
100 || self.client.request_builder(reqwest::Method::GET, &path),
101 betas(self.research_preview),
102 )
103 .await
104 }
105
106 #[cfg(feature = "streaming")]
113 #[cfg_attr(docsrs, doc(cfg(feature = "streaming")))]
114 pub async fn stream(&self) -> Result<crate::managed_agents::events::EventStream> {
115 let path = format!(
116 "/v1/sessions/{}/threads/{}/stream",
117 self.session_id, self.thread_id
118 );
119 let response = self
120 .client
121 .execute_streaming(
122 self.client
123 .request_builder(reqwest::Method::GET, &path)
124 .header("accept", "text/event-stream"),
125 betas(self.research_preview),
126 )
127 .await?;
128 Ok(crate::managed_agents::events::EventStream::from_response(
129 response,
130 ))
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137 use pretty_assertions::assert_eq;
138 use serde_json::json;
139 use wiremock::matchers::{method, path};
140 use wiremock::{Mock, MockServer, ResponseTemplate};
141
142 fn client_for(mock: &MockServer) -> Client {
143 Client::builder()
144 .api_key("sk-ant-test")
145 .base_url(mock.uri())
146 .build()
147 .unwrap()
148 }
149
150 #[tokio::test]
151 async fn list_threads_returns_typed_records() {
152 let mock = MockServer::start().await;
153 Mock::given(method("GET"))
154 .and(path("/v1/sessions/sesn_x/threads"))
155 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
156 "data": [
157 {
158 "id": "sthr_a",
159 "type": "session_thread",
160 "agent_name": "Reviewer",
161 "status": "running",
162 "model": "claude-opus-4-7"
163 }
164 ],
165 "has_more": false
166 })))
167 .mount(&mock)
168 .await;
169
170 let client = client_for(&mock);
171 let page = client
172 .managed_agents()
173 .sessions()
174 .threads("sesn_x")
175 .list()
176 .await
177 .unwrap();
178 assert_eq!(page.data.len(), 1);
179 assert_eq!(page.data[0].agent_name.as_deref(), Some("Reviewer"));
180 }
181
182 #[tokio::test]
183 async fn list_thread_events_returns_typed_session_events() {
184 let mock = MockServer::start().await;
185 Mock::given(method("GET"))
186 .and(path("/v1/sessions/sesn_x/threads/sthr_a/events"))
187 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
188 "data": [
189 {"type": "agent.message", "id": "sevt_1", "content": [{"type": "text", "text": "hi"}]}
190 ],
191 "has_more": false
192 })))
193 .mount(&mock)
194 .await;
195
196 let client = client_for(&mock);
197 let page = client
198 .managed_agents()
199 .sessions()
200 .threads("sesn_x")
201 .events("sthr_a")
202 .list()
203 .await
204 .unwrap();
205 assert_eq!(page.data.len(), 1);
206 }
207
208 #[cfg(feature = "streaming")]
209 #[tokio::test]
210 async fn stream_thread_yields_typed_events() {
211 use futures_util::StreamExt;
212 use wiremock::matchers::header;
213 let sse = concat!(
214 "event: message\n",
215 "data: {\"type\":\"agent.message\",\"id\":\"sevt_1\",\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}\n",
216 "\n",
217 "event: message\n",
218 "data: {\"type\":\"session.thread_idle\",\"id\":\"sevt_2\"}\n",
219 "\n",
220 );
221 let mock = MockServer::start().await;
222 Mock::given(method("GET"))
223 .and(path("/v1/sessions/sesn_x/threads/sthr_a/stream"))
224 .and(header("anthropic-beta", "managed-agents-2026-04-01"))
225 .respond_with(
226 ResponseTemplate::new(200)
227 .insert_header("content-type", "text/event-stream")
228 .set_body_string(sse),
229 )
230 .mount(&mock)
231 .await;
232
233 let client = client_for(&mock);
234 let mut stream = client
235 .managed_agents()
236 .sessions()
237 .threads("sesn_x")
238 .events("sthr_a")
239 .stream()
240 .await
241 .unwrap();
242 let first = stream.next().await.unwrap().unwrap();
243 let second = stream.next().await.unwrap().unwrap();
244 assert!(first.type_tag().as_deref() == Some("agent.message"));
245 assert!(second.type_tag().as_deref() == Some("session.thread_idle"));
246 }
247}