1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! Server-Sent Events (SSE) streaming components
use a2a_rs::services::{AsyncA2AClient, StreamItem};
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::StreamExt;
use std::{convert::Infallible, sync::Arc, time::Duration};
use tracing::{error, info, warn};
use crate::WebA2AClient;
/// Create an SSE stream for task updates
///
/// This function handles:
/// - WebSocket streaming if available
/// - Fallback to HTTP polling
/// - Automatic retry logic
/// - Serialization to JSON events
pub fn create_sse_stream(
client: Arc<WebA2AClient>,
task_id: String,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
let stream = async_stream::stream! {
// Check if we have a WebSocket client
if let Some(ws_client) = client.websocket() {
info!("Attempting to subscribe to task {} via WebSocket", task_id);
let mut retry_count = 0;
let max_retries = 60; // 60 retries with 1 second delay = 1 minute
loop {
match ws_client.subscribe_to_task(&task_id, Some(50)).await {
Ok(mut event_stream) => {
info!("Successfully subscribed to task {} via WebSocket", task_id);
while let Some(result) = event_stream.next().await {
match result {
Ok(stream_item) => {
let (event_type, event_data) = match &stream_item {
StreamItem::Task(task) => {
match serde_json::to_string(task) {
Ok(json) => ("task-update", json),
Err(e) => {
error!("Failed to serialize task: {}", e);
continue;
}
}
}
StreamItem::StatusUpdate(status) => {
match serde_json::to_string(status) {
Ok(json) => ("task-status", json),
Err(e) => {
error!("Failed to serialize status: {}", e);
continue;
}
}
}
StreamItem::ArtifactUpdate(artifact) => {
match serde_json::to_string(artifact) {
Ok(json) => ("artifact", json),
Err(e) => {
error!("Failed to serialize artifact: {}", e);
continue;
}
}
}
};
yield Ok(Event::default()
.event(event_type)
.data(event_data));
}
Err(e) => {
warn!("Stream error (continuing): {}", e);
continue;
}
}
}
break;
}
Err(e) => {
retry_count += 1;
if retry_count <= max_retries {
if retry_count == 1 {
info!("Task {} not ready yet, will retry", task_id);
}
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
} else {
warn!("Failed to subscribe after {} retries: {}, falling back to polling", max_retries, e);
loop {
match client.http.get_task(&task_id, Some(50)).await {
Ok(task) => {
let task_json = match serde_json::to_string(&task) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize task: {}", e);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};
yield Ok(Event::default()
.event("task-update")
.data(task_json));
}
Err(_) => {
// Task doesn't exist yet, keep polling silently
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}
}
} else {
// Fallback: Poll for updates every 2 seconds
warn!("WebSocket not available, using polling fallback for task {}", task_id);
loop {
match client.http.get_task(&task_id, Some(50)).await {
Ok(task) => {
let task_json = match serde_json::to_string(&task) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize task: {}", e);
continue;
}
};
yield Ok(Event::default()
.event("task-update")
.data(task_json));
}
Err(_) => {
// Task doesn't exist yet, keep polling silently
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
};
Sse::new(stream).keep_alive(KeepAlive::default())
}