use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use uuid::Uuid;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum EventPriority {
Low = 0,
Normal = 1,
High = 2,
Critical = 3,
}
impl Default for EventPriority {
fn default() -> Self {
Self::Normal
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NotificationLevel {
Info,
Success,
Warning,
Error,
}
impl Default for NotificationLevel {
fn default() -> Self {
Self::Info
}
}
impl std::str::FromStr for NotificationLevel {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"info" => Ok(Self::Info),
"success" => Ok(Self::Success),
"warning" | "warn" => Ok(Self::Warning),
"error" | "err" => Ok(Self::Error),
_ => Err(anyhow!("Invalid notification level: {}", s)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PromptType {
Text { placeholder: Option<String> },
Confirm { default: Option<bool> },
Select {
options: Vec<String>,
default: Option<usize>,
},
MultiSelect {
options: Vec<String>,
defaults: Vec<usize>,
},
Password { placeholder: Option<String> },
FilePath {
filter: Option<String>,
must_exist: bool,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PromptResponse {
Text(String),
Confirm(bool),
Select(usize),
MultiSelect(Vec<usize>),
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RenderContent {
Text(String),
Markdown(String),
Json(serde_json::Value),
Table {
headers: Vec<String>,
rows: Vec<Vec<String>>,
},
Code { language: String, content: String },
Image { data: String, alt: Option<String> },
Thinking {
steps: Vec<String>,
final_answer: Option<String>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum A2UIEventType {
Notify {
message: String,
level: NotificationLevel,
duration_ms: Option<u64>,
},
Prompt {
id: Uuid,
message: String,
prompt_type: PromptType,
},
Progress {
id: Uuid,
label: String,
current: u64,
total: u64,
message: Option<String>,
},
ProgressComplete { id: Uuid },
Render {
target: Option<String>,
content: RenderContent,
replace: bool,
},
Clear { target: Option<String> },
Highlight {
element: String,
message: Option<String>,
},
Status {
text: String,
section: Option<String>,
},
Modal {
id: Uuid,
title: String,
content: RenderContent,
buttons: Vec<(String, String)>, },
ModalClose { id: Uuid },
Toast {
message: String,
level: NotificationLevel,
duration_ms: u64,
},
AgentStarted {
agent_id: String,
task: Option<String>,
},
AgentCompleted {
agent_id: String,
result: Option<String>,
success: bool,
},
AgentThinking {
agent_id: String,
thought: String,
step: usize,
},
Focus { element: String },
ScrollTo { element: String, position: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2UIEvent {
pub id: Uuid,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub source: String,
pub priority: EventPriority,
pub event_type: A2UIEventType,
}
impl A2UIEvent {
pub fn new(source: impl Into<String>, event_type: A2UIEventType) -> Self {
Self {
id: Uuid::new_v4(),
timestamp: chrono::Utc::now(),
source: source.into(),
priority: EventPriority::default(),
event_type,
}
}
pub fn with_priority(mut self, priority: EventPriority) -> Self {
self.priority = priority;
self
}
pub fn notify(source: impl Into<String>, message: impl Into<String>) -> Self {
Self::new(
source,
A2UIEventType::Notify {
message: message.into(),
level: NotificationLevel::Info,
duration_ms: None,
},
)
}
pub fn notify_level(
source: impl Into<String>,
message: impl Into<String>,
level: NotificationLevel,
) -> Self {
Self::new(
source,
A2UIEventType::Notify {
message: message.into(),
level,
duration_ms: None,
},
)
}
pub fn progress(
source: impl Into<String>,
id: Uuid,
label: impl Into<String>,
current: u64,
total: u64,
) -> Self {
Self::new(
source,
A2UIEventType::Progress {
id,
label: label.into(),
current,
total,
message: None,
},
)
}
pub fn prompt_text(source: impl Into<String>, message: impl Into<String>) -> (Self, Uuid) {
let prompt_id = Uuid::new_v4();
(
Self::new(
source,
A2UIEventType::Prompt {
id: prompt_id,
message: message.into(),
prompt_type: PromptType::Text { placeholder: None },
},
),
prompt_id,
)
}
pub fn prompt_confirm(source: impl Into<String>, message: impl Into<String>) -> (Self, Uuid) {
let prompt_id = Uuid::new_v4();
(
Self::new(
source,
A2UIEventType::Prompt {
id: prompt_id,
message: message.into(),
prompt_type: PromptType::Confirm { default: None },
},
),
prompt_id,
)
}
pub fn prompt_select(
source: impl Into<String>,
message: impl Into<String>,
options: Vec<String>,
) -> (Self, Uuid) {
let prompt_id = Uuid::new_v4();
(
Self::new(
source,
A2UIEventType::Prompt {
id: prompt_id,
message: message.into(),
prompt_type: PromptType::Select {
options,
default: None,
},
},
),
prompt_id,
)
}
pub fn render(source: impl Into<String>, content: RenderContent) -> Self {
Self::new(
source,
A2UIEventType::Render {
target: None,
content,
replace: false,
},
)
}
pub fn toast(
source: impl Into<String>,
message: impl Into<String>,
level: NotificationLevel,
duration_ms: u64,
) -> Self {
Self::new(
source,
A2UIEventType::Toast {
message: message.into(),
level,
duration_ms,
},
)
}
pub fn agent_started(source: impl Into<String>, agent_id: impl Into<String>) -> Self {
Self::new(
source,
A2UIEventType::AgentStarted {
agent_id: agent_id.into(),
task: None,
},
)
}
pub fn agent_completed(
source: impl Into<String>,
agent_id: impl Into<String>,
success: bool,
) -> Self {
Self::new(
source,
A2UIEventType::AgentCompleted {
agent_id: agent_id.into(),
result: None,
success,
},
)
}
pub fn agent_thinking(
source: impl Into<String>,
agent_id: impl Into<String>,
thought: impl Into<String>,
step: usize,
) -> Self {
Self::new(
source,
A2UIEventType::AgentThinking {
agent_id: agent_id.into(),
thought: thought.into(),
step,
},
)
}
pub fn status(source: impl Into<String>, text: impl Into<String>) -> Self {
Self::new(
source,
A2UIEventType::Status {
text: text.into(),
section: None,
},
)
}
}
#[derive(Clone)]
pub struct A2UIChannel {
events: Arc<Mutex<VecDeque<A2UIEvent>>>,
responses: Arc<RwLock<std::collections::HashMap<Uuid, PromptResponse>>>,
response_signal: Arc<(Mutex<bool>, Condvar)>,
max_events: usize,
}
impl A2UIChannel {
pub fn new() -> Self {
Self {
events: Arc::new(Mutex::new(VecDeque::new())),
responses: Arc::new(RwLock::new(std::collections::HashMap::new())),
response_signal: Arc::new((Mutex::new(false), Condvar::new())),
max_events: 1000,
}
}
pub fn with_capacity(max_events: usize) -> Self {
Self {
events: Arc::new(Mutex::new(VecDeque::new())),
responses: Arc::new(RwLock::new(std::collections::HashMap::new())),
response_signal: Arc::new((Mutex::new(false), Condvar::new())),
max_events,
}
}
pub fn send(&self, event: A2UIEvent) -> Result<()> {
let mut events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
while events.len() >= self.max_events {
events.pop_front();
}
events.push_back(event);
Ok(())
}
pub fn notify(&self, source: &str, message: &str) -> Result<()> {
self.send(A2UIEvent::notify(source, message))
}
pub fn notify_level(
&self,
source: &str,
message: &str,
level: NotificationLevel,
) -> Result<()> {
self.send(A2UIEvent::notify_level(source, message, level))
}
pub fn progress(
&self,
source: &str,
id: Uuid,
label: &str,
current: u64,
total: u64,
) -> Result<()> {
self.send(A2UIEvent::progress(source, id, label, current, total))
}
pub fn progress_complete(&self, source: &str, id: Uuid) -> Result<()> {
self.send(A2UIEvent::new(
source,
A2UIEventType::ProgressComplete { id },
))
}
pub fn prompt_text(&self, source: &str, message: &str) -> Result<PromptResponse> {
let (event, prompt_id) = A2UIEvent::prompt_text(source, message);
self.send(event)?;
self.wait_for_response(prompt_id)
}
pub fn prompt_confirm(&self, source: &str, message: &str) -> Result<bool> {
let (event, prompt_id) = A2UIEvent::prompt_confirm(source, message);
self.send(event)?;
match self.wait_for_response(prompt_id)? {
PromptResponse::Confirm(v) => Ok(v),
PromptResponse::Cancelled => Ok(false),
_ => Err(anyhow!("Unexpected response type")),
}
}
pub fn prompt_select(
&self,
source: &str,
message: &str,
options: Vec<String>,
) -> Result<PromptResponse> {
let (event, prompt_id) = A2UIEvent::prompt_select(source, message, options);
self.send(event)?;
self.wait_for_response(prompt_id)
}
pub fn render(&self, source: &str, content: RenderContent) -> Result<()> {
self.send(A2UIEvent::render(source, content))
}
pub fn toast(
&self,
source: &str,
message: &str,
level: NotificationLevel,
duration_ms: u64,
) -> Result<()> {
self.send(A2UIEvent::toast(source, message, level, duration_ms))
}
pub fn status(&self, source: &str, text: &str) -> Result<()> {
self.send(A2UIEvent::status(source, text))
}
pub fn receive_all(&self) -> Result<Vec<A2UIEvent>> {
let mut events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
Ok(events.drain(..).collect())
}
pub fn receive(&self, max_count: usize) -> Result<Vec<A2UIEvent>> {
let mut events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
let count = max_count.min(events.len());
Ok(events.drain(..count).collect())
}
pub fn peek(&self) -> Result<Vec<A2UIEvent>> {
let events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
Ok(events.iter().cloned().collect())
}
pub fn has_events(&self) -> Result<bool> {
let events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
Ok(!events.is_empty())
}
pub fn event_count(&self) -> Result<usize> {
let events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
Ok(events.len())
}
pub fn submit_response(&self, prompt_id: Uuid, response: PromptResponse) -> Result<()> {
{
let mut responses = self
.responses
.write()
.map_err(|e| anyhow!("Failed to acquire responses lock: {}", e))?;
responses.insert(prompt_id, response);
}
let (lock, cvar) = &*self.response_signal;
let mut signaled = lock
.lock()
.map_err(|e| anyhow!("Failed to acquire signal lock: {}", e))?;
*signaled = true;
cvar.notify_all();
Ok(())
}
pub fn has_response(&self, prompt_id: Uuid) -> Result<bool> {
let responses = self
.responses
.read()
.map_err(|e| anyhow!("Failed to acquire responses lock: {}", e))?;
Ok(responses.contains_key(&prompt_id))
}
fn wait_for_response(&self, prompt_id: Uuid) -> Result<PromptResponse> {
let timeout = std::time::Duration::from_secs(300); let start = std::time::Instant::now();
loop {
{
let mut responses = self
.responses
.write()
.map_err(|e| anyhow!("Failed to acquire responses lock: {}", e))?;
if let Some(response) = responses.remove(&prompt_id) {
return Ok(response);
}
}
if start.elapsed() > timeout {
return Ok(PromptResponse::Cancelled);
}
let (lock, cvar) = &*self.response_signal;
let signaled = lock
.lock()
.map_err(|e| anyhow!("Failed to acquire signal lock: {}", e))?;
let wait_time = std::time::Duration::from_millis(100);
let (mut guard, _) = cvar
.wait_timeout(signaled, wait_time)
.map_err(|e| anyhow!("Failed to wait on condition: {}", e))?;
*guard = false;
}
}
pub fn clear(&self) -> Result<()> {
let mut events = self
.events
.lock()
.map_err(|e| anyhow!("Failed to acquire events lock: {}", e))?;
events.clear();
Ok(())
}
}
impl Default for A2UIChannel {
fn default() -> Self {
Self::new()
}
}
lazy_static::lazy_static! {
pub static ref A2UI_CHANNEL: A2UIChannel = A2UIChannel::new();
}
pub fn notify(source: &str, message: &str) -> Result<()> {
A2UI_CHANNEL.notify(source, message)
}
pub fn notify_level(source: &str, message: &str, level: NotificationLevel) -> Result<()> {
A2UI_CHANNEL.notify_level(source, message, level)
}
pub fn progress(source: &str, id: Uuid, label: &str, current: u64, total: u64) -> Result<()> {
A2UI_CHANNEL.progress(source, id, label, current, total)
}
pub fn progress_complete(source: &str, id: Uuid) -> Result<()> {
A2UI_CHANNEL.progress_complete(source, id)
}
pub fn prompt_text(source: &str, message: &str) -> Result<PromptResponse> {
A2UI_CHANNEL.prompt_text(source, message)
}
pub fn prompt_confirm(source: &str, message: &str) -> Result<bool> {
A2UI_CHANNEL.prompt_confirm(source, message)
}
pub fn prompt_select(source: &str, message: &str, options: Vec<String>) -> Result<PromptResponse> {
A2UI_CHANNEL.prompt_select(source, message, options)
}
pub fn render(source: &str, content: RenderContent) -> Result<()> {
A2UI_CHANNEL.render(source, content)
}
pub fn toast(
source: &str,
message: &str,
level: NotificationLevel,
duration_ms: u64,
) -> Result<()> {
A2UI_CHANNEL.toast(source, message, level, duration_ms)
}
pub fn status(source: &str, text: &str) -> Result<()> {
A2UI_CHANNEL.status(source, text)
}
pub fn receive_all() -> Result<Vec<A2UIEvent>> {
A2UI_CHANNEL.receive_all()
}
pub fn submit_response(prompt_id: Uuid, response: PromptResponse) -> Result<()> {
A2UI_CHANNEL.submit_response(prompt_id, response)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_notification_level_parse() {
assert_eq!(
"info".parse::<NotificationLevel>().unwrap(),
NotificationLevel::Info
);
assert_eq!(
"success".parse::<NotificationLevel>().unwrap(),
NotificationLevel::Success
);
assert_eq!(
"warning".parse::<NotificationLevel>().unwrap(),
NotificationLevel::Warning
);
assert_eq!(
"warn".parse::<NotificationLevel>().unwrap(),
NotificationLevel::Warning
);
assert_eq!(
"error".parse::<NotificationLevel>().unwrap(),
NotificationLevel::Error
);
}
#[test]
fn test_event_creation() {
let event = A2UIEvent::notify("test_agent", "Hello World");
assert_eq!(event.source, "test_agent");
match event.event_type {
A2UIEventType::Notify { message, level, .. } => {
assert_eq!(message, "Hello World");
assert_eq!(level, NotificationLevel::Info);
}
_ => panic!("Wrong event type"),
}
}
#[test]
fn test_channel_send_receive() {
let channel = A2UIChannel::new();
channel.notify("agent1", "Test 1").unwrap();
channel.notify("agent2", "Test 2").unwrap();
assert_eq!(channel.event_count().unwrap(), 2);
let events = channel.receive_all().unwrap();
assert_eq!(events.len(), 2);
assert_eq!(channel.event_count().unwrap(), 0);
}
#[test]
fn test_channel_capacity() {
let channel = A2UIChannel::with_capacity(3);
for i in 0..5 {
channel.notify("agent", &format!("Message {}", i)).unwrap();
}
assert_eq!(channel.event_count().unwrap(), 3);
}
#[test]
fn test_progress_events() {
let channel = A2UIChannel::new();
let progress_id = Uuid::new_v4();
channel
.progress("agent", progress_id, "Downloading", 50, 100)
.unwrap();
let events = channel.receive_all().unwrap();
assert_eq!(events.len(), 1);
match &events[0].event_type {
A2UIEventType::Progress {
id, current, total, ..
} => {
assert_eq!(*id, progress_id);
assert_eq!(*current, 50);
assert_eq!(*total, 100);
}
_ => panic!("Wrong event type"),
}
}
#[test]
fn test_prompt_response() {
let channel = A2UIChannel::new();
let prompt_id = Uuid::new_v4();
channel
.submit_response(prompt_id, PromptResponse::Text("user input".to_string()))
.unwrap();
let responses = channel.responses.read().unwrap();
assert!(responses.contains_key(&prompt_id));
}
#[test]
fn test_render_content_variants() {
let channel = A2UIChannel::new();
channel
.render("agent", RenderContent::Text("Plain text".to_string()))
.unwrap();
channel
.render("agent", RenderContent::Markdown("# Header".to_string()))
.unwrap();
channel
.render(
"agent",
RenderContent::Table {
headers: vec!["Name".to_string(), "Value".to_string()],
rows: vec![vec!["foo".to_string(), "bar".to_string()]],
},
)
.unwrap();
channel
.render(
"agent",
RenderContent::Code {
language: "rust".to_string(),
content: "fn main() {}".to_string(),
},
)
.unwrap();
assert_eq!(channel.event_count().unwrap(), 4);
}
#[test]
fn test_toast_events() {
let channel = A2UIChannel::new();
channel
.toast("agent", "Quick message", NotificationLevel::Success, 3000)
.unwrap();
let events = channel.receive_all().unwrap();
match &events[0].event_type {
A2UIEventType::Toast {
message,
level,
duration_ms,
} => {
assert_eq!(message, "Quick message");
assert_eq!(*level, NotificationLevel::Success);
assert_eq!(*duration_ms, 3000);
}
_ => panic!("Wrong event type"),
}
}
#[test]
fn test_agent_lifecycle_events() {
let channel = A2UIChannel::new();
channel
.send(A2UIEvent::agent_started("system", "agent1"))
.unwrap();
channel
.send(A2UIEvent::agent_thinking(
"system",
"agent1",
"Analyzing...",
1,
))
.unwrap();
channel
.send(A2UIEvent::agent_completed("system", "agent1", true))
.unwrap();
let events = channel.receive_all().unwrap();
assert_eq!(events.len(), 3);
}
#[test]
fn test_event_priority() {
let event = A2UIEvent::notify("agent", "test").with_priority(EventPriority::Critical);
assert_eq!(event.priority, EventPriority::Critical);
}
#[test]
fn test_global_channel() {
notify("test", "Global test").unwrap();
A2UI_CHANNEL.clear().unwrap();
}
}