1use crate::ServerConfig;
2use crate::auth_bridge::{RequestContextError, RequestContextExtractor};
3use crate::rest::controllers::ui::{
4 McpUiInitializeParams, McpUiMessageParams, McpUiUpdateModelContextParams,
5 initialize_mcp_ui_bridge, mark_mcp_ui_initialized, message_mcp_ui_bridge,
6 update_mcp_ui_bridge_model_context,
7};
8use crate::ui_protocol::{
9 SUPPORTED_UI_PROTOCOLS, UI_PROTOCOL_CAPABILITIES, normalize_runtime_ui_protocol,
10};
11use adk_core::{RequestContext, SessionId, UserId};
12use axum::{
13 Json,
14 extract::{Path, State},
15 http::{HeaderMap, StatusCode},
16 response::sse::{Event, KeepAlive, Sse},
17};
18use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
19use futures::{StreamExt, stream::Stream};
20use serde::{Deserialize, Serialize};
21use serde_json::{Map, Value, json};
22use std::collections::HashMap;
23use std::convert::Infallible;
24use tracing::{Instrument, info, warn};
25use uuid::Uuid;
26
/// Serde default provider that always yields `true`.
///
/// Referenced by the `#[serde(default = "default_streaming_true")]` attribute on
/// `RunSseRequest::streaming`, so a request body that omits the field deserializes
/// with streaming enabled.
fn default_streaming_true() -> bool {
    true
}
30
/// Request header inspected before the body when resolving the UI protocol profile.
const UI_PROTOCOL_HEADER: &str = "x-adk-ui-protocol";
/// Request header inspected before the body when resolving the UI transport mode.
const UI_TRANSPORT_HEADER: &str = "x-adk-ui-transport";
33
/// Axum handler state for the runtime endpoints; wraps the server configuration.
#[derive(Clone)]
pub struct RuntimeController {
    /// Server configuration the handlers read their services from.
    config: ServerConfig,
}

impl RuntimeController {
    /// Creates a controller that owns the given configuration.
    pub fn new(config: ServerConfig) -> Self {
        Self { config }
    }
}
44
/// File attached to a `RunRequest`, carried inline as base64 text.
#[derive(Serialize, Deserialize, Debug)]
pub struct Attachment {
    /// Attachment name; echoed in validation error messages.
    pub name: String,
    /// MIME type of the payload; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Base64-encoded payload (decoded with the standard alphabet when content is built).
    pub base64: String,
}
53
54#[derive(Serialize, Deserialize)]
55pub struct RunRequest {
56 pub new_message: String,
57 #[serde(default, alias = "uiProtocol")]
58 pub ui_protocol: Option<String>,
59 #[serde(default)]
60 pub protocol: Option<String>,
61 #[serde(default, alias = "ui_transport")]
62 pub ui_transport: Option<String>,
63 #[serde(default)]
64 pub attachments: Vec<Attachment>,
65}
66
/// JSON body for the body-addressed SSE run request.
///
/// Keys are camelCase on the wire (`rename_all = "camelCase"`); every field other
/// than the three identifiers is optional. Several alternative carriers exist for
/// AG-UI and MCP Apps payloads; the helper functions in this file decide which
/// one wins when more than one is present.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RunSseRequest {
    pub app_name: String,
    pub user_id: String,
    pub session_id: String,
    /// Structured new message (role plus parts), when supplied directly.
    #[serde(default)]
    pub new_message: Option<NewMessage>,
    /// Streaming flag; defaults to `true` when omitted.
    #[serde(default = "default_streaming_true")]
    pub streaming: bool,
    /// Optional state delta.
    // NOTE(review): presumably validated by `body_state_delta`, which rejects non-object
    // values — the call site is outside this view; confirm.
    #[serde(default)]
    pub state_delta: Option<Value>,
    /// Requested UI protocol profile; also accepted as snake_case `ui_protocol`.
    #[serde(default, alias = "ui_protocol")]
    pub ui_protocol: Option<String>,
    /// Fallback protocol selector, consulted after `ui_protocol`.
    #[serde(default)]
    pub protocol: Option<String>,
    /// Requested UI transport mode; also accepted as snake_case `ui_transport`.
    #[serde(default, alias = "ui_transport")]
    pub ui_transport: Option<String>,
    /// AG-UI run input; used when `ag_ui_input` is absent.
    #[serde(default)]
    pub input: Option<AgUiRunInput>,
    /// AG-UI run input; takes precedence over `input`.
    #[serde(default)]
    pub ag_ui_input: Option<AgUiRunInput>,
    #[serde(default)]
    pub ag_ui_compatibility_event: Option<Value>,
    /// Generic envelope; its `protocol`, `input`, `method` and `params` members are
    /// read as a last-resort carrier for AG-UI or MCP Apps payloads.
    #[serde(default)]
    pub protocol_envelope: Option<Value>,
    /// Explicit MCP Apps bridge request; takes precedence over `method`/`params`.
    #[serde(default)]
    pub mcp_apps_request: Option<McpAppsRuntimeEnvelope>,
    /// Explicit MCP Apps initialize envelope.
    #[serde(default)]
    pub mcp_apps_initialize: Option<McpAppsRuntimeEnvelope>,
    /// Raw "initialized" notification value, if the client sent one.
    #[serde(default)]
    pub mcp_apps_initialized: Option<Value>,
    /// Top-level method; combined with `params` into an MCP Apps envelope when
    /// `mcp_apps_request` is absent.
    #[serde(default)]
    pub method: Option<String>,
    /// Parameters accompanying `method`.
    #[serde(default)]
    pub params: Option<Value>,
}
105
/// One message inside an AG-UI run input. Keys are camelCase on the wire and every
/// field is optional.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct AgUiInputMessage {
    /// Message id; activity events generate an `activity-<uuid>` id when absent.
    #[serde(default)]
    pub id: Option<String>,
    /// Message role; code in this file special-cases `"user"` and `"activity"`.
    #[serde(default)]
    pub role: Option<String>,
    /// Message name; used as the activity type when `activity_type` is absent.
    #[serde(default)]
    pub name: Option<String>,
    /// Activity type reported on activity events.
    #[serde(default)]
    pub activity_type: Option<String>,
    /// Free-form content; text is pulled from strings, objects with a `text`
    /// member, or arrays of such objects.
    #[serde(default)]
    pub content: Option<Value>,
    /// Copied onto `ACTIVITY_SNAPSHOT` events when present.
    #[serde(default)]
    pub replace: Option<bool>,
    /// When present on an activity message, an `ACTIVITY_DELTA` is emitted instead
    /// of a snapshot.
    #[serde(default)]
    pub patch: Option<Vec<Value>>,
}
124
/// AG-UI run input as supplied by the client. Keys are camelCase on the wire and
/// every field is optional or defaults to empty.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct AgUiRunInput {
    /// Thread id reported on run events; the session id is used when absent.
    #[serde(default)]
    pub thread_id: Option<String>,
    /// Run id reported on run events; derived from the first event when absent.
    #[serde(default)]
    pub run_id: Option<String>,
    /// Parent run id, copied onto `RUN_STARTED` when present.
    #[serde(default)]
    pub parent_run_id: Option<String>,
    /// Client state; object entries become individual state-delta keys.
    #[serde(default)]
    pub state: Option<Value>,
    /// Conversation messages supplied by the client.
    #[serde(default)]
    pub messages: Vec<AgUiInputMessage>,
    /// Tool descriptors, stored under `temp:ag_ui_tools` when non-empty.
    #[serde(default)]
    pub tools: Vec<Value>,
    /// Context entries, stored under `temp:ag_ui_context` when non-empty.
    #[serde(default)]
    pub context: Vec<Value>,
    /// Forwarded properties, stored under `temp:ag_ui_forwarded_props` when present.
    #[serde(default)]
    pub forwarded_props: Option<Value>,
}
145
/// Method-plus-params envelope for an MCP Apps bridge call made through the runtime.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct McpAppsRuntimeEnvelope {
    /// Bridge method name, e.g. `ui/initialize`.
    pub method: String,
    /// Method parameters; may be omitted.
    #[serde(default)]
    pub params: Option<Value>,
}
153
/// Structured message made of a role and an ordered list of parts.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NewMessage {
    /// Role of the message author.
    pub role: String,
    /// Message parts in order.
    pub parts: Vec<MessagePart>,
}
159
/// One part of a `NewMessage`; may carry text, inline data, both, or neither.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MessagePart {
    /// Text content, if any.
    #[serde(default)]
    pub text: Option<String>,
    /// Inline binary payload, if any; serialized under the JSON key `inlineData`.
    #[serde(default, rename = "inlineData")]
    pub inline_data: Option<InlineData>,
}
167
/// Inline binary payload of a message part. Keys are camelCase on the wire.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct InlineData {
    /// Optional display name.
    pub display_name: Option<String>,
    /// Base64-encoded payload (decoded with the standard alphabet when content is built).
    pub data: String,
    /// MIME type of the payload.
    pub mime_type: String,
}
175
/// UI protocol profile selected for a runtime request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UiProfile {
    /// `adk_ui` — the default when the request names no protocol.
    AdkUi,
    /// `a2ui`.
    A2ui,
    /// `ag_ui` — the only profile that supports protocol-native transport.
    AgUi,
    /// `mcp_apps`.
    McpApps,
}
183
/// How runtime events are framed on the SSE stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UiTransportMode {
    /// Default mode, used when the request names no transport.
    LegacyWrapper,
    /// Events are translated into the protocol's own event vocabulary.
    ProtocolNative,
}
189
impl UiProfile {
    /// Returns the canonical wire identifier of the profile.
    fn as_str(self) -> &'static str {
        match self {
            Self::AdkUi => "adk_ui",
            Self::A2ui => "a2ui",
            Self::AgUi => "ag_ui",
            Self::McpApps => "mcp_apps",
        }
    }
}
200
/// Error type returned by the runtime handlers and helpers: an HTTP status paired
/// with a response body string.
type RuntimeError = (StatusCode, String);
202
203fn adk_err_to_runtime(err: adk_core::AdkError) -> RuntimeError {
209 let status =
210 StatusCode::from_u16(err.http_status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
211 let body = err.to_problem_json().to_string();
212 (status, body)
213}
214
215fn parse_ui_profile(raw: &str) -> Option<UiProfile> {
216 match normalize_runtime_ui_protocol(raw)? {
217 "adk_ui" => Some(UiProfile::AdkUi),
218 "a2ui" => Some(UiProfile::A2ui),
219 "ag_ui" => Some(UiProfile::AgUi),
220 "mcp_apps" => Some(UiProfile::McpApps),
221 _ => None,
222 }
223}
224
225fn resolve_ui_profile(
226 headers: &HeaderMap,
227 body_ui_protocol: Option<&str>,
228) -> Result<UiProfile, RuntimeError> {
229 let header_value = headers.get(UI_PROTOCOL_HEADER).and_then(|v| v.to_str().ok());
230 let candidate = header_value.or(body_ui_protocol);
231
232 let Some(raw) = candidate else {
233 return Ok(UiProfile::AdkUi);
234 };
235
236 parse_ui_profile(raw).ok_or_else(|| {
237 let supported = SUPPORTED_UI_PROTOCOLS.join(", ");
238 warn!(
239 requested = %raw,
240 header = %UI_PROTOCOL_HEADER,
241 "unsupported ui protocol requested"
242 );
243 (
244 StatusCode::BAD_REQUEST,
245 format!("Unsupported ui protocol '{}'. Supported profiles: {}", raw, supported),
246 )
247 })
248}
249
250fn parse_ui_transport(raw: &str) -> Option<UiTransportMode> {
251 match raw.trim().to_ascii_lowercase().as_str() {
252 "legacy" | "legacy_wrapper" => Some(UiTransportMode::LegacyWrapper),
253 "native" | "protocol_native" => Some(UiTransportMode::ProtocolNative),
254 _ => None,
255 }
256}
257
258fn resolve_ui_transport(
259 headers: &HeaderMap,
260 body_ui_transport: Option<&str>,
261) -> Result<UiTransportMode, RuntimeError> {
262 let header_value = headers.get(UI_TRANSPORT_HEADER).and_then(|v| v.to_str().ok());
263 let candidate = header_value.or(body_ui_transport);
264
265 let Some(raw) = candidate else {
266 return Ok(UiTransportMode::LegacyWrapper);
267 };
268
269 parse_ui_transport(raw).ok_or_else(|| {
270 warn!(
271 requested = %raw,
272 header = %UI_TRANSPORT_HEADER,
273 "unsupported ui transport requested"
274 );
275 (
276 StatusCode::BAD_REQUEST,
277 format!(
278 "Unsupported ui transport '{}'. Supported values: legacy_wrapper, protocol_native",
279 raw
280 ),
281 )
282 })
283}
284
285fn validate_transport_support(
286 profile: UiProfile,
287 transport: UiTransportMode,
288) -> Result<(), RuntimeError> {
289 if transport == UiTransportMode::ProtocolNative && profile != UiProfile::AgUi {
290 return Err((
291 StatusCode::BAD_REQUEST,
292 "protocol_native transport is currently available only for ag_ui; use the MCP Apps bridge endpoints for mcp_apps".to_string(),
293 ));
294 }
295 Ok(())
296}
297
298fn protocol_from_envelope(envelope: &Value) -> Option<&str> {
299 envelope.as_object().and_then(|object| object.get("protocol")).and_then(|value| value.as_str())
300}
301
302fn serialize_runtime_event(event: &adk_core::Event, profile: UiProfile) -> Option<String> {
303 if profile == UiProfile::AdkUi {
304 return serde_json::to_string(event).ok();
305 }
306
307 serde_json::to_string(&json!({
308 "ui_protocol": profile.as_str(),
309 "event": event
310 }))
311 .ok()
312}
313
314fn infer_sse_request_protocol(req: &RunSseRequest) -> Option<&str> {
315 req.ui_protocol
316 .as_deref()
317 .or(req.protocol.as_deref())
318 .or_else(|| req.protocol_envelope.as_ref().and_then(protocol_from_envelope))
319 .or_else(|| req.ag_ui_input.as_ref().map(|_| "ag_ui"))
320 .or_else(|| req.input.as_ref().map(|_| "ag_ui"))
321 .or_else(|| req.mcp_apps_request.as_ref().map(|_| "mcp_apps"))
322 .or_else(|| req.mcp_apps_initialize.as_ref().map(|_| "mcp_apps"))
323}
324
325fn infer_run_request_protocol(req: &RunRequest) -> Option<&str> {
326 req.ui_protocol.as_deref().or(req.protocol.as_deref())
327}
328
329fn ag_ui_input_from_request(req: &RunSseRequest) -> Option<AgUiRunInput> {
330 req.ag_ui_input.clone().or_else(|| req.input.clone()).or_else(|| {
331 let envelope = req.protocol_envelope.as_ref()?;
332 if protocol_from_envelope(envelope)? != "ag_ui" {
333 return None;
334 }
335 envelope
336 .as_object()
337 .and_then(|object| object.get("input"))
338 .and_then(|value| serde_json::from_value(value.clone()).ok())
339 })
340}
341
342fn mcp_apps_request_from_request(req: &RunSseRequest) -> Option<McpAppsRuntimeEnvelope> {
343 req.mcp_apps_request.clone().or_else(|| {
344 if let Some(method) = req.method.clone() {
345 return Some(McpAppsRuntimeEnvelope { method, params: req.params.clone() });
346 }
347 let envelope = req.protocol_envelope.as_ref()?;
348 if protocol_from_envelope(envelope)? != "mcp_apps" {
349 return None;
350 }
351 let object = envelope.as_object()?;
352 let method = object.get("method")?.as_str()?.to_string();
353 let params = object.get("params").cloned();
354 Some(McpAppsRuntimeEnvelope { method, params })
355 })
356}
357
/// Returns a copy of the request's explicit `mcp_apps_initialize` envelope, if any.
fn mcp_apps_initialize_from_request(req: &RunSseRequest) -> Option<McpAppsRuntimeEnvelope> {
    req.mcp_apps_initialize.clone()
}
361
362fn extract_text_segments(value: &Value) -> Vec<String> {
363 match value {
364 Value::String(text) => {
365 let trimmed = text.trim();
366 if trimmed.is_empty() { vec![] } else { vec![trimmed.to_string()] }
367 }
368 Value::Array(items) => items
369 .iter()
370 .flat_map(|item| {
371 if let Some(text) = item
372 .as_object()
373 .and_then(|object| object.get("text"))
374 .and_then(|text| text.as_str())
375 {
376 let trimmed = text.trim();
377 if !trimmed.is_empty() {
378 return vec![trimmed.to_string()];
379 }
380 }
381 vec![]
382 })
383 .collect(),
384 Value::Object(object) => object
385 .get("text")
386 .and_then(|text| text.as_str())
387 .map(|text| text.trim().to_string())
388 .filter(|text| !text.is_empty())
389 .into_iter()
390 .collect(),
391 _ => vec![],
392 }
393}
394
395fn new_message_from_ag_ui_input(input: &AgUiRunInput) -> Option<NewMessage> {
396 let selected = input
397 .messages
398 .iter()
399 .rev()
400 .find(|message| message.role.as_deref().unwrap_or("user") == "user")
401 .or_else(|| input.messages.last())?;
402
403 let content = selected.content.as_ref()?;
404 let parts: Vec<MessagePart> = extract_text_segments(content)
405 .into_iter()
406 .map(|text| MessagePart { text: Some(text), inline_data: None })
407 .collect();
408 if parts.is_empty() {
409 return None;
410 }
411
412 Some(NewMessage { role: selected.role.clone().unwrap_or_else(|| "user".to_string()), parts })
413}
414
415fn activity_content_snapshot(value: Option<&Value>) -> Value {
416 match value.cloned() {
417 Some(Value::Object(object)) => Value::Object(object),
418 Some(other) => json!({ "value": other }),
419 None => json!({}),
420 }
421}
422
423fn activity_message_id(message: &AgUiInputMessage) -> String {
424 message.id.clone().unwrap_or_else(|| format!("activity-{}", Uuid::new_v4()))
425}
426
427fn activity_message_type(message: &AgUiInputMessage) -> String {
428 message
429 .activity_type
430 .clone()
431 .or_else(|| message.name.clone())
432 .unwrap_or_else(|| "CUSTOM".to_string())
433}
434
435fn activity_events_from_ag_ui_input(input: &AgUiRunInput) -> Vec<Value> {
436 input
437 .messages
438 .iter()
439 .filter(|message| message.role.as_deref() == Some("activity"))
440 .map(|message| {
441 let timestamp = chrono::Utc::now().timestamp_millis().max(0) as u64;
442 let message_id = activity_message_id(message);
443 let activity_type = activity_message_type(message);
444 if let Some(patch) = &message.patch {
445 json!({
446 "type": "ACTIVITY_DELTA",
447 "messageId": message_id,
448 "activityType": activity_type,
449 "patch": patch,
450 "timestamp": timestamp,
451 })
452 } else {
453 let mut event = json!({
454 "type": "ACTIVITY_SNAPSHOT",
455 "messageId": message_id,
456 "activityType": activity_type,
457 "content": activity_content_snapshot(message.content.as_ref()),
458 "timestamp": timestamp,
459 });
460 if let Some(replace) = message.replace
461 && let Some(object) = event.as_object_mut()
462 {
463 object.insert("replace".to_string(), Value::Bool(replace));
464 }
465 event
466 }
467 })
468 .collect()
469}
470
471fn messages_snapshot_from_ag_ui_input(input: &AgUiRunInput) -> Option<Value> {
472 if input.messages.is_empty() {
473 return None;
474 }
475
476 let filtered: Vec<AgUiInputMessage> = input
477 .messages
478 .iter()
479 .filter(|message| !(message.role.as_deref() == Some("activity") && message.patch.is_some()))
480 .cloned()
481 .collect();
482 if filtered.is_empty() {
483 return None;
484 }
485
486 serde_json::to_value(filtered).ok()
487}
488
489fn object_entries_to_state_delta(object: &Map<String, Value>) -> HashMap<String, Value> {
490 object.iter().map(|(key, value)| (key.clone(), value.clone())).collect()
491}
492
493fn ag_ui_state_delta(input: &AgUiRunInput) -> HashMap<String, Value> {
494 let mut delta = HashMap::new();
495
496 if let Some(state) = input.state.clone() {
497 match state {
498 Value::Object(object) => {
499 delta.extend(object_entries_to_state_delta(&object));
500 }
501 value => {
502 delta.insert("temp:ag_ui_state".to_string(), value);
503 }
504 }
505 }
506
507 if !input.messages.is_empty()
508 && let Ok(value) = serde_json::to_value(&input.messages)
509 {
510 delta.insert("temp:ag_ui_messages".to_string(), value);
511 }
512 if !input.tools.is_empty() {
513 delta.insert("temp:ag_ui_tools".to_string(), Value::Array(input.tools.clone()));
514 }
515 if !input.context.is_empty() {
516 delta.insert("temp:ag_ui_context".to_string(), Value::Array(input.context.clone()));
517 }
518 if let Some(forwarded_props) = input.forwarded_props.clone() {
519 delta.insert("temp:ag_ui_forwarded_props".to_string(), forwarded_props);
520 }
521
522 delta
523}
524
525fn body_state_delta(value: Option<&Value>) -> Result<HashMap<String, Value>, RuntimeError> {
526 let Some(value) = value else {
527 return Ok(HashMap::new());
528 };
529 let object = value.as_object().ok_or_else(|| {
530 (StatusCode::BAD_REQUEST, "stateDelta must be a JSON object when provided".to_string())
531 })?;
532 Ok(object_entries_to_state_delta(object))
533}
534
535fn log_profile_deprecation(profile: UiProfile) {
536 if profile != UiProfile::AdkUi {
537 return;
538 }
539 let Some(spec) = UI_PROTOCOL_CAPABILITIES
540 .iter()
541 .find(|capability| capability.protocol == profile.as_str())
542 .and_then(|capability| capability.deprecation)
543 else {
544 return;
545 };
546
547 warn!(
548 protocol = %profile.as_str(),
549 stage = %spec.stage,
550 announced_on = %spec.announced_on,
551 sunset_target_on = ?spec.sunset_target_on,
552 replacements = ?spec.replacement_protocols,
553 "legacy ui protocol profile selected"
554 );
555}
556
557fn build_content_with_attachments(
559 text: &str,
560 attachments: &[Attachment],
561) -> Result<adk_core::Content, RuntimeError> {
562 let mut content = adk_core::Content::new("user");
563
564 content.parts.push(adk_core::Part::Text { text: text.to_string() });
566
567 for attachment in attachments {
569 match BASE64_STANDARD.decode(&attachment.base64) {
570 Ok(data) => {
571 if data.len() > adk_core::MAX_INLINE_DATA_SIZE {
572 return Err((
573 StatusCode::PAYLOAD_TOO_LARGE,
574 format!(
575 "Attachment '{}' exceeds max inline size of {} bytes",
576 attachment.name,
577 adk_core::MAX_INLINE_DATA_SIZE
578 ),
579 ));
580 }
581 content.parts.push(adk_core::Part::InlineData {
582 mime_type: attachment.mime_type.clone(),
583 data,
584 });
585 }
586 Err(e) => {
587 return Err((
588 StatusCode::BAD_REQUEST,
589 format!("Invalid base64 data for attachment '{}': {}", attachment.name, e),
590 ));
591 }
592 }
593 }
594
595 Ok(content)
596}
597
598fn build_content_from_parts(parts: &[MessagePart]) -> Result<adk_core::Content, RuntimeError> {
600 let mut content = adk_core::Content::new("user");
601
602 for part in parts {
603 if let Some(text) = &part.text {
605 content.parts.push(adk_core::Part::Text { text: text.clone() });
606 }
607
608 if let Some(inline_data) = &part.inline_data {
610 match BASE64_STANDARD.decode(&inline_data.data) {
611 Ok(data) => {
612 if data.len() > adk_core::MAX_INLINE_DATA_SIZE {
613 return Err((
614 StatusCode::PAYLOAD_TOO_LARGE,
615 format!(
616 "inline_data exceeds max inline size of {} bytes",
617 adk_core::MAX_INLINE_DATA_SIZE
618 ),
619 ));
620 }
621 content.parts.push(adk_core::Part::InlineData {
622 mime_type: inline_data.mime_type.clone(),
623 data,
624 });
625 }
626 Err(e) => {
627 return Err((
628 StatusCode::BAD_REQUEST,
629 format!("Invalid base64 data in inline_data: {}", e),
630 ));
631 }
632 }
633 }
634 }
635
636 Ok(content)
637}
638
639async fn apply_state_delta_to_session(
640 session_service: &std::sync::Arc<dyn adk_session::SessionService>,
641 app_name: &str,
642 user_id: &str,
643 session_id: &str,
644 state_delta: HashMap<String, Value>,
645) -> Result<(), RuntimeError> {
646 if state_delta.is_empty() {
647 return Ok(());
648 }
649
650 let identity = adk_core::AdkIdentity::new(
651 adk_core::AppName::try_from(app_name).map_err(|error| {
652 (
653 StatusCode::BAD_REQUEST,
654 format!("invalid app_name for state delta application: {}", error),
655 )
656 })?,
657 adk_core::UserId::try_from(user_id).map_err(|error| {
658 (
659 StatusCode::BAD_REQUEST,
660 format!("invalid user_id for state delta application: {}", error),
661 )
662 })?,
663 adk_core::SessionId::try_from(session_id).map_err(|error| {
664 (
665 StatusCode::BAD_REQUEST,
666 format!("invalid session_id for state delta application: {}", error),
667 )
668 })?,
669 );
670
671 let mut event = adk_core::Event::new(format!("ui-input-{}", Uuid::new_v4()));
672 event.author = "ui_protocol_bridge".to_string();
673 event.actions.state_delta = state_delta;
674 session_service
675 .append_event_for_identity(adk_session::AppendEventRequest { identity, event })
676 .await
677 .map_err(adk_err_to_runtime)
678}
679
680fn merge_runtime_state_delta(
681 body_delta: HashMap<String, Value>,
682 ag_ui_delta: HashMap<String, Value>,
683) -> HashMap<String, Value> {
684 let mut merged = body_delta;
685 merged.extend(ag_ui_delta);
686 merged
687}
688
/// Escapes one JSON Pointer reference token: `~` becomes `~0` and `/` becomes `~1`.
fn json_pointer_escape(segment: &str) -> String {
    let mut escaped = String::with_capacity(segment.len());
    for ch in segment.chars() {
        match ch {
            '~' => escaped.push_str("~0"),
            '/' => escaped.push_str("~1"),
            other => escaped.push(other),
        }
    }
    escaped
}
692
693fn state_delta_to_json_patch(delta: &HashMap<String, Value>) -> Vec<Value> {
694 delta
695 .iter()
696 .map(|(key, value)| {
697 json!({
698 "op": "add",
699 "path": format!("/{}", json_pointer_escape(key)),
700 "value": value
701 })
702 })
703 .collect()
704}
705
706fn timestamp_millis(event: &adk_core::Event) -> u64 {
707 event.timestamp.timestamp_millis().max(0) as u64
708}
709
710fn serialize_ag_ui_tool_call_delta(args: &Value, allow_raw_string_delta: bool) -> String {
711 if allow_raw_string_delta && let Value::String(delta) = args {
712 return delta.clone();
713 }
714
715 serde_json::to_string(args).unwrap_or_else(|_| args.to_string())
716}
717
718fn translate_ag_ui_event(event: &adk_core::Event, thread_id: &str, run_id: &str) -> Vec<Value> {
719 let mut translated = Vec::new();
720 let timestamp = timestamp_millis(event);
721 let is_partial = event.llm_response.partial;
722
723 if !event.actions.state_delta.is_empty() {
724 translated.push(json!({
725 "type": "STATE_DELTA",
726 "delta": state_delta_to_json_patch(&event.actions.state_delta),
727 "timestamp": timestamp,
728 }));
729 }
730
731 if let Some(message) = event.llm_response.error_message.clone() {
732 translated.push(json!({
733 "type": "RUN_ERROR",
734 "threadId": thread_id,
735 "runId": run_id,
736 "message": message,
737 "code": event.llm_response.error_code,
738 "timestamp": timestamp,
739 }));
740 }
741
742 let Some(content) = &event.llm_response.content else {
743 return translated;
744 };
745
746 for (index, part) in content.parts.iter().enumerate() {
747 match part {
748 adk_core::Part::Text { text } if !text.trim().is_empty() => {
749 let message_id = format!("{}-text-{}", event.id, index);
750 if is_partial {
751 translated.push(json!({
752 "type": "TEXT_MESSAGE_CHUNK",
753 "messageId": message_id,
754 "role": "assistant",
755 "delta": text,
756 "timestamp": timestamp,
757 }));
758 } else {
759 translated.push(json!({
760 "type": "TEXT_MESSAGE_START",
761 "messageId": message_id,
762 "role": "assistant",
763 "timestamp": timestamp,
764 }));
765 translated.push(json!({
766 "type": "TEXT_MESSAGE_CONTENT",
767 "messageId": format!("{}-text-{}", event.id, index),
768 "delta": text,
769 "timestamp": timestamp,
770 }));
771 translated.push(json!({
772 "type": "TEXT_MESSAGE_END",
773 "messageId": format!("{}-text-{}", event.id, index),
774 "timestamp": timestamp,
775 }));
776 }
777 }
778 adk_core::Part::Thinking { thinking, .. } if !thinking.trim().is_empty() => {
779 let message_id = format!("{}-reasoning-{}", event.id, index);
780 if is_partial {
781 translated.push(json!({
782 "type": "REASONING_MESSAGE_CHUNK",
783 "messageId": message_id,
784 "delta": thinking,
785 "timestamp": timestamp,
786 }));
787 } else {
788 let reasoning_id = format!("{}-reasoning-phase-{}", event.id, index);
789 translated.push(json!({
790 "type": "REASONING_START",
791 "messageId": reasoning_id,
792 "timestamp": timestamp,
793 }));
794 translated.push(json!({
795 "type": "REASONING_MESSAGE_START",
796 "messageId": message_id,
797 "role": "assistant",
798 "timestamp": timestamp,
799 }));
800 translated.push(json!({
801 "type": "REASONING_MESSAGE_CONTENT",
802 "messageId": format!("{}-reasoning-{}", event.id, index),
803 "delta": thinking,
804 "timestamp": timestamp,
805 }));
806 translated.push(json!({
807 "type": "REASONING_MESSAGE_END",
808 "messageId": format!("{}-reasoning-{}", event.id, index),
809 "timestamp": timestamp,
810 }));
811 translated.push(json!({
812 "type": "REASONING_END",
813 "messageId": reasoning_id,
814 "timestamp": timestamp,
815 }));
816 }
817 }
818 adk_core::Part::FunctionCall { name, args, id, .. } => {
819 let tool_call_id =
820 id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index));
821 let raw_chunk_supported = is_partial && matches!(args, Value::String(_));
822 let args_delta = serialize_ag_ui_tool_call_delta(args, raw_chunk_supported);
823 if raw_chunk_supported {
824 translated.push(json!({
825 "type": "TOOL_CALL_CHUNK",
826 "toolCallId": tool_call_id,
827 "toolCallName": name,
828 "delta": args_delta,
829 "timestamp": timestamp,
830 }));
831 } else {
832 translated.push(json!({
833 "type": "TOOL_CALL_START",
834 "toolCallId": tool_call_id,
835 "toolCallName": name,
836 "timestamp": timestamp,
837 }));
838 translated.push(json!({
839 "type": "TOOL_CALL_ARGS",
840 "toolCallId": id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index)),
841 "delta": args_delta,
842 "timestamp": timestamp,
843 }));
844 translated.push(json!({
845 "type": "TOOL_CALL_END",
846 "toolCallId": id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index)),
847 "timestamp": timestamp,
848 }));
849 }
850 }
851 adk_core::Part::FunctionResponse { function_response, id } => {
852 let tool_call_id =
853 id.clone().unwrap_or_else(|| format!("{}-tool-result-{}", event.id, index));
854 let response_content = serde_json::to_string(&function_response.response)
855 .unwrap_or_else(|_| function_response.response.to_string());
856 translated.push(json!({
857 "type": "TOOL_CALL_RESULT",
858 "messageId": format!("msg-{}", tool_call_id),
859 "toolCallId": tool_call_id,
860 "toolCallName": function_response.name,
861 "content": response_content,
862 "role": "tool",
863 "timestamp": timestamp,
864 }));
865 }
866 _ => {}
867 }
868 }
869
870 translated
871}
872
873async fn extract_request_context(
879 extractor: Option<&dyn RequestContextExtractor>,
880 headers: &HeaderMap,
881) -> Result<Option<RequestContext>, RuntimeError> {
882 let Some(extractor) = extractor else {
883 return Ok(None);
884 };
885
886 let mut builder = axum::http::Request::builder();
888 for (name, value) in headers {
889 builder = builder.header(name, value);
890 }
891 let (parts, _) = builder
892 .body(())
893 .map_err(|e| {
894 (StatusCode::INTERNAL_SERVER_ERROR, format!("failed to build request parts: {e}"))
895 })?
896 .into_parts();
897
898 match extractor.extract(&parts).await {
899 Ok(ctx) => Ok(Some(ctx)),
900 Err(RequestContextError::MissingAuth) => {
901 Err((StatusCode::UNAUTHORIZED, "missing authorization".to_string()))
902 }
903 Err(RequestContextError::InvalidToken(msg)) => {
904 Err((StatusCode::UNAUTHORIZED, format!("invalid token: {msg}")))
905 }
906 Err(RequestContextError::ExtractionFailed(msg)) => {
907 Err((StatusCode::INTERNAL_SERVER_ERROR, format!("auth extraction failed: {msg}")))
908 }
909 }
910}
911
912fn bridge_params_with_identity(
913 app_name: &str,
914 user_id: &str,
915 session_id: &str,
916 params: Option<Value>,
917) -> Value {
918 let mut object = params.and_then(|value| value.as_object().cloned()).unwrap_or_default();
919 object.insert("appName".to_string(), Value::String(app_name.to_string()));
920 object.insert("userId".to_string(), Value::String(user_id.to_string()));
921 object.insert("sessionId".to_string(), Value::String(session_id.to_string()));
922 Value::Object(object)
923}
924
925fn deserialize_bridge_params<T: for<'de> Deserialize<'de>>(
926 app_name: &str,
927 user_id: &str,
928 session_id: &str,
929 params: Option<Value>,
930) -> Result<T, RuntimeError> {
931 serde_json::from_value(bridge_params_with_identity(app_name, user_id, session_id, params))
932 .map_err(|error| {
933 (StatusCode::BAD_REQUEST, format!("invalid protocol-native bridge payload: {}", error))
934 })
935}
936
937fn maybe_mark_mcp_ui_initialized(
938 app_name: &str,
939 user_id: &str,
940 session_id: &str,
941 initialized_notification: Option<&Value>,
942) -> Result<(), RuntimeError> {
943 let Some(value) = initialized_notification else {
944 return Ok(());
945 };
946 let method = value
947 .as_object()
948 .and_then(|object| object.get("method"))
949 .and_then(|value| value.as_str())
950 .unwrap_or_default();
951 if method == "ui/notifications/initialized" {
952 mark_mcp_ui_initialized(app_name, user_id, session_id)?;
953 }
954 Ok(())
955}
956
957fn apply_mcp_apps_runtime_envelope(
958 app_name: &str,
959 user_id: &str,
960 session_id: &str,
961 envelope: McpAppsRuntimeEnvelope,
962) -> Result<(), RuntimeError> {
963 match envelope.method.as_str() {
964 "ui/initialize" => {
965 let params = deserialize_bridge_params::<McpUiInitializeParams>(
966 app_name,
967 user_id,
968 session_id,
969 envelope.params,
970 )?;
971 initialize_mcp_ui_bridge(params)?;
972 Ok(())
973 }
974 "ui/message" => {
975 let params = deserialize_bridge_params::<McpUiMessageParams>(
976 app_name,
977 user_id,
978 session_id,
979 envelope.params,
980 )?;
981 message_mcp_ui_bridge(params)?;
982 Ok(())
983 }
984 "ui/update-model-context" => {
985 let params = deserialize_bridge_params::<McpUiUpdateModelContextParams>(
986 app_name,
987 user_id,
988 session_id,
989 envelope.params,
990 )?;
991 update_mcp_ui_bridge_model_context(params)?;
992 Ok(())
993 }
994 "ui/notifications/initialized" => {
995 mark_mcp_ui_initialized(app_name, user_id, session_id)?;
996 Ok(())
997 }
998 method => Err((
999 StatusCode::BAD_REQUEST,
1000 format!("unsupported MCP Apps runtime bridge method '{}'", method),
1001 )),
1002 }
1003}
1004
1005fn direct_ag_ui_events(event: &adk_core::Event, thread_id: &str, run_id: &str) -> Vec<String> {
1006 translate_ag_ui_event(event, thread_id, run_id)
1007 .into_iter()
1008 .filter_map(|item| serde_json::to_string(&item).ok())
1009 .collect()
1010}
1011
/// Adapts a stream of runtime events into the SSE stream sent to the client.
///
/// Two framings exist:
///
/// * **Native AG-UI** (`AgUi` profile with `ProtocolNative` transport): the stream
///   opens with `RUN_STARTED` — followed, on the normal path, by optional
///   `STATE_SNAPSHOT`, `MESSAGES_SNAPSHOT` and activity events taken from the
///   client's run input — then carries the translated AG-UI events, and closes
///   with `RUN_FINISHED`, or with `RUN_ERROR` if the source stream fails.
/// * **Everything else**: each event is serialized by `serialize_runtime_event`.
///
/// The thread id is the input's `thread_id`, falling back to `session_id`. The run
/// id is the input's `run_id`, else the invocation id of the first event, else a
/// generated `run-<uuid>` when the stream ends or fails before any event arrives.
fn build_runtime_sse_stream<S>(
    mut event_stream: S,
    profile: UiProfile,
    transport: UiTransportMode,
    session_id: String,
    ag_ui_input: Option<AgUiRunInput>,
) -> std::pin::Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>
where
    S: Stream<Item = adk_core::Result<adk_core::Event>> + Send + 'static + Unpin,
{
    // Everything derived from the client's run input is computed up front so the
    // generator below only has to replay it.
    let selected_thread_id =
        ag_ui_input.as_ref().and_then(|input| input.thread_id.clone()).unwrap_or(session_id);
    let selected_run_input = ag_ui_input.clone();
    let selected_parent_run_id = ag_ui_input.as_ref().and_then(|input| input.parent_run_id.clone());
    let selected_initial_state = ag_ui_input.as_ref().and_then(|input| input.state.clone());
    let selected_messages_snapshot =
        ag_ui_input.as_ref().and_then(messages_snapshot_from_ag_ui_input);
    let selected_activity_events =
        ag_ui_input.as_ref().map(activity_events_from_ag_ui_input).unwrap_or_default();

    Box::pin(async_stream::stream! {
        let native_ag_ui = profile == UiProfile::AgUi && transport == UiTransportMode::ProtocolNative;
        // Whether RUN_STARTED has already been emitted on this stream.
        let mut started = false;
        let mut active_run_id = ag_ui_input.as_ref().and_then(|input| input.run_id.clone());

        while let Some(item) = event_stream.next().await {
            match item {
                Ok(event) => {
                    if native_ag_ui {
                        // Without a client-supplied run id, the first event's
                        // invocation id is adopted for the rest of the run.
                        let run_id = active_run_id
                            .get_or_insert_with(|| event.invocation_id.clone())
                            .clone();
                        if !started {
                            // Preamble: RUN_STARTED (with optional parent id and the
                            // echoed input), then snapshots, then activity events.
                            let mut started_event = json!({
                                "type": "RUN_STARTED",
                                "threadId": selected_thread_id,
                                "runId": run_id,
                            });
                            if let Some(parent_run_id) = selected_parent_run_id.clone()
                                && let Some(object) = started_event.as_object_mut()
                            {
                                object.insert("parentRunId".to_string(), Value::String(parent_run_id));
                            }
                            if let Some(run_input) = selected_run_input.clone()
                                && let Ok(value) = serde_json::to_value(run_input)
                                && let Some(object) = started_event.as_object_mut()
                            {
                                object.insert("input".to_string(), value);
                            }
                            yield Ok(Event::default().data(started_event.to_string()));

                            if let Some(snapshot) = selected_initial_state.clone() {
                                yield Ok(Event::default().data(json!({
                                    "type": "STATE_SNAPSHOT",
                                    "snapshot": snapshot,
                                }).to_string()));
                            }
                            if let Some(messages) = selected_messages_snapshot.clone() {
                                yield Ok(Event::default().data(json!({
                                    "type": "MESSAGES_SNAPSHOT",
                                    "messages": messages,
                                }).to_string()));
                            }
                            for activity_event in selected_activity_events.clone() {
                                yield Ok(Event::default().data(activity_event.to_string()));
                            }
                            started = true;
                        }

                        for payload in direct_ag_ui_events(&event, &selected_thread_id, &run_id) {
                            yield Ok(Event::default().data(payload));
                        }
                    } else if let Some(payload) = serialize_runtime_event(&event, profile) {
                        yield Ok(Event::default().data(payload));
                    }
                }
                Err(error) => {
                    if native_ag_ui {
                        let run_id =
                            active_run_id.unwrap_or_else(|| format!("run-{}", Uuid::new_v4()));
                        // A failure before the first event still gets a RUN_STARTED so
                        // the RUN_ERROR that follows is well-formed; this bare form
                        // carries no input, snapshots or activity events.
                        if !started {
                            yield Ok(Event::default().data(json!({
                                "type": "RUN_STARTED",
                                "threadId": selected_thread_id,
                                "runId": run_id,
                            }).to_string()));
                        }
                        yield Ok(Event::default().data(json!({
                            "type": "RUN_ERROR",
                            "threadId": selected_thread_id,
                            "runId": run_id,
                            "message": error.to_string(),
                        }).to_string()));
                    }
                    // NOTE(review): in non-native mode the error is dropped without
                    // being sent or logged; the stream simply ends. Confirm intended.
                    return;
                }
            }
        }

        // Source stream ended normally.
        if native_ag_ui {
            let run_id = active_run_id.unwrap_or_else(|| format!("run-{}", Uuid::new_v4()));
            // An empty run still emits RUN_STARTED before RUN_FINISHED.
            if !started {
                yield Ok(Event::default().data(json!({
                    "type": "RUN_STARTED",
                    "threadId": selected_thread_id,
                    "runId": run_id,
                }).to_string()));
            }
            yield Ok(Event::default().data(json!({
                "type": "RUN_FINISHED",
                "threadId": selected_thread_id,
                "runId": run_id,
            }).to_string()));
        }
    })
}
1128
1129pub async fn run_sse(
1130 State(controller): State<RuntimeController>,
1131 Path((app_name, user_id, session_id)): Path<(String, String, String)>,
1132 headers: HeaderMap,
1133 Json(req): Json<RunRequest>,
1134) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RuntimeError> {
1135 let ui_profile = resolve_ui_profile(&headers, infer_run_request_protocol(&req))?;
1136 let transport = resolve_ui_transport(&headers, req.ui_transport.as_deref())?;
1137 validate_transport_support(ui_profile, transport)?;
1138 let span = tracing::info_span!("run_sse", session_id = %session_id, app_name = %app_name, user_id = %user_id);
1139
1140 async move {
1141 log_profile_deprecation(ui_profile);
1142 info!(
1143 ui_protocol = %ui_profile.as_str(),
1144 ui_transport = ?transport,
1145 "resolved ui protocol profile for runtime request"
1146 );
1147
1148 let request_context = extract_request_context(
1152 controller.config.request_context_extractor.as_deref(),
1153 &headers,
1154 )
1155 .await?;
1156
1157 let effective_user_id = request_context.as_ref().map_or(user_id, |rc| rc.user_id.clone());
1163
1164 controller
1166 .config
1167 .session_service
1168 .get(adk_session::GetRequest {
1169 app_name: app_name.clone(),
1170 user_id: effective_user_id.clone(),
1171 session_id: session_id.clone(),
1172 num_recent_events: None,
1173 after: None,
1174 })
1175 .await
1176 .map_err(|_| (StatusCode::NOT_FOUND, "session not found".to_string()))?;
1177
1178 let agent = controller
1180 .config
1181 .agent_loader
1182 .load_agent(&app_name)
1183 .await
1184 .map_err(adk_err_to_runtime)?;
1185
1186 let mut runner_builder = adk_runner::Runner::builder()
1188 .app_name(app_name.clone())
1189 .agent(agent)
1190 .session_service(controller.config.session_service.clone());
1191 if let Some(ref artifact_service) = controller.config.artifact_service {
1192 runner_builder = runner_builder.artifact_service(artifact_service.clone());
1193 }
1194 if let Some(ref memory_service) = controller.config.memory_service {
1195 runner_builder = runner_builder.memory_service(memory_service.clone());
1196 }
1197 if let Some(ref compaction_config) = controller.config.compaction_config {
1198 runner_builder = runner_builder.compaction_config(compaction_config.clone());
1199 }
1200 if let Some(ref context_cache_config) = controller.config.context_cache_config {
1201 runner_builder = runner_builder.context_cache_config(context_cache_config.clone());
1202 }
1203 if let Some(ref cache_capable) = controller.config.cache_capable {
1204 runner_builder = runner_builder.cache_capable(cache_capable.clone());
1205 }
1206 if let Some(request_context) = request_context {
1207 runner_builder = runner_builder.request_context(request_context);
1208 }
1209 let runner = runner_builder.build().map_err(adk_err_to_runtime)?;
1210
1211 let content = build_content_with_attachments(&req.new_message, &req.attachments)?;
1213
1214 if !req.attachments.is_empty() {
1216 info!(attachment_count = req.attachments.len(), "processing request with attachments");
1217 }
1218
1219 let typed_user_id =
1221 UserId::new(effective_user_id).map_err(|err| adk_err_to_runtime(err.into()))?;
1222 let typed_session_id =
1223 SessionId::new(session_id.clone()).map_err(|err| adk_err_to_runtime(err.into()))?;
1224 let event_stream = runner
1225 .run(typed_user_id, typed_session_id, content)
1226 .await
1227 .map_err(adk_err_to_runtime)?;
1228
1229 let sse_stream =
1231 build_runtime_sse_stream(event_stream, ui_profile, transport, session_id.clone(), None);
1232
1233 Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
1234 }
1235 .instrument(span)
1236 .await
1237}
1238
1239pub async fn run_sse_compat(
1242 State(controller): State<RuntimeController>,
1243 headers: HeaderMap,
1244 Json(req): Json<RunSseRequest>,
1245) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RuntimeError> {
1246 let ui_profile = resolve_ui_profile(&headers, infer_sse_request_protocol(&req))?;
1247 let transport = resolve_ui_transport(&headers, req.ui_transport.as_deref())?;
1248 validate_transport_support(ui_profile, transport)?;
1249 let app_name = req.app_name.clone();
1250 let user_id = req.user_id.clone();
1251 let session_id = req.session_id.clone();
1252 let ag_ui_input = ag_ui_input_from_request(&req);
1253 let mcp_apps_request = mcp_apps_request_from_request(&req);
1254 let mcp_apps_initialize = mcp_apps_initialize_from_request(&req);
1255
1256 info!(
1257 app_name = %app_name,
1258 user_id = %user_id,
1259 session_id = %session_id,
1260 ui_protocol = %ui_profile.as_str(),
1261 ui_transport = ?transport,
1262 "POST /run_sse request received"
1263 );
1264 log_profile_deprecation(ui_profile);
1265
1266 let request_context =
1270 extract_request_context(controller.config.request_context_extractor.as_deref(), &headers)
1271 .await?;
1272
1273 let effective_user_id = request_context.as_ref().map_or(user_id, |rc| rc.user_id.clone());
1279
1280 let resolved_new_message = req
1281 .new_message
1282 .clone()
1283 .or_else(|| ag_ui_input.as_ref().and_then(new_message_from_ag_ui_input))
1284 .ok_or_else(|| {
1285 (
1286 StatusCode::BAD_REQUEST,
1287 "newMessage is required unless protocol-native ag_ui input supplies a user message"
1288 .to_string(),
1289 )
1290 })?;
1291
1292 let content = build_content_from_parts(&resolved_new_message.parts)?;
1294
1295 let text_parts: Vec<_> =
1297 resolved_new_message.parts.iter().filter(|p| p.text.is_some()).collect();
1298 let data_parts: Vec<_> =
1299 resolved_new_message.parts.iter().filter(|p| p.inline_data.is_some()).collect();
1300 if !data_parts.is_empty() {
1301 info!(
1302 text_parts = text_parts.len(),
1303 inline_data_parts = data_parts.len(),
1304 "processing request with inline data"
1305 );
1306 }
1307
1308 let merged_state_delta = merge_runtime_state_delta(
1309 body_state_delta(req.state_delta.as_ref())?,
1310 ag_ui_input.as_ref().map(ag_ui_state_delta).unwrap_or_default(),
1311 );
1312
1313 let session_result = controller
1315 .config
1316 .session_service
1317 .get(adk_session::GetRequest {
1318 app_name: app_name.clone(),
1319 user_id: effective_user_id.clone(),
1320 session_id: session_id.clone(),
1321 num_recent_events: None,
1322 after: None,
1323 })
1324 .await;
1325
1326 if session_result.is_err() {
1328 controller
1329 .config
1330 .session_service
1331 .create(adk_session::CreateRequest {
1332 app_name: app_name.clone(),
1333 user_id: effective_user_id.clone(),
1334 session_id: Some(session_id.clone()),
1335 state: merged_state_delta.clone(),
1336 })
1337 .await
1338 .map_err(adk_err_to_runtime)?;
1339 } else {
1340 apply_state_delta_to_session(
1341 &controller.config.session_service,
1342 &app_name,
1343 &effective_user_id,
1344 &session_id,
1345 merged_state_delta.clone(),
1346 )
1347 .await?;
1348 }
1349
1350 if ui_profile == UiProfile::McpApps {
1351 if let Some(initialize) = mcp_apps_initialize {
1352 apply_mcp_apps_runtime_envelope(
1353 &app_name,
1354 &effective_user_id,
1355 &session_id,
1356 initialize,
1357 )?;
1358 }
1359 if let Some(request) = mcp_apps_request {
1360 apply_mcp_apps_runtime_envelope(&app_name, &effective_user_id, &session_id, request)?;
1361 }
1362 maybe_mark_mcp_ui_initialized(
1363 &app_name,
1364 &effective_user_id,
1365 &session_id,
1366 req.mcp_apps_initialized.as_ref(),
1367 )?;
1368 }
1369
1370 let agent =
1372 controller.config.agent_loader.load_agent(&app_name).await.map_err(adk_err_to_runtime)?;
1373
1374 let streaming_mode =
1376 if req.streaming { adk_core::StreamingMode::SSE } else { adk_core::StreamingMode::None };
1377
1378 let mut runner_builder = adk_runner::Runner::builder()
1379 .app_name(app_name)
1380 .agent(agent)
1381 .session_service(controller.config.session_service.clone())
1382 .run_config(adk_core::RunConfig::builder().streaming_mode(streaming_mode).build());
1383 if let Some(ref artifact_service) = controller.config.artifact_service {
1384 runner_builder = runner_builder.artifact_service(artifact_service.clone());
1385 }
1386 if let Some(ref memory_service) = controller.config.memory_service {
1387 runner_builder = runner_builder.memory_service(memory_service.clone());
1388 }
1389 if let Some(ref compaction_config) = controller.config.compaction_config {
1390 runner_builder = runner_builder.compaction_config(compaction_config.clone());
1391 }
1392 if let Some(ref context_cache_config) = controller.config.context_cache_config {
1393 runner_builder = runner_builder.context_cache_config(context_cache_config.clone());
1394 }
1395 if let Some(ref cache_capable) = controller.config.cache_capable {
1396 runner_builder = runner_builder.cache_capable(cache_capable.clone());
1397 }
1398 if let Some(request_context) = request_context {
1399 runner_builder = runner_builder.request_context(request_context);
1400 }
1401 let runner = runner_builder.build().map_err(adk_err_to_runtime)?;
1402
1403 let typed_user_id =
1405 UserId::new(effective_user_id).map_err(|err| adk_err_to_runtime(err.into()))?;
1406 let typed_session_id =
1407 SessionId::new(session_id.clone()).map_err(|err| adk_err_to_runtime(err.into()))?;
1408 let event_stream =
1409 runner.run(typed_user_id, typed_session_id, content).await.map_err(adk_err_to_runtime)?;
1410
1411 let sse_stream = build_runtime_sse_stream(
1413 event_stream,
1414 ui_profile,
1415 transport,
1416 session_id.clone(),
1417 ag_ui_input,
1418 );
1419
1420 Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
1421}