use serde::{Deserialize, Serialize};
use crate::client::Client;
use crate::error::Result;
use crate::forward_compat::dispatch_known_or_other;
use crate::pagination::Paginated;
use super::MANAGED_AGENTS_BETA;
#[derive(Debug, Clone, PartialEq)]
pub enum SessionEvent {
Known(KnownSessionEvent),
Other(serde_json::Value),
}
const KNOWN_INCOMING_TAGS: &[&str] = &[
"agent.message",
"agent.thinking",
"agent.tool_use",
"agent.tool_result",
"agent.mcp_tool_use",
"agent.mcp_tool_result",
"agent.custom_tool_use",
"agent.thread_context_compacted",
"agent.thread_message_sent",
"agent.thread_message_received",
"session.status_running",
"session.status_idle",
"session.status_rescheduled",
"session.status_terminated",
"session.deleted",
"session.error",
"session.outcome_evaluated",
"session.thread_created",
"session.thread_idle",
"span.model_request_start",
"span.model_request_end",
"span.outcome_evaluation_start",
"span.outcome_evaluation_ongoing",
"span.outcome_evaluation_end",
"user.message",
"user.interrupt",
"user.custom_tool_result",
"user.tool_confirmation",
"user.define_outcome",
];
impl Serialize for SessionEvent {
fn serialize<S: serde::Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
match self {
Self::Known(k) => k.serialize(s),
Self::Other(v) => v.serialize(s),
}
}
}
impl<'de> Deserialize<'de> for SessionEvent {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> std::result::Result<Self, D::Error> {
let raw = serde_json::Value::deserialize(d)?;
dispatch_known_or_other(
raw,
KNOWN_INCOMING_TAGS,
SessionEvent::Known,
SessionEvent::Other,
)
.map_err(serde::de::Error::custom)
}
}
impl SessionEvent {
#[must_use]
pub fn known(&self) -> Option<&KnownSessionEvent> {
match self {
Self::Known(k) => Some(k),
Self::Other(_) => None,
}
}
#[must_use]
pub fn type_tag(&self) -> Option<String> {
match self {
Self::Known(k) => serde_json::to_value(k).ok().and_then(|v| {
v.get("type")
.and_then(serde_json::Value::as_str)
.map(String::from)
}),
Self::Other(v) => v
.get("type")
.and_then(serde_json::Value::as_str)
.map(String::from),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum KnownSessionEvent {
#[serde(rename = "agent.message")]
AgentMessage(AgentMessageEvent),
#[serde(rename = "agent.thinking")]
AgentThinking(AgentThinkingEvent),
#[serde(rename = "agent.tool_use")]
AgentToolUse(AgentToolUseEvent),
#[serde(rename = "agent.tool_result")]
AgentToolResult(AgentToolResultEvent),
#[serde(rename = "agent.mcp_tool_use")]
AgentMcpToolUse(AgentMcpToolUseEvent),
#[serde(rename = "agent.mcp_tool_result")]
AgentMcpToolResult(AgentMcpToolResultEvent),
#[serde(rename = "agent.custom_tool_use")]
AgentCustomToolUse(AgentCustomToolUseEvent),
#[serde(rename = "agent.thread_context_compacted")]
AgentThreadContextCompacted(EventEnvelope),
#[serde(rename = "agent.thread_message_sent")]
AgentThreadMessageSent(AgentThreadMessageSentEvent),
#[serde(rename = "agent.thread_message_received")]
AgentThreadMessageReceived(AgentThreadMessageReceivedEvent),
#[serde(rename = "session.status_running")]
SessionStatusRunning(EventEnvelope),
#[serde(rename = "session.status_idle")]
SessionStatusIdle(SessionStatusIdleEvent),
#[serde(rename = "session.status_rescheduled")]
SessionStatusRescheduled(EventEnvelope),
#[serde(rename = "session.status_terminated")]
SessionStatusTerminated(EventEnvelope),
#[serde(rename = "session.deleted")]
SessionDeleted(EventEnvelope),
#[serde(rename = "session.error")]
SessionError(SessionErrorEvent),
#[serde(rename = "session.outcome_evaluated")]
SessionOutcomeEvaluated(EventEnvelope),
#[serde(rename = "session.thread_created")]
SessionThreadCreated(SessionThreadCreatedEvent),
#[serde(rename = "session.thread_idle")]
SessionThreadIdle(EventEnvelope),
#[serde(rename = "span.model_request_start")]
SpanModelRequestStart(EventEnvelope),
#[serde(rename = "span.model_request_end")]
SpanModelRequestEnd(SpanModelRequestEndEvent),
#[serde(rename = "span.outcome_evaluation_start")]
SpanOutcomeEvaluationStart(EventEnvelope),
#[serde(rename = "span.outcome_evaluation_ongoing")]
SpanOutcomeEvaluationOngoing(EventEnvelope),
#[serde(rename = "span.outcome_evaluation_end")]
SpanOutcomeEvaluationEnd(EventEnvelope),
#[serde(rename = "user.message")]
UserMessage(UserMessageEvent),
#[serde(rename = "user.interrupt")]
UserInterrupt(EventEnvelope),
#[serde(rename = "user.custom_tool_result")]
UserCustomToolResult(UserCustomToolResultEvent),
#[serde(rename = "user.tool_confirmation")]
UserToolConfirmation(UserToolConfirmationEvent),
#[serde(rename = "user.define_outcome")]
UserDefineOutcome(UserDefineOutcomeEvent),
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct EventEnvelope {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct AgentMessageEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
pub content: Vec<serde_json::Value>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct AgentThinkingEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default)]
pub thinking: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub signature: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum AgentEvaluatedPermission {
Allow,
Ask,
Deny,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct AgentToolUseEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
pub name: String,
pub input: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub evaluated_permission: Option<AgentEvaluatedPermission>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_thread_id: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct AgentToolResultEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_use_id: Option<String>,
#[serde(default)]
pub content: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub is_error: Option<bool>,
}
pub type AgentMcpToolUseEvent = AgentToolUseEvent;
pub type AgentMcpToolResultEvent = AgentToolResultEvent;
pub type AgentCustomToolUseEvent = AgentToolUseEvent;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SessionThreadCreatedEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_thread_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct AgentThreadMessageSentEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub to_thread_id: Option<String>,
#[serde(default)]
pub content: serde_json::Value,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct AgentThreadMessageReceivedEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub from_thread_id: Option<String>,
#[serde(default)]
pub content: serde_json::Value,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SessionStatusIdleEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub stop_reason: Option<StopReason>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum StopReason {
Known(KnownStopReason),
Other(serde_json::Value),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum KnownStopReason {
EndTurn,
RequiresAction {
event_ids: Vec<String>,
},
StopSequence,
MaxTokens,
}
const KNOWN_STOP_REASON_TAGS: &[&str] =
&["end_turn", "requires_action", "stop_sequence", "max_tokens"];
impl Serialize for StopReason {
fn serialize<S: serde::Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
match self {
Self::Known(k) => k.serialize(s),
Self::Other(v) => v.serialize(s),
}
}
}
impl<'de> Deserialize<'de> for StopReason {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> std::result::Result<Self, D::Error> {
let raw = serde_json::Value::deserialize(d)?;
dispatch_known_or_other(
raw,
KNOWN_STOP_REASON_TAGS,
StopReason::Known,
StopReason::Other,
)
.map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SessionErrorEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default)]
pub error: serde_json::Value,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SpanModelRequestEndEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model_request_start_id: Option<String>,
#[serde(default)]
pub is_error: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model_usage: Option<crate::types::Usage>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct UserMessageEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
pub content: Vec<UserContentBlock>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum UserContentBlock {
Text {
text: String,
},
}
impl UserContentBlock {
#[must_use]
pub fn text(text: impl Into<String>) -> Self {
Self::Text { text: text.into() }
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct UserCustomToolResultEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
pub custom_tool_use_id: String,
pub content: Vec<UserContentBlock>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub is_error: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct UserToolConfirmationEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
pub tool_use_id: String,
pub result: ConfirmationResult,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deny_message: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ConfirmationResult {
Allow,
Deny,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct UserDefineOutcomeEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processed_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub outcome_id: Option<String>,
pub description: String,
pub rubric: OutcomeRubric,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_iterations: Option<u32>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum OutcomeRubric {
Text {
content: String,
},
File {
file_id: String,
},
}
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum OutgoingUserEvent {
#[serde(rename = "user.message")]
Message {
content: Vec<UserContentBlock>,
},
#[serde(rename = "user.interrupt")]
Interrupt {},
#[serde(rename = "user.custom_tool_result")]
CustomToolResult {
custom_tool_use_id: String,
content: Vec<UserContentBlock>,
#[serde(default, skip_serializing_if = "Option::is_none")]
is_error: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
session_thread_id: Option<String>,
},
#[serde(rename = "user.tool_confirmation")]
ToolConfirmation {
tool_use_id: String,
result: ConfirmationResult,
#[serde(default, skip_serializing_if = "Option::is_none")]
deny_message: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
session_thread_id: Option<String>,
},
#[serde(rename = "user.define_outcome")]
DefineOutcome(UserDefineOutcomeEvent),
}
impl OutgoingUserEvent {
#[must_use]
pub fn message(text: impl Into<String>) -> Self {
Self::Message {
content: vec![UserContentBlock::text(text)],
}
}
#[must_use]
pub fn interrupt() -> Self {
Self::Interrupt {}
}
#[must_use]
pub fn allow_tool(tool_use_id: impl Into<String>) -> Self {
Self::ToolConfirmation {
tool_use_id: tool_use_id.into(),
result: ConfirmationResult::Allow,
deny_message: None,
session_thread_id: None,
}
}
#[must_use]
pub fn deny_tool(tool_use_id: impl Into<String>, deny_message: impl Into<String>) -> Self {
Self::ToolConfirmation {
tool_use_id: tool_use_id.into(),
result: ConfirmationResult::Deny,
deny_message: Some(deny_message.into()),
session_thread_id: None,
}
}
#[must_use]
pub fn custom_tool_result_text(
custom_tool_use_id: impl Into<String>,
text: impl Into<String>,
) -> Self {
Self::CustomToolResult {
custom_tool_use_id: custom_tool_use_id.into(),
content: vec![UserContentBlock::text(text)],
is_error: None,
session_thread_id: None,
}
}
#[must_use]
pub fn with_session_thread_id(mut self, thread_id: impl Into<String>) -> Self {
let id = thread_id.into();
match &mut self {
Self::ToolConfirmation {
session_thread_id, ..
}
| Self::CustomToolResult {
session_thread_id, ..
} => {
*session_thread_id = Some(id);
}
Self::Message { .. } | Self::Interrupt {} | Self::DefineOutcome(_) => {}
}
self
}
}
#[derive(Debug, Clone, Serialize)]
struct SendEventsRequest<'a> {
events: &'a [OutgoingUserEvent],
}
pub struct Events<'a> {
pub(crate) client: &'a Client,
pub(crate) session_id: String,
}
impl Events<'_> {
pub async fn send(&self, events: &[OutgoingUserEvent]) -> Result<()> {
let path = format!("/v1/sessions/{}/events", self.session_id);
let body = SendEventsRequest { events };
let _: serde_json::Value = self
.client
.execute_with_retry(
|| {
self.client
.request_builder(reqwest::Method::POST, &path)
.json(&body)
},
&[MANAGED_AGENTS_BETA],
)
.await?;
Ok(())
}
pub async fn list(&self) -> Result<Paginated<SessionEvent>> {
let path = format!("/v1/sessions/{}/events", self.session_id);
self.client
.execute_with_retry(
|| self.client.request_builder(reqwest::Method::GET, &path),
&[MANAGED_AGENTS_BETA],
)
.await
}
#[cfg(feature = "streaming")]
#[cfg_attr(docsrs, doc(cfg(feature = "streaming")))]
pub async fn stream(&self) -> Result<EventStream> {
let path = format!("/v1/sessions/{}/stream", self.session_id);
let response = self
.client
.execute_streaming(
self.client
.request_builder(reqwest::Method::GET, &path)
.header("accept", "text/event-stream"),
&[MANAGED_AGENTS_BETA],
)
.await?;
Ok(EventStream::from_response(response))
}
}
#[cfg(feature = "streaming")]
#[cfg_attr(docsrs, doc(cfg(feature = "streaming")))]
pub struct EventStream {
inner: futures_util::stream::BoxStream<'static, Result<SessionEvent>>,
}
#[cfg(feature = "streaming")]
impl EventStream {
pub(crate) fn from_response(response: reqwest::Response) -> Self {
use futures_util::StreamExt;
Self {
inner: crate::sse::into_typed_stream::<SessionEvent>(response).boxed(),
}
}
}
#[cfg(feature = "streaming")]
impl futures_util::Stream for EventStream {
type Item = Result<SessionEvent>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.as_mut().poll_next(cx)
}
}
#[cfg(feature = "streaming")]
impl std::fmt::Debug for EventStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventStream").finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::matchers::{body_partial_json, header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
fn client_for(mock: &MockServer) -> Client {
Client::builder()
.api_key("sk-ant-test")
.base_url(mock.uri())
.build()
.unwrap()
}
#[test]
fn known_agent_message_round_trips() {
let raw = json!({
"type": "agent.message",
"id": "sevt_01",
"processed_at": "2026-04-30T12:00:00Z",
"content": [{"type": "text", "text": "hello"}]
});
let ev: SessionEvent = serde_json::from_value(raw.clone()).unwrap();
match &ev {
SessionEvent::Known(KnownSessionEvent::AgentMessage(m)) => {
assert_eq!(m.id.as_deref(), Some("sevt_01"));
assert_eq!(m.content.len(), 1);
}
other => panic!("expected AgentMessage, got {other:?}"),
}
let back = serde_json::to_value(&ev).unwrap();
assert_eq!(back, raw);
}
#[test]
fn unknown_event_type_falls_through_to_other() {
let raw = json!({
"type": "agent.future_event",
"id": "sevt_99",
"extra": [1, 2, 3]
});
let ev: SessionEvent = serde_json::from_value(raw.clone()).unwrap();
match &ev {
SessionEvent::Other(v) => assert_eq!(v, &raw),
SessionEvent::Known(_) => panic!("expected Other, got Known: {ev:?}"),
}
assert_eq!(serde_json::to_value(&ev).unwrap(), raw);
assert_eq!(ev.type_tag().as_deref(), Some("agent.future_event"));
}
#[test]
fn malformed_known_event_errors() {
let raw = json!({"type": "agent.tool_use", "name": "bash"});
let parsed: std::result::Result<SessionEvent, _> = serde_json::from_value(raw);
assert!(parsed.is_err(), "must not silently fall through to Other");
}
#[test]
fn session_status_idle_with_requires_action_decodes_event_ids() {
let raw = json!({
"type": "session.status_idle",
"id": "sevt_77",
"stop_reason": {
"type": "requires_action",
"event_ids": ["sevt_a", "sevt_b"]
}
});
let ev: SessionEvent = serde_json::from_value(raw).unwrap();
let SessionEvent::Known(KnownSessionEvent::SessionStatusIdle(idle)) = &ev else {
panic!("expected SessionStatusIdle, got {ev:?}");
};
let StopReason::Known(KnownStopReason::RequiresAction { event_ids }) =
idle.stop_reason.as_ref().unwrap()
else {
panic!("expected RequiresAction stop reason");
};
assert_eq!(event_ids, &["sevt_a", "sevt_b"]);
}
#[test]
fn session_status_idle_with_unknown_stop_reason_lands_in_other() {
let raw = json!({
"type": "session.status_idle",
"stop_reason": {"type": "future_reason", "x": 1}
});
let ev: SessionEvent = serde_json::from_value(raw).unwrap();
let SessionEvent::Known(KnownSessionEvent::SessionStatusIdle(idle)) = &ev else {
panic!("expected SessionStatusIdle");
};
match idle.stop_reason.as_ref().unwrap() {
StopReason::Other(v) => assert_eq!(v["type"], "future_reason"),
StopReason::Known(_) => panic!("expected Other stop reason, got Known"),
}
}
#[test]
fn outgoing_user_message_serializes_with_text_block() {
let ev = OutgoingUserEvent::message("hi");
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(
v,
json!({
"type": "user.message",
"content": [{"type": "text", "text": "hi"}]
})
);
}
#[test]
fn outgoing_user_interrupt_serializes_minimal_object() {
let ev = OutgoingUserEvent::interrupt();
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v, json!({"type": "user.interrupt"}));
}
#[test]
fn outgoing_tool_confirmation_serializes_allow_and_deny() {
let allow = OutgoingUserEvent::allow_tool("sevt_1");
assert_eq!(
serde_json::to_value(&allow).unwrap(),
json!({
"type": "user.tool_confirmation",
"tool_use_id": "sevt_1",
"result": "allow"
})
);
let deny = OutgoingUserEvent::deny_tool("sevt_2", "policy violation");
assert_eq!(
serde_json::to_value(&deny).unwrap(),
json!({
"type": "user.tool_confirmation",
"tool_use_id": "sevt_2",
"result": "deny",
"deny_message": "policy violation"
})
);
}
#[test]
fn session_thread_created_event_decodes_thread_id_and_model() {
let raw = json!({
"type": "session.thread_created",
"id": "sevt_1",
"session_thread_id": "sthr_a",
"model": "claude-opus-4-7"
});
let ev: SessionEvent = serde_json::from_value(raw).unwrap();
let SessionEvent::Known(KnownSessionEvent::SessionThreadCreated(t)) = ev else {
panic!("expected SessionThreadCreated");
};
assert_eq!(t.session_thread_id.as_deref(), Some("sthr_a"));
assert_eq!(t.model.as_deref(), Some("claude-opus-4-7"));
}
#[test]
fn agent_thread_message_sent_event_decodes_to_thread_id() {
let raw = json!({
"type": "agent.thread_message_sent",
"id": "sevt_2",
"to_thread_id": "sthr_b",
"content": [{"type": "text", "text": "delegate"}]
});
let ev: SessionEvent = serde_json::from_value(raw).unwrap();
let SessionEvent::Known(KnownSessionEvent::AgentThreadMessageSent(m)) = ev else {
panic!("expected AgentThreadMessageSent");
};
assert_eq!(m.to_thread_id.as_deref(), Some("sthr_b"));
}
#[test]
fn agent_thread_message_received_event_decodes_from_thread_id() {
let raw = json!({
"type": "agent.thread_message_received",
"id": "sevt_3",
"from_thread_id": "sthr_b",
"content": [{"type": "text", "text": "done"}]
});
let ev: SessionEvent = serde_json::from_value(raw).unwrap();
let SessionEvent::Known(KnownSessionEvent::AgentThreadMessageReceived(m)) = ev else {
panic!("expected AgentThreadMessageReceived");
};
assert_eq!(m.from_thread_id.as_deref(), Some("sthr_b"));
}
#[test]
fn agent_tool_use_event_carries_session_thread_id_when_in_subagent_thread() {
let raw = json!({
"type": "agent.tool_use",
"id": "sevt_4",
"name": "bash",
"input": {"cmd": "ls"},
"session_thread_id": "sthr_b"
});
let ev: SessionEvent = serde_json::from_value(raw).unwrap();
let SessionEvent::Known(KnownSessionEvent::AgentToolUse(t)) = ev else {
panic!("expected AgentToolUse");
};
assert_eq!(t.session_thread_id.as_deref(), Some("sthr_b"));
}
#[test]
fn outgoing_tool_confirmation_with_thread_id_routes_reply() {
let ev = OutgoingUserEvent::allow_tool("sevt_4").with_session_thread_id("sthr_b");
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["session_thread_id"], "sthr_b");
assert_eq!(v["type"], "user.tool_confirmation");
}
#[test]
fn outgoing_custom_tool_result_with_thread_id_routes_reply() {
let ev = OutgoingUserEvent::custom_tool_result_text("sevt_5", "ok")
.with_session_thread_id("sthr_c");
let v = serde_json::to_value(&ev).unwrap();
assert_eq!(v["session_thread_id"], "sthr_c");
assert_eq!(v["custom_tool_use_id"], "sevt_5");
}
#[test]
fn outgoing_tool_confirmation_without_thread_id_omits_field() {
let ev = OutgoingUserEvent::allow_tool("sevt_4");
let v = serde_json::to_value(&ev).unwrap();
assert!(v.get("session_thread_id").is_none(), "{v}");
}
#[test]
fn agent_evaluated_permission_round_trips_lowercase() {
for (perm, wire) in [
(AgentEvaluatedPermission::Allow, "allow"),
(AgentEvaluatedPermission::Ask, "ask"),
(AgentEvaluatedPermission::Deny, "deny"),
] {
let v = serde_json::to_value(perm).unwrap();
assert_eq!(v, json!(wire));
let parsed: AgentEvaluatedPermission = serde_json::from_value(v).unwrap();
assert_eq!(parsed, perm);
}
}
#[tokio::test]
async fn events_send_posts_to_events_subpath() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/sessions/sesn_x/events"))
.and(header("anthropic-beta", "managed-agents-2026-04-01"))
.and(body_partial_json(json!({
"events": [
{"type": "user.message", "content": [{"type": "text", "text": "go"}]}
]
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.mount(&mock)
.await;
let client = client_for(&mock);
client
.managed_agents()
.sessions()
.events("sesn_x")
.send(&[OutgoingUserEvent::message("go")])
.await
.unwrap();
}
#[cfg(feature = "streaming")]
#[tokio::test]
async fn events_stream_yields_typed_session_events() {
use futures_util::StreamExt;
let sse_body = concat!(
"event: message\n",
"data: {\"type\":\"agent.message\",\"id\":\"sevt_1\",\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}\n",
"\n",
"event: message\n",
"data: {\"type\":\"session.status_idle\",\"id\":\"sevt_2\",\"stop_reason\":{\"type\":\"end_turn\"}}\n",
"\n",
);
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/v1/sessions/sesn_x/stream"))
.and(header("anthropic-beta", "managed-agents-2026-04-01"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_string(sse_body),
)
.mount(&mock)
.await;
let client = client_for(&mock);
let mut stream = client
.managed_agents()
.sessions()
.events("sesn_x")
.stream()
.await
.unwrap();
let first = stream.next().await.unwrap().unwrap();
assert!(matches!(
first,
SessionEvent::Known(KnownSessionEvent::AgentMessage(_))
));
let second = stream.next().await.unwrap().unwrap();
let SessionEvent::Known(KnownSessionEvent::SessionStatusIdle(idle)) = second else {
panic!("expected SessionStatusIdle");
};
assert!(matches!(
idle.stop_reason,
Some(StopReason::Known(KnownStopReason::EndTurn))
));
}
#[cfg(feature = "streaming")]
#[tokio::test]
async fn events_stream_propagates_unauthorized_response() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/v1/sessions/sesn_x/stream"))
.respond_with(
ResponseTemplate::new(401)
.insert_header("request-id", "req_unauth")
.set_body_json(json!({
"type": "error",
"error": {"type": "authentication_error", "message": "bad key"}
})),
)
.mount(&mock)
.await;
let client = client_for(&mock);
let err = client
.managed_agents()
.sessions()
.events("sesn_x")
.stream()
.await
.unwrap_err();
assert_eq!(err.status(), Some(http::StatusCode::UNAUTHORIZED));
assert_eq!(err.request_id(), Some("req_unauth"));
}
#[tokio::test]
async fn events_list_returns_paginated_event_stream() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/v1/sessions/sesn_x/events"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"data": [
{"type": "user.message", "content": [{"type": "text", "text": "hi"}]},
{"type": "agent.message", "content": [{"type": "text", "text": "hello"}]}
],
"has_more": false
})))
.mount(&mock)
.await;
let client = client_for(&mock);
let page = client
.managed_agents()
.sessions()
.events("sesn_x")
.list()
.await
.unwrap();
assert_eq!(page.data.len(), 2);
assert!(matches!(
page.data[0],
SessionEvent::Known(KnownSessionEvent::UserMessage(_))
));
assert!(matches!(
page.data[1],
SessionEvent::Known(KnownSessionEvent::AgentMessage(_))
));
}
}