use crate::jsonrpc::RequestId;
use crate::protocol::{
methods, AccountRateLimitsUpdatedNotification, AgentMessageDeltaNotification,
CmdOutputDeltaNotification, CommandExecutionApprovalParams, ErrorNotification,
FileChangeApprovalParams, FileChangeOutputDeltaNotification, ItemCompletedNotification,
ItemStartedNotification, McpServerStartupStatusUpdatedNotification, ReasoningDeltaNotification,
RemoteControlStatusChangedNotification, ThreadStartedNotification,
ThreadStatusChangedNotification, ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification, TurnStartedNotification,
};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
/// A notification message, with one typed variant per recognized method.
///
/// Each typed variant wraps the payload struct that is decoded from the
/// envelope's `params` for the matching `methods::*` constant. Methods that
/// are not recognized are kept verbatim in [`Notification::Unknown`] rather
/// than rejected.
#[derive(Debug, Clone)]
pub enum Notification {
/// Payload for `methods::THREAD_STARTED`.
ThreadStarted(ThreadStartedNotification),
/// Payload for `methods::THREAD_STATUS_CHANGED`.
ThreadStatusChanged(ThreadStatusChangedNotification),
/// Payload for `methods::THREAD_TOKEN_USAGE_UPDATED`.
ThreadTokenUsageUpdated(ThreadTokenUsageUpdatedNotification),
/// Payload for `methods::TURN_STARTED`.
TurnStarted(TurnStartedNotification),
/// Payload for `methods::TURN_COMPLETED`.
TurnCompleted(TurnCompletedNotification),
/// Payload for `methods::ITEM_STARTED`.
ItemStarted(ItemStartedNotification),
/// Payload for `methods::ITEM_COMPLETED`.
ItemCompleted(ItemCompletedNotification),
/// Payload for `methods::AGENT_MESSAGE_DELTA`.
AgentMessageDelta(AgentMessageDeltaNotification),
/// Payload for `methods::CMD_OUTPUT_DELTA`.
CmdOutputDelta(CmdOutputDeltaNotification),
/// Payload for `methods::FILE_CHANGE_OUTPUT_DELTA`.
FileChangeOutputDelta(FileChangeOutputDeltaNotification),
/// Payload for `methods::REASONING_DELTA`.
ReasoningDelta(ReasoningDeltaNotification),
/// Payload for `methods::ERROR`.
Error(ErrorNotification),
/// Payload for `methods::ACCOUNT_RATE_LIMITS_UPDATED`.
AccountRateLimitsUpdated(AccountRateLimitsUpdatedNotification),
/// Payload for `methods::MCP_SERVER_STARTUP_STATUS_UPDATED`.
McpServerStartupStatusUpdated(McpServerStartupStatusUpdatedNotification),
/// Payload for `methods::REMOTE_CONTROL_STATUS_CHANGED`.
RemoteControlStatusChanged(RemoteControlStatusChangedNotification),
/// Fallback for any method without a typed variant.
Unknown {
/// The method name exactly as it appeared in the envelope.
method: String,
/// The raw, undecoded params; `None` when the envelope carried none.
params: Option<Value>,
},
}
impl Notification {
    /// Returns the method name this notification is sent under.
    ///
    /// Typed variants map to their `methods::*` constant; the `Unknown`
    /// variant returns the method string it was created with.
    pub fn method(&self) -> &str {
        match self {
            Self::ThreadStarted(_) => methods::THREAD_STARTED,
            Self::ThreadStatusChanged(_) => methods::THREAD_STATUS_CHANGED,
            Self::ThreadTokenUsageUpdated(_) => methods::THREAD_TOKEN_USAGE_UPDATED,
            Self::TurnStarted(_) => methods::TURN_STARTED,
            Self::TurnCompleted(_) => methods::TURN_COMPLETED,
            Self::ItemStarted(_) => methods::ITEM_STARTED,
            Self::ItemCompleted(_) => methods::ITEM_COMPLETED,
            Self::AgentMessageDelta(_) => methods::AGENT_MESSAGE_DELTA,
            Self::CmdOutputDelta(_) => methods::CMD_OUTPUT_DELTA,
            Self::FileChangeOutputDelta(_) => methods::FILE_CHANGE_OUTPUT_DELTA,
            Self::ReasoningDelta(_) => methods::REASONING_DELTA,
            Self::Error(_) => methods::ERROR,
            Self::AccountRateLimitsUpdated(_) => methods::ACCOUNT_RATE_LIMITS_UPDATED,
            Self::McpServerStartupStatusUpdated(_) => methods::MCP_SERVER_STARTUP_STATUS_UPDATED,
            Self::RemoteControlStatusChanged(_) => methods::REMOTE_CONTROL_STATUS_CHANGED,
            Self::Unknown { method, .. } => method,
        }
    }

    /// Returns `true` when this is the `Unknown` fallback variant.
    pub fn is_unknown(&self) -> bool {
        matches!(self, Self::Unknown { .. })
    }

    /// Builds a notification from a `(method, params)` envelope.
    ///
    /// A recognized `method` decodes `params` into the matching typed payload;
    /// absent params are decoded as JSON `null`. Any other method yields the
    /// `Unknown` variant with `method` and `params` preserved as given.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when `method` is recognized but `params`
    /// does not deserialize into its payload type. Unrecognized methods never
    /// error.
    pub fn from_envelope(method: &str, params: Option<Value>) -> Result<Self, serde_json::Error> {
        // `params` is moved into exactly one arm, so no copy of the JSON tree
        // is needed: typed arms consume it for decoding, the fallback stores it.
        fn or_null(params: Option<Value>) -> Value {
            params.unwrap_or(Value::Null)
        }
        match method {
            methods::THREAD_STARTED => {
                serde_json::from_value(or_null(params)).map(Self::ThreadStarted)
            }
            methods::THREAD_STATUS_CHANGED => {
                serde_json::from_value(or_null(params)).map(Self::ThreadStatusChanged)
            }
            methods::THREAD_TOKEN_USAGE_UPDATED => {
                serde_json::from_value(or_null(params)).map(Self::ThreadTokenUsageUpdated)
            }
            methods::TURN_STARTED => serde_json::from_value(or_null(params)).map(Self::TurnStarted),
            methods::TURN_COMPLETED => {
                serde_json::from_value(or_null(params)).map(Self::TurnCompleted)
            }
            methods::ITEM_STARTED => serde_json::from_value(or_null(params)).map(Self::ItemStarted),
            methods::ITEM_COMPLETED => {
                serde_json::from_value(or_null(params)).map(Self::ItemCompleted)
            }
            methods::AGENT_MESSAGE_DELTA => {
                serde_json::from_value(or_null(params)).map(Self::AgentMessageDelta)
            }
            methods::CMD_OUTPUT_DELTA => {
                serde_json::from_value(or_null(params)).map(Self::CmdOutputDelta)
            }
            methods::FILE_CHANGE_OUTPUT_DELTA => {
                serde_json::from_value(or_null(params)).map(Self::FileChangeOutputDelta)
            }
            methods::REASONING_DELTA => {
                serde_json::from_value(or_null(params)).map(Self::ReasoningDelta)
            }
            methods::ERROR => serde_json::from_value(or_null(params)).map(Self::Error),
            methods::ACCOUNT_RATE_LIMITS_UPDATED => {
                serde_json::from_value(or_null(params)).map(Self::AccountRateLimitsUpdated)
            }
            methods::MCP_SERVER_STARTUP_STATUS_UPDATED => {
                serde_json::from_value(or_null(params)).map(Self::McpServerStartupStatusUpdated)
            }
            methods::REMOTE_CONTROL_STATUS_CHANGED => {
                serde_json::from_value(or_null(params)).map(Self::RemoteControlStatusChanged)
            }
            _ => Ok(Self::Unknown {
                method: method.to_string(),
                params,
            }),
        }
    }

    /// Consumes the notification and returns its `(method, params)` envelope.
    ///
    /// Typed variants always produce `Some(params)`, the payload converted to
    /// a JSON value. The `Unknown` variant hands back its stored method and
    /// params unchanged, so `params` may be `None`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when a typed payload cannot be converted
    /// to a JSON value.
    pub fn into_envelope(self) -> Result<(String, Option<Value>), serde_json::Error> {
        fn pack<T: Serialize>(
            method: &str,
            payload: T,
        ) -> Result<(String, Option<Value>), serde_json::Error> {
            Ok((method.to_string(), Some(serde_json::to_value(payload)?)))
        }
        // Match by value: `self` is owned, so the unknown payload can be moved
        // out instead of cloned.
        match self {
            Self::ThreadStarted(v) => pack(methods::THREAD_STARTED, v),
            Self::ThreadStatusChanged(v) => pack(methods::THREAD_STATUS_CHANGED, v),
            Self::ThreadTokenUsageUpdated(v) => pack(methods::THREAD_TOKEN_USAGE_UPDATED, v),
            Self::TurnStarted(v) => pack(methods::TURN_STARTED, v),
            Self::TurnCompleted(v) => pack(methods::TURN_COMPLETED, v),
            Self::ItemStarted(v) => pack(methods::ITEM_STARTED, v),
            Self::ItemCompleted(v) => pack(methods::ITEM_COMPLETED, v),
            Self::AgentMessageDelta(v) => pack(methods::AGENT_MESSAGE_DELTA, v),
            Self::CmdOutputDelta(v) => pack(methods::CMD_OUTPUT_DELTA, v),
            Self::FileChangeOutputDelta(v) => pack(methods::FILE_CHANGE_OUTPUT_DELTA, v),
            Self::ReasoningDelta(v) => pack(methods::REASONING_DELTA, v),
            Self::Error(v) => pack(methods::ERROR, v),
            Self::AccountRateLimitsUpdated(v) => pack(methods::ACCOUNT_RATE_LIMITS_UPDATED, v),
            Self::McpServerStartupStatusUpdated(v) => {
                pack(methods::MCP_SERVER_STARTUP_STATUS_UPDATED, v)
            }
            Self::RemoteControlStatusChanged(v) => pack(methods::REMOTE_CONTROL_STATUS_CHANGED, v),
            Self::Unknown { method, params } => Ok((method, params)),
        }
    }
}
impl Serialize for Notification {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let (method, params) = self
.clone()
.into_envelope()
.map_err(serde::ser::Error::custom)?;
let mut env = serde_json::Map::new();
env.insert("method".to_string(), Value::String(method));
if let Some(p) = params {
env.insert("params".to_string(), p);
}
Value::Object(env).serialize(serializer)
}
}
impl<'de> Deserialize<'de> for Notification {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let value = Value::deserialize(deserializer)?;
let method = value
.get("method")
.and_then(|v| v.as_str())
.ok_or_else(|| serde::de::Error::missing_field("method"))?
.to_string();
let params = value.get("params").cloned();
Self::from_envelope(&method, params).map_err(serde::de::Error::custom)
}
}
/// A request message, typed for the methods this module recognizes.
///
/// Requests with any other method are kept verbatim in
/// [`ServerRequest::Unknown`] rather than rejected.
// NOTE(review): the name suggests these are requests sent by the server to
// this client — confirm against the transport code.
#[derive(Debug, Clone)]
pub enum ServerRequest {
/// Params for `methods::CMD_EXEC_APPROVAL`.
CmdExecApproval(CommandExecutionApprovalParams),
/// Params for `methods::FILE_CHANGE_APPROVAL`.
FileChangeApproval(FileChangeApprovalParams),
/// Fallback for any method without a typed variant.
Unknown {
/// The method name exactly as it appeared in the envelope.
method: String,
/// The raw, undecoded params; `None` when the envelope carried none.
params: Option<Value>,
},
}
impl ServerRequest {
    /// Returns the method name this request is sent under.
    ///
    /// Typed variants map to their `methods::*` constant; the `Unknown`
    /// variant returns the method string it was created with.
    pub fn method(&self) -> &str {
        match self {
            Self::CmdExecApproval(_) => methods::CMD_EXEC_APPROVAL,
            Self::FileChangeApproval(_) => methods::FILE_CHANGE_APPROVAL,
            Self::Unknown { method, .. } => method,
        }
    }

    /// Returns `true` when this is the `Unknown` fallback variant.
    pub fn is_unknown(&self) -> bool {
        matches!(self, Self::Unknown { .. })
    }

    /// Builds a request from a `(method, params)` envelope.
    ///
    /// A recognized `method` decodes `params` into the matching typed params
    /// struct; absent params are decoded as JSON `null`. Any other method
    /// yields the `Unknown` variant with `method` and `params` preserved.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when `method` is recognized but `params`
    /// does not deserialize into its params type. Unrecognized methods never
    /// error.
    pub fn from_envelope(method: &str, params: Option<Value>) -> Result<Self, serde_json::Error> {
        // `params` is moved into exactly one arm, so it never needs cloning.
        match method {
            methods::CMD_EXEC_APPROVAL => {
                serde_json::from_value(params.unwrap_or(Value::Null)).map(Self::CmdExecApproval)
            }
            methods::FILE_CHANGE_APPROVAL => {
                serde_json::from_value(params.unwrap_or(Value::Null)).map(Self::FileChangeApproval)
            }
            _ => Ok(Self::Unknown {
                method: method.to_string(),
                params,
            }),
        }
    }
}
/// A message that is either a [`Notification`] or a [`ServerRequest`] paired
/// with its request id.
// NOTE(review): presumably this models everything the server can send to the
// client — confirm against the code that constructs it.
#[derive(Debug, Clone)]
pub enum ServerMessage {
/// A notification; it carries no request id.
Notification(Notification),
/// A request together with the id it was sent under.
Request {
/// Identifier of the request.
// NOTE(review): presumably echoed back in the response — confirm.
id: RequestId,
/// The decoded request body.
request: ServerRequest,
},
}
impl ServerMessage {
    /// Reports whether the wrapped notification or request is its `Unknown`
    /// fallback variant, by delegating to the inner value's own check.
    pub fn is_unknown(&self) -> bool {
        match self {
            Self::Request {
                request: inner_request,
                ..
            } => inner_request.is_unknown(),
            Self::Notification(inner_notification) => inner_notification.is_unknown(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An unrecognized method must not fail; it lands in `Unknown` with the
    /// method and params untouched.
    #[test]
    fn test_notification_unknown_method_routes_to_unknown_variant() {
        let payload = serde_json::json!({"x": 1});
        let parsed = Notification::from_envelope("foo/bar", Some(payload.clone()))
            .expect("unknown methods do not error");
        match parsed {
            Notification::Unknown { method, params } => {
                assert_eq!(method, "foo/bar");
                assert_eq!(params, Some(payload));
            }
            other => panic!("expected Unknown, got {:?}", other),
        }
    }

    /// A recognized method whose params do not fit its payload type is an
    /// error, not a silent fallback to `Unknown`.
    #[test]
    fn test_notification_known_method_with_bad_params_errors() {
        let empty_params = Some(serde_json::json!({}));
        let outcome = Notification::from_envelope("thread/started", empty_params);
        assert!(outcome.is_err());
    }

    /// Deserializing an envelope and serializing it again yields the same
    /// JSON value.
    #[test]
    fn test_notification_round_trip_envelope() {
        let wire = serde_json::json!({
            "method": "item/agentMessage/delta",
            "params": {"threadId": "t1", "itemId": "i1", "delta": "hi"},
        });
        let decoded = serde_json::from_value::<Notification>(wire.clone()).unwrap();
        assert!(matches!(decoded, Notification::AgentMessageDelta(_)));
        let reencoded = serde_json::to_value(&decoded).unwrap();
        assert_eq!(reencoded, wire);
    }
}