use anyhow::{Context, Result, anyhow};
use clap::Args;
use futures_util::StreamExt;
use reqwest::Client;
use reqwest_eventsource::{Event, EventSource};
use systemprompt_agent::models::a2a::jsonrpc::{
JSON_RPC_VERSION_2_0, JsonRpcResponse, Request, RequestId,
};
use systemprompt_agent::models::a2a::protocol::{
MessageSendConfiguration, MessageSendParams, TaskStatusUpdateEvent,
};
use systemprompt_identifiers::{ContextId, MessageId, TaskId};
use systemprompt_logging::CliService;
use systemprompt_models::a2a::{Message, MessageRole, Part, Task, TextPart};
use super::types::MessageOutput;
use crate::CliConfig;
use crate::interactive::resolve_required;
use crate::session::get_or_create_session;
use crate::shared::CommandResult;
#[derive(Debug, Args)]
pub struct MessageArgs {
#[arg(help = "Agent name to send message to (required in non-interactive mode)")]
pub agent: Option<String>,
#[arg(short = 'm', long, help = "Message text to send")]
pub message: Option<String>,
#[arg(
long,
help = "Context ID for conversation continuity (overrides session)"
)]
pub context_id: Option<String>,
#[arg(long, help = "Task ID to continue an existing task")]
pub task_id: Option<String>,
#[arg(long, help = "Gateway URL (overrides profile's api_external_url)")]
pub url: Option<String>,
#[arg(long, help = "Use streaming mode")]
pub stream: bool,
#[arg(long, help = "Wait for task completion (blocking mode)")]
pub blocking: bool,
#[arg(
long,
default_value = "30",
help = "Timeout in seconds for blocking mode"
)]
pub timeout: u64,
#[arg(long, help = "Output full task JSON instead of response text")]
pub json: bool,
}
fn extract_text_from_parts(parts: &[Part]) -> String {
parts
.iter()
.filter_map(|part| match part {
Part::Text(text_part) => Some(text_part.text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("\n")
}
pub async fn execute(
args: MessageArgs,
config: &CliConfig,
) -> Result<CommandResult<MessageOutput>> {
let session_ctx = get_or_create_session(config).await?;
let agent = resolve_required(args.agent, "agent", config, || {
Err(anyhow!("Agent name is required"))
})?;
let message_text = resolve_required(args.message, "message", config, || {
Err(anyhow!("Message text is required. Use -m or --message"))
})?;
let base_url = args
.url
.as_deref()
.unwrap_or(&session_ctx.profile.server.api_external_url);
let agent_url = format!("{}/api/v1/agents/{}", base_url.trim_end_matches('/'), agent);
let context_id: ContextId = args
.context_id
.map_or_else(|| session_ctx.context_id().clone(), ContextId::new);
let auth_token = session_ctx.session_token().as_str();
let task_id: Option<TaskId> = args.task_id.map(TaskId::new);
let message_id = MessageId::generate();
let request_id = RequestId::String(MessageId::generate().to_string());
let method = if args.stream {
"message/stream"
} else {
"message/send"
};
let request = Request {
jsonrpc: JSON_RPC_VERSION_2_0.to_string(),
method: method.to_string(),
params: MessageSendParams {
message: Message {
role: MessageRole::User,
parts: vec![Part::Text(TextPart {
text: message_text.clone(),
})],
message_id,
task_id,
context_id: context_id.clone(),
metadata: None,
extensions: None,
reference_task_ids: None,
},
configuration: args.blocking.then_some(MessageSendConfiguration {
blocking: Some(true),
accepted_output_modes: None,
history_length: None,
push_notification_config: None,
}),
metadata: None,
},
id: request_id,
};
let use_json = args.json;
let result = if args.stream {
execute_streaming(&agent, &agent_url, auth_token, &request, &message_text).await?
} else {
execute_non_streaming(NonStreamingRequest {
agent: &agent,
agent_url: &agent_url,
auth_token,
request: &request,
message_text: &message_text,
timeout: args.timeout,
})
.await?
};
if use_json {
return Ok(result);
}
let output = result.data;
CliService::output(output.response.as_deref().unwrap_or("No response"));
Ok(CommandResult::text(output).with_skip_render())
}
async fn execute_streaming(
agent: &str,
agent_url: &str,
auth_token: &str,
request: &Request<MessageSendParams>,
message_text: &str,
) -> Result<CommandResult<MessageOutput>> {
let client = Client::new();
let http_request = client
.post(agent_url)
.header("Content-Type", "application/json")
.header("Accept", "text/event-stream")
.header("Authorization", format!("Bearer {}", auth_token))
.json(request);
let mut es = EventSource::new(http_request)
.map_err(|e| anyhow!("Failed to create SSE connection: {}", e))?;
let mut final_task: Option<Task> = None;
let mut accumulated_text = String::new();
while let Some(event) = es.next().await {
match event {
Ok(Event::Open) => {
tracing::debug!("SSE connection opened");
},
Ok(Event::Message(message)) => {
match serde_json::from_str::<JsonRpcResponse<TaskStatusUpdateEvent>>(&message.data)
{
Ok(response) => {
if let Some(error) = response.error {
let details = error
.data
.map_or_else(String::new, |d| format!("\n\nDetails: {}", d));
anyhow::bail!(
"Agent returned error ({}): {}{}",
error.code,
error.message,
details
);
}
if let Some(event) = response.result {
if let Some(ref msg) = event.status.message {
let text = extract_text_from_parts(&msg.parts);
if !text.is_empty() {
let _ = std::io::Write::write_all(
&mut std::io::stdout(),
text.as_bytes(),
);
let _ = std::io::Write::flush(&mut std::io::stdout());
accumulated_text.push_str(&text);
}
}
if event.is_final {
CliService::output("");
final_task = Some(Task {
id: event.task_id.into(),
context_id: event.context_id.into(),
status: event.status,
history: None,
artifacts: None,
metadata: None,
created_at: None,
last_modified: None,
});
break;
}
}
},
Err(e) => {
tracing::debug!(error = %e, data = %message.data, "Failed to parse SSE event");
},
}
},
Err(reqwest_eventsource::Error::StreamEnded) => {
tracing::debug!("SSE stream ended");
break;
},
Err(e) => {
anyhow::bail!("SSE stream error: {}", e);
},
}
}
let task = final_task.ok_or_else(|| anyhow!("Stream ended without final task"))?;
let response = if accumulated_text.is_empty() {
task.status
.message
.as_ref()
.map(|msg| extract_text_from_parts(&msg.parts))
} else {
Some(accumulated_text)
};
let output = MessageOutput {
agent: agent.to_string(),
task,
message_sent: message_text.to_string(),
response,
};
Ok(CommandResult::card(output).with_title(format!("Message sent to {}", agent)))
}
struct NonStreamingRequest<'a> {
agent: &'a str,
agent_url: &'a str,
auth_token: &'a str,
request: &'a Request<MessageSendParams>,
message_text: &'a str,
timeout: u64,
}
async fn execute_non_streaming(
params: NonStreamingRequest<'_>,
) -> Result<CommandResult<MessageOutput>> {
let NonStreamingRequest {
agent,
agent_url,
auth_token,
request,
message_text,
timeout,
} = params;
let client = Client::builder()
.timeout(std::time::Duration::from_secs(timeout))
.build()
.context("Failed to create HTTP client")?;
let response = client
.post(agent_url)
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", auth_token))
.json(request)
.send()
.await
.with_context(|| format!("Failed to send message to agent at {}", agent_url))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_else(|_| String::new());
anyhow::bail!("Agent request failed with status {}: {}", status, body);
}
let json_response: JsonRpcResponse<Task> = response
.json()
.await
.context("Failed to parse agent response")?;
if json_response.jsonrpc != JSON_RPC_VERSION_2_0 {
anyhow::bail!(
"Invalid JSON-RPC version: expected {}, got {}",
JSON_RPC_VERSION_2_0,
json_response.jsonrpc
);
}
if let Some(error) = json_response.error {
let details = error
.data
.map_or_else(String::new, |d| format!("\n\nDetails: {}", d));
anyhow::bail!(
"Agent returned error ({}): {}{}",
error.code,
error.message,
details
);
}
let task = json_response
.result
.ok_or_else(|| anyhow!("No result in agent response"))?;
let response = task
.status
.message
.as_ref()
.map(|msg| extract_text_from_parts(&msg.parts));
let output = MessageOutput {
agent: agent.to_string(),
task,
message_sent: message_text.to_string(),
response,
};
Ok(CommandResult::card(output).with_title(format!("Message sent to {}", agent)))
}