claude_api/managed_agents/
threads.rs1use serde::{Deserialize, Serialize};
15
16use crate::client::Client;
17use crate::error::Result;
18use crate::pagination::Paginated;
19
20use super::MANAGED_AGENTS_BETA;
21use super::events::SessionEvent;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[non_exhaustive]
26pub struct Thread {
27 pub id: String,
29 #[serde(rename = "type", default, skip_serializing_if = "Option::is_none")]
31 pub ty: Option<String>,
32 #[serde(default, skip_serializing_if = "Option::is_none")]
34 pub agent_name: Option<String>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub status: Option<super::sessions::SessionStatus>,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub model: Option<crate::types::ModelId>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub created_at: Option<String>,
44}
45
46pub struct Threads<'a> {
50 pub(crate) client: &'a Client,
51 pub(crate) session_id: String,
52}
53
54impl Threads<'_> {
55 pub async fn list(&self) -> Result<Paginated<Thread>> {
59 let path = format!("/v1/sessions/{}/threads", self.session_id);
60 self.client
61 .execute_with_retry(
62 || self.client.request_builder(reqwest::Method::GET, &path),
63 &[MANAGED_AGENTS_BETA],
64 )
65 .await
66 }
67
68 #[must_use]
70 pub fn events(&self, thread_id: impl Into<String>) -> ThreadEvents<'_> {
71 ThreadEvents {
72 client: self.client,
73 session_id: self.session_id.clone(),
74 thread_id: thread_id.into(),
75 }
76 }
77}
78
79pub struct ThreadEvents<'a> {
81 client: &'a Client,
82 session_id: String,
83 thread_id: String,
84}
85
86impl ThreadEvents<'_> {
87 pub async fn list(&self) -> Result<Paginated<SessionEvent>> {
89 let path = format!(
90 "/v1/sessions/{}/threads/{}/events",
91 self.session_id, self.thread_id
92 );
93 self.client
94 .execute_with_retry(
95 || self.client.request_builder(reqwest::Method::GET, &path),
96 &[MANAGED_AGENTS_BETA],
97 )
98 .await
99 }
100
101 #[cfg(feature = "streaming")]
108 #[cfg_attr(docsrs, doc(cfg(feature = "streaming")))]
109 pub async fn stream(&self) -> Result<crate::managed_agents::events::EventStream> {
110 let path = format!(
111 "/v1/sessions/{}/threads/{}/stream",
112 self.session_id, self.thread_id
113 );
114 let response = self
115 .client
116 .execute_streaming(
117 self.client
118 .request_builder(reqwest::Method::GET, &path)
119 .header("accept", "text/event-stream"),
120 &[MANAGED_AGENTS_BETA],
121 )
122 .await?;
123 Ok(crate::managed_agents::events::EventStream::from_response(
124 response,
125 ))
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132 use pretty_assertions::assert_eq;
133 use serde_json::json;
134 use wiremock::matchers::{method, path};
135 use wiremock::{Mock, MockServer, ResponseTemplate};
136
137 fn client_for(mock: &MockServer) -> Client {
138 Client::builder()
139 .api_key("sk-ant-test")
140 .base_url(mock.uri())
141 .build()
142 .unwrap()
143 }
144
145 #[tokio::test]
146 async fn list_threads_returns_typed_records() {
147 let mock = MockServer::start().await;
148 Mock::given(method("GET"))
149 .and(path("/v1/sessions/sesn_x/threads"))
150 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
151 "data": [
152 {
153 "id": "sthr_a",
154 "type": "session_thread",
155 "agent_name": "Reviewer",
156 "status": "running",
157 "model": "claude-opus-4-7"
158 }
159 ],
160 "has_more": false
161 })))
162 .mount(&mock)
163 .await;
164
165 let client = client_for(&mock);
166 let page = client
167 .managed_agents()
168 .sessions()
169 .threads("sesn_x")
170 .list()
171 .await
172 .unwrap();
173 assert_eq!(page.data.len(), 1);
174 assert_eq!(page.data[0].agent_name.as_deref(), Some("Reviewer"));
175 }
176
177 #[tokio::test]
178 async fn list_thread_events_returns_typed_session_events() {
179 let mock = MockServer::start().await;
180 Mock::given(method("GET"))
181 .and(path("/v1/sessions/sesn_x/threads/sthr_a/events"))
182 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
183 "data": [
184 {"type": "agent.message", "id": "sevt_1", "content": [{"type": "text", "text": "hi"}]}
185 ],
186 "has_more": false
187 })))
188 .mount(&mock)
189 .await;
190
191 let client = client_for(&mock);
192 let page = client
193 .managed_agents()
194 .sessions()
195 .threads("sesn_x")
196 .events("sthr_a")
197 .list()
198 .await
199 .unwrap();
200 assert_eq!(page.data.len(), 1);
201 }
202
203 #[cfg(feature = "streaming")]
204 #[tokio::test]
205 async fn stream_thread_yields_typed_events() {
206 use futures_util::StreamExt;
207 use wiremock::matchers::header;
208 let sse = concat!(
209 "event: message\n",
210 "data: {\"type\":\"agent.message\",\"id\":\"sevt_1\",\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}\n",
211 "\n",
212 "event: message\n",
213 "data: {\"type\":\"session.thread_idle\",\"id\":\"sevt_2\"}\n",
214 "\n",
215 );
216 let mock = MockServer::start().await;
217 Mock::given(method("GET"))
218 .and(path("/v1/sessions/sesn_x/threads/sthr_a/stream"))
219 .and(header("anthropic-beta", "managed-agents-2026-04-01"))
220 .respond_with(
221 ResponseTemplate::new(200)
222 .insert_header("content-type", "text/event-stream")
223 .set_body_string(sse),
224 )
225 .mount(&mock)
226 .await;
227
228 let client = client_for(&mock);
229 let mut stream = client
230 .managed_agents()
231 .sessions()
232 .threads("sesn_x")
233 .events("sthr_a")
234 .stream()
235 .await
236 .unwrap();
237 let first = stream.next().await.unwrap().unwrap();
238 let second = stream.next().await.unwrap().unwrap();
239 assert!(first.type_tag().as_deref() == Some("agent.message"));
240 assert!(second.type_tag().as_deref() == Some("session.thread_idle"));
241 }
242}