1use crate::ServerConfig;
2use crate::auth_bridge::{RequestContextError, RequestContextExtractor};
3use crate::rest::controllers::ui::{
4 McpUiInitializeParams, McpUiMessageParams, McpUiUpdateModelContextParams,
5 initialize_mcp_ui_bridge, mark_mcp_ui_initialized, message_mcp_ui_bridge,
6 update_mcp_ui_bridge_model_context,
7};
8use crate::ui_protocol::{
9 SUPPORTED_UI_PROTOCOLS, UI_PROTOCOL_CAPABILITIES, normalize_runtime_ui_protocol,
10};
11use adk_core::{RequestContext, SessionId, UserId};
12use axum::{
13 Json,
14 extract::{Path, State},
15 http::{HeaderMap, StatusCode},
16 response::sse::{Event, KeepAlive, Sse},
17};
18use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
19use futures::{StreamExt, stream::Stream};
20use serde::{Deserialize, Serialize};
21use serde_json::{Map, Value, json};
22use std::collections::HashMap;
23use std::convert::Infallible;
24use tracing::{Instrument, info, warn};
25use uuid::Uuid;
26
27fn default_streaming_true() -> bool {
28 true
29}
30
31const UI_PROTOCOL_HEADER: &str = "x-adk-ui-protocol";
32const UI_TRANSPORT_HEADER: &str = "x-adk-ui-transport";
33
34#[derive(Clone)]
35pub struct RuntimeController {
36 config: ServerConfig,
37}
38
39impl RuntimeController {
40 pub fn new(config: ServerConfig) -> Self {
41 Self { config }
42 }
43}
44
45#[derive(Serialize, Deserialize, Debug)]
47pub struct Attachment {
48 pub name: String,
49 #[serde(rename = "type")]
50 pub mime_type: String,
51 pub base64: String,
52}
53
54#[derive(Serialize, Deserialize)]
55pub struct RunRequest {
56 pub new_message: String,
57 #[serde(default, alias = "uiProtocol")]
58 pub ui_protocol: Option<String>,
59 #[serde(default)]
60 pub protocol: Option<String>,
61 #[serde(default, alias = "ui_transport")]
62 pub ui_transport: Option<String>,
63 #[serde(default)]
64 pub attachments: Vec<Attachment>,
65}
66
67#[derive(Serialize, Deserialize, Debug)]
69#[serde(rename_all = "camelCase")]
70pub struct RunSseRequest {
71 pub app_name: String,
72 pub user_id: String,
73 pub session_id: String,
74 #[serde(default)]
75 pub new_message: Option<NewMessage>,
76 #[serde(default = "default_streaming_true")]
77 pub streaming: bool,
78 #[serde(default)]
79 pub state_delta: Option<Value>,
80 #[serde(default, alias = "ui_protocol")]
81 pub ui_protocol: Option<String>,
82 #[serde(default)]
83 pub protocol: Option<String>,
84 #[serde(default, alias = "ui_transport")]
85 pub ui_transport: Option<String>,
86 #[serde(default)]
87 pub input: Option<AgUiRunInput>,
88 #[serde(default)]
89 pub ag_ui_input: Option<AgUiRunInput>,
90 #[serde(default)]
91 pub ag_ui_compatibility_event: Option<Value>,
92 #[serde(default)]
93 pub protocol_envelope: Option<Value>,
94 #[serde(default)]
95 pub mcp_apps_request: Option<McpAppsRuntimeEnvelope>,
96 #[serde(default)]
97 pub mcp_apps_initialize: Option<McpAppsRuntimeEnvelope>,
98 #[serde(default)]
99 pub mcp_apps_initialized: Option<Value>,
100 #[serde(default)]
101 pub method: Option<String>,
102 #[serde(default)]
103 pub params: Option<Value>,
104}
105
106#[derive(Serialize, Deserialize, Debug, Clone, Default)]
107#[serde(rename_all = "camelCase")]
108pub struct AgUiInputMessage {
109 #[serde(default)]
110 pub id: Option<String>,
111 #[serde(default)]
112 pub role: Option<String>,
113 #[serde(default)]
114 pub name: Option<String>,
115 #[serde(default)]
116 pub activity_type: Option<String>,
117 #[serde(default)]
118 pub content: Option<Value>,
119 #[serde(default)]
120 pub replace: Option<bool>,
121 #[serde(default)]
122 pub patch: Option<Vec<Value>>,
123}
124
125#[derive(Serialize, Deserialize, Debug, Clone, Default)]
126#[serde(rename_all = "camelCase")]
127pub struct AgUiRunInput {
128 #[serde(default)]
129 pub thread_id: Option<String>,
130 #[serde(default)]
131 pub run_id: Option<String>,
132 #[serde(default)]
133 pub parent_run_id: Option<String>,
134 #[serde(default)]
135 pub state: Option<Value>,
136 #[serde(default)]
137 pub messages: Vec<AgUiInputMessage>,
138 #[serde(default)]
139 pub tools: Vec<Value>,
140 #[serde(default)]
141 pub context: Vec<Value>,
142 #[serde(default)]
143 pub forwarded_props: Option<Value>,
144}
145
146#[derive(Serialize, Deserialize, Debug, Clone)]
147#[serde(rename_all = "camelCase")]
148pub struct McpAppsRuntimeEnvelope {
149 pub method: String,
150 #[serde(default)]
151 pub params: Option<Value>,
152}
153
154#[derive(Serialize, Deserialize, Debug, Clone)]
155pub struct NewMessage {
156 pub role: String,
157 pub parts: Vec<MessagePart>,
158}
159
160#[derive(Serialize, Deserialize, Debug, Clone)]
161pub struct MessagePart {
162 #[serde(default)]
163 pub text: Option<String>,
164 #[serde(default, rename = "inlineData")]
165 pub inline_data: Option<InlineData>,
166}
167
168#[derive(Serialize, Deserialize, Debug, Clone)]
169#[serde(rename_all = "camelCase")]
170pub struct InlineData {
171 pub display_name: Option<String>,
172 pub data: String,
173 pub mime_type: String,
174}
175
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177enum UiProfile {
178 AdkUi,
179 A2ui,
180 AgUi,
181 McpApps,
182}
183
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185enum UiTransportMode {
186 LegacyWrapper,
187 ProtocolNative,
188}
189
190impl UiProfile {
191 fn as_str(self) -> &'static str {
192 match self {
193 Self::AdkUi => "adk_ui",
194 Self::A2ui => "a2ui",
195 Self::AgUi => "ag_ui",
196 Self::McpApps => "mcp_apps",
197 }
198 }
199}
200
201type RuntimeError = (StatusCode, String);
202
203fn adk_err_to_runtime(err: adk_core::AdkError) -> RuntimeError {
209 let status =
210 StatusCode::from_u16(err.http_status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
211 let body = err.to_problem_json().to_string();
212 (status, body)
213}
214
215fn parse_ui_profile(raw: &str) -> Option<UiProfile> {
216 match normalize_runtime_ui_protocol(raw)? {
217 "adk_ui" => Some(UiProfile::AdkUi),
218 "a2ui" => Some(UiProfile::A2ui),
219 "ag_ui" => Some(UiProfile::AgUi),
220 "mcp_apps" => Some(UiProfile::McpApps),
221 _ => None,
222 }
223}
224
225fn resolve_ui_profile(
226 headers: &HeaderMap,
227 body_ui_protocol: Option<&str>,
228) -> Result<UiProfile, RuntimeError> {
229 let header_value = headers.get(UI_PROTOCOL_HEADER).and_then(|v| v.to_str().ok());
230 let candidate = header_value.or(body_ui_protocol);
231
232 let Some(raw) = candidate else {
233 return Ok(UiProfile::AdkUi);
234 };
235
236 parse_ui_profile(raw).ok_or_else(|| {
237 let supported = SUPPORTED_UI_PROTOCOLS.join(", ");
238 warn!(
239 requested = %raw,
240 header = %UI_PROTOCOL_HEADER,
241 "unsupported ui protocol requested"
242 );
243 (
244 StatusCode::BAD_REQUEST,
245 format!("Unsupported ui protocol '{}'. Supported profiles: {}", raw, supported),
246 )
247 })
248}
249
250fn parse_ui_transport(raw: &str) -> Option<UiTransportMode> {
251 match raw.trim().to_ascii_lowercase().as_str() {
252 "legacy" | "legacy_wrapper" => Some(UiTransportMode::LegacyWrapper),
253 "native" | "protocol_native" => Some(UiTransportMode::ProtocolNative),
254 _ => None,
255 }
256}
257
258fn resolve_ui_transport(
259 headers: &HeaderMap,
260 body_ui_transport: Option<&str>,
261) -> Result<UiTransportMode, RuntimeError> {
262 let header_value = headers.get(UI_TRANSPORT_HEADER).and_then(|v| v.to_str().ok());
263 let candidate = header_value.or(body_ui_transport);
264
265 let Some(raw) = candidate else {
266 return Ok(UiTransportMode::LegacyWrapper);
267 };
268
269 parse_ui_transport(raw).ok_or_else(|| {
270 warn!(
271 requested = %raw,
272 header = %UI_TRANSPORT_HEADER,
273 "unsupported ui transport requested"
274 );
275 (
276 StatusCode::BAD_REQUEST,
277 format!(
278 "Unsupported ui transport '{}'. Supported values: legacy_wrapper, protocol_native",
279 raw
280 ),
281 )
282 })
283}
284
285fn validate_transport_support(
286 profile: UiProfile,
287 transport: UiTransportMode,
288) -> Result<(), RuntimeError> {
289 if transport == UiTransportMode::ProtocolNative && profile != UiProfile::AgUi {
290 return Err((
291 StatusCode::BAD_REQUEST,
292 "protocol_native transport is currently available only for ag_ui; use the MCP Apps bridge endpoints for mcp_apps".to_string(),
293 ));
294 }
295 Ok(())
296}
297
298fn protocol_from_envelope(envelope: &Value) -> Option<&str> {
299 envelope.as_object().and_then(|object| object.get("protocol")).and_then(|value| value.as_str())
300}
301
302fn serialize_runtime_event(event: &adk_core::Event, profile: UiProfile) -> Option<String> {
303 if profile == UiProfile::AdkUi {
304 return serde_json::to_string(event).ok();
305 }
306
307 serde_json::to_string(&json!({
308 "ui_protocol": profile.as_str(),
309 "event": event
310 }))
311 .ok()
312}
313
314fn infer_sse_request_protocol(req: &RunSseRequest) -> Option<&str> {
315 req.ui_protocol
316 .as_deref()
317 .or(req.protocol.as_deref())
318 .or_else(|| req.protocol_envelope.as_ref().and_then(protocol_from_envelope))
319 .or_else(|| req.ag_ui_input.as_ref().map(|_| "ag_ui"))
320 .or_else(|| req.input.as_ref().map(|_| "ag_ui"))
321 .or_else(|| req.mcp_apps_request.as_ref().map(|_| "mcp_apps"))
322 .or_else(|| req.mcp_apps_initialize.as_ref().map(|_| "mcp_apps"))
323}
324
325fn infer_run_request_protocol(req: &RunRequest) -> Option<&str> {
326 req.ui_protocol.as_deref().or(req.protocol.as_deref())
327}
328
329fn ag_ui_input_from_request(req: &RunSseRequest) -> Option<AgUiRunInput> {
330 req.ag_ui_input.clone().or_else(|| req.input.clone()).or_else(|| {
331 let envelope = req.protocol_envelope.as_ref()?;
332 if protocol_from_envelope(envelope)? != "ag_ui" {
333 return None;
334 }
335 envelope
336 .as_object()
337 .and_then(|object| object.get("input"))
338 .and_then(|value| serde_json::from_value(value.clone()).ok())
339 })
340}
341
342fn mcp_apps_request_from_request(req: &RunSseRequest) -> Option<McpAppsRuntimeEnvelope> {
343 req.mcp_apps_request.clone().or_else(|| {
344 if let Some(method) = req.method.clone() {
345 return Some(McpAppsRuntimeEnvelope { method, params: req.params.clone() });
346 }
347 let envelope = req.protocol_envelope.as_ref()?;
348 if protocol_from_envelope(envelope)? != "mcp_apps" {
349 return None;
350 }
351 let object = envelope.as_object()?;
352 let method = object.get("method")?.as_str()?.to_string();
353 let params = object.get("params").cloned();
354 Some(McpAppsRuntimeEnvelope { method, params })
355 })
356}
357
358fn mcp_apps_initialize_from_request(req: &RunSseRequest) -> Option<McpAppsRuntimeEnvelope> {
359 req.mcp_apps_initialize.clone()
360}
361
362fn extract_text_segments(value: &Value) -> Vec<String> {
363 match value {
364 Value::String(text) => {
365 let trimmed = text.trim();
366 if trimmed.is_empty() { vec![] } else { vec![trimmed.to_string()] }
367 }
368 Value::Array(items) => items
369 .iter()
370 .flat_map(|item| {
371 if let Some(text) = item
372 .as_object()
373 .and_then(|object| object.get("text"))
374 .and_then(|text| text.as_str())
375 {
376 let trimmed = text.trim();
377 if !trimmed.is_empty() {
378 return vec![trimmed.to_string()];
379 }
380 }
381 vec![]
382 })
383 .collect(),
384 Value::Object(object) => object
385 .get("text")
386 .and_then(|text| text.as_str())
387 .map(|text| text.trim().to_string())
388 .filter(|text| !text.is_empty())
389 .into_iter()
390 .collect(),
391 _ => vec![],
392 }
393}
394
395fn new_message_from_ag_ui_input(input: &AgUiRunInput) -> Option<NewMessage> {
396 let selected = input
397 .messages
398 .iter()
399 .rev()
400 .find(|message| message.role.as_deref().unwrap_or("user") == "user")
401 .or_else(|| input.messages.last())?;
402
403 let content = selected.content.as_ref()?;
404 let parts: Vec<MessagePart> = extract_text_segments(content)
405 .into_iter()
406 .map(|text| MessagePart { text: Some(text), inline_data: None })
407 .collect();
408 if parts.is_empty() {
409 return None;
410 }
411
412 Some(NewMessage { role: selected.role.clone().unwrap_or_else(|| "user".to_string()), parts })
413}
414
415fn activity_content_snapshot(value: Option<&Value>) -> Value {
416 match value.cloned() {
417 Some(Value::Object(object)) => Value::Object(object),
418 Some(other) => json!({ "value": other }),
419 None => json!({}),
420 }
421}
422
423fn activity_message_id(message: &AgUiInputMessage) -> String {
424 message.id.clone().unwrap_or_else(|| format!("activity-{}", Uuid::new_v4()))
425}
426
427fn activity_message_type(message: &AgUiInputMessage) -> String {
428 message
429 .activity_type
430 .clone()
431 .or_else(|| message.name.clone())
432 .unwrap_or_else(|| "CUSTOM".to_string())
433}
434
435fn activity_events_from_ag_ui_input(input: &AgUiRunInput) -> Vec<Value> {
436 input
437 .messages
438 .iter()
439 .filter(|message| message.role.as_deref() == Some("activity"))
440 .map(|message| {
441 let timestamp = chrono::Utc::now().timestamp_millis().max(0) as u64;
442 let message_id = activity_message_id(message);
443 let activity_type = activity_message_type(message);
444 if let Some(patch) = &message.patch {
445 json!({
446 "type": "ACTIVITY_DELTA",
447 "messageId": message_id,
448 "activityType": activity_type,
449 "patch": patch,
450 "timestamp": timestamp,
451 })
452 } else {
453 let mut event = json!({
454 "type": "ACTIVITY_SNAPSHOT",
455 "messageId": message_id,
456 "activityType": activity_type,
457 "content": activity_content_snapshot(message.content.as_ref()),
458 "timestamp": timestamp,
459 });
460 if let Some(replace) = message.replace {
461 if let Some(object) = event.as_object_mut() {
462 object.insert("replace".to_string(), Value::Bool(replace));
463 }
464 }
465 event
466 }
467 })
468 .collect()
469}
470
471fn messages_snapshot_from_ag_ui_input(input: &AgUiRunInput) -> Option<Value> {
472 if input.messages.is_empty() {
473 return None;
474 }
475
476 let filtered: Vec<AgUiInputMessage> = input
477 .messages
478 .iter()
479 .filter(|message| !(message.role.as_deref() == Some("activity") && message.patch.is_some()))
480 .cloned()
481 .collect();
482 if filtered.is_empty() {
483 return None;
484 }
485
486 serde_json::to_value(filtered).ok()
487}
488
489fn object_entries_to_state_delta(object: &Map<String, Value>) -> HashMap<String, Value> {
490 object.iter().map(|(key, value)| (key.clone(), value.clone())).collect()
491}
492
493fn ag_ui_state_delta(input: &AgUiRunInput) -> HashMap<String, Value> {
494 let mut delta = HashMap::new();
495
496 if let Some(state) = input.state.clone() {
497 match state {
498 Value::Object(object) => {
499 delta.extend(object_entries_to_state_delta(&object));
500 }
501 value => {
502 delta.insert("temp:ag_ui_state".to_string(), value);
503 }
504 }
505 }
506
507 if !input.messages.is_empty() {
508 if let Ok(value) = serde_json::to_value(&input.messages) {
509 delta.insert("temp:ag_ui_messages".to_string(), value);
510 }
511 }
512 if !input.tools.is_empty() {
513 delta.insert("temp:ag_ui_tools".to_string(), Value::Array(input.tools.clone()));
514 }
515 if !input.context.is_empty() {
516 delta.insert("temp:ag_ui_context".to_string(), Value::Array(input.context.clone()));
517 }
518 if let Some(forwarded_props) = input.forwarded_props.clone() {
519 delta.insert("temp:ag_ui_forwarded_props".to_string(), forwarded_props);
520 }
521
522 delta
523}
524
525fn body_state_delta(value: Option<&Value>) -> Result<HashMap<String, Value>, RuntimeError> {
526 let Some(value) = value else {
527 return Ok(HashMap::new());
528 };
529 let object = value.as_object().ok_or_else(|| {
530 (StatusCode::BAD_REQUEST, "stateDelta must be a JSON object when provided".to_string())
531 })?;
532 Ok(object_entries_to_state_delta(object))
533}
534
535fn log_profile_deprecation(profile: UiProfile) {
536 if profile != UiProfile::AdkUi {
537 return;
538 }
539 let Some(spec) = UI_PROTOCOL_CAPABILITIES
540 .iter()
541 .find(|capability| capability.protocol == profile.as_str())
542 .and_then(|capability| capability.deprecation)
543 else {
544 return;
545 };
546
547 warn!(
548 protocol = %profile.as_str(),
549 stage = %spec.stage,
550 announced_on = %spec.announced_on,
551 sunset_target_on = ?spec.sunset_target_on,
552 replacements = ?spec.replacement_protocols,
553 "legacy ui protocol profile selected"
554 );
555}
556
557fn build_content_with_attachments(
559 text: &str,
560 attachments: &[Attachment],
561) -> Result<adk_core::Content, RuntimeError> {
562 let mut content = adk_core::Content::new("user");
563
564 content.parts.push(adk_core::Part::Text { text: text.to_string() });
566
567 for attachment in attachments {
569 match BASE64_STANDARD.decode(&attachment.base64) {
570 Ok(data) => {
571 if data.len() > adk_core::MAX_INLINE_DATA_SIZE {
572 return Err((
573 StatusCode::PAYLOAD_TOO_LARGE,
574 format!(
575 "Attachment '{}' exceeds max inline size of {} bytes",
576 attachment.name,
577 adk_core::MAX_INLINE_DATA_SIZE
578 ),
579 ));
580 }
581 content.parts.push(adk_core::Part::InlineData {
582 mime_type: attachment.mime_type.clone(),
583 data,
584 });
585 }
586 Err(e) => {
587 return Err((
588 StatusCode::BAD_REQUEST,
589 format!("Invalid base64 data for attachment '{}': {}", attachment.name, e),
590 ));
591 }
592 }
593 }
594
595 Ok(content)
596}
597
598fn build_content_from_parts(parts: &[MessagePart]) -> Result<adk_core::Content, RuntimeError> {
600 let mut content = adk_core::Content::new("user");
601
602 for part in parts {
603 if let Some(text) = &part.text {
605 content.parts.push(adk_core::Part::Text { text: text.clone() });
606 }
607
608 if let Some(inline_data) = &part.inline_data {
610 match BASE64_STANDARD.decode(&inline_data.data) {
611 Ok(data) => {
612 if data.len() > adk_core::MAX_INLINE_DATA_SIZE {
613 return Err((
614 StatusCode::PAYLOAD_TOO_LARGE,
615 format!(
616 "inline_data exceeds max inline size of {} bytes",
617 adk_core::MAX_INLINE_DATA_SIZE
618 ),
619 ));
620 }
621 content.parts.push(adk_core::Part::InlineData {
622 mime_type: inline_data.mime_type.clone(),
623 data,
624 });
625 }
626 Err(e) => {
627 return Err((
628 StatusCode::BAD_REQUEST,
629 format!("Invalid base64 data in inline_data: {}", e),
630 ));
631 }
632 }
633 }
634 }
635
636 Ok(content)
637}
638
639async fn apply_state_delta_to_session(
640 session_service: &std::sync::Arc<dyn adk_session::SessionService>,
641 app_name: &str,
642 user_id: &str,
643 session_id: &str,
644 state_delta: HashMap<String, Value>,
645) -> Result<(), RuntimeError> {
646 if state_delta.is_empty() {
647 return Ok(());
648 }
649
650 let identity = adk_core::AdkIdentity::new(
651 adk_core::AppName::try_from(app_name).map_err(|error| {
652 (
653 StatusCode::BAD_REQUEST,
654 format!("invalid app_name for state delta application: {}", error),
655 )
656 })?,
657 adk_core::UserId::try_from(user_id).map_err(|error| {
658 (
659 StatusCode::BAD_REQUEST,
660 format!("invalid user_id for state delta application: {}", error),
661 )
662 })?,
663 adk_core::SessionId::try_from(session_id).map_err(|error| {
664 (
665 StatusCode::BAD_REQUEST,
666 format!("invalid session_id for state delta application: {}", error),
667 )
668 })?,
669 );
670
671 let mut event = adk_core::Event::new(format!("ui-input-{}", Uuid::new_v4()));
672 event.author = "ui_protocol_bridge".to_string();
673 event.actions.state_delta = state_delta;
674 session_service
675 .append_event_for_identity(adk_session::AppendEventRequest { identity, event })
676 .await
677 .map_err(adk_err_to_runtime)
678}
679
680fn merge_runtime_state_delta(
681 body_delta: HashMap<String, Value>,
682 ag_ui_delta: HashMap<String, Value>,
683) -> HashMap<String, Value> {
684 let mut merged = body_delta;
685 merged.extend(ag_ui_delta);
686 merged
687}
688
689fn json_pointer_escape(segment: &str) -> String {
690 segment.replace('~', "~0").replace('/', "~1")
691}
692
693fn state_delta_to_json_patch(delta: &HashMap<String, Value>) -> Vec<Value> {
694 delta
695 .iter()
696 .map(|(key, value)| {
697 json!({
698 "op": "add",
699 "path": format!("/{}", json_pointer_escape(key)),
700 "value": value
701 })
702 })
703 .collect()
704}
705
706fn timestamp_millis(event: &adk_core::Event) -> u64 {
707 event.timestamp.timestamp_millis().max(0) as u64
708}
709
710fn serialize_ag_ui_tool_call_delta(args: &Value, allow_raw_string_delta: bool) -> String {
711 if allow_raw_string_delta {
712 if let Value::String(delta) = args {
713 return delta.clone();
714 }
715 }
716
717 serde_json::to_string(args).unwrap_or_else(|_| args.to_string())
718}
719
720fn translate_ag_ui_event(event: &adk_core::Event, thread_id: &str, run_id: &str) -> Vec<Value> {
721 let mut translated = Vec::new();
722 let timestamp = timestamp_millis(event);
723 let is_partial = event.llm_response.partial;
724
725 if !event.actions.state_delta.is_empty() {
726 translated.push(json!({
727 "type": "STATE_DELTA",
728 "delta": state_delta_to_json_patch(&event.actions.state_delta),
729 "timestamp": timestamp,
730 }));
731 }
732
733 if let Some(message) = event.llm_response.error_message.clone() {
734 translated.push(json!({
735 "type": "RUN_ERROR",
736 "threadId": thread_id,
737 "runId": run_id,
738 "message": message,
739 "code": event.llm_response.error_code,
740 "timestamp": timestamp,
741 }));
742 }
743
744 let Some(content) = &event.llm_response.content else {
745 return translated;
746 };
747
748 for (index, part) in content.parts.iter().enumerate() {
749 match part {
750 adk_core::Part::Text { text } if !text.trim().is_empty() => {
751 let message_id = format!("{}-text-{}", event.id, index);
752 if is_partial {
753 translated.push(json!({
754 "type": "TEXT_MESSAGE_CHUNK",
755 "messageId": message_id,
756 "role": "assistant",
757 "delta": text,
758 "timestamp": timestamp,
759 }));
760 } else {
761 translated.push(json!({
762 "type": "TEXT_MESSAGE_START",
763 "messageId": message_id,
764 "role": "assistant",
765 "timestamp": timestamp,
766 }));
767 translated.push(json!({
768 "type": "TEXT_MESSAGE_CONTENT",
769 "messageId": format!("{}-text-{}", event.id, index),
770 "delta": text,
771 "timestamp": timestamp,
772 }));
773 translated.push(json!({
774 "type": "TEXT_MESSAGE_END",
775 "messageId": format!("{}-text-{}", event.id, index),
776 "timestamp": timestamp,
777 }));
778 }
779 }
780 adk_core::Part::Thinking { thinking, .. } if !thinking.trim().is_empty() => {
781 let message_id = format!("{}-reasoning-{}", event.id, index);
782 if is_partial {
783 translated.push(json!({
784 "type": "REASONING_MESSAGE_CHUNK",
785 "messageId": message_id,
786 "delta": thinking,
787 "timestamp": timestamp,
788 }));
789 } else {
790 let reasoning_id = format!("{}-reasoning-phase-{}", event.id, index);
791 translated.push(json!({
792 "type": "REASONING_START",
793 "messageId": reasoning_id,
794 "timestamp": timestamp,
795 }));
796 translated.push(json!({
797 "type": "REASONING_MESSAGE_START",
798 "messageId": message_id,
799 "role": "assistant",
800 "timestamp": timestamp,
801 }));
802 translated.push(json!({
803 "type": "REASONING_MESSAGE_CONTENT",
804 "messageId": format!("{}-reasoning-{}", event.id, index),
805 "delta": thinking,
806 "timestamp": timestamp,
807 }));
808 translated.push(json!({
809 "type": "REASONING_MESSAGE_END",
810 "messageId": format!("{}-reasoning-{}", event.id, index),
811 "timestamp": timestamp,
812 }));
813 translated.push(json!({
814 "type": "REASONING_END",
815 "messageId": reasoning_id,
816 "timestamp": timestamp,
817 }));
818 }
819 }
820 adk_core::Part::FunctionCall { name, args, id, .. } => {
821 let tool_call_id =
822 id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index));
823 let raw_chunk_supported = is_partial && matches!(args, Value::String(_));
824 let args_delta = serialize_ag_ui_tool_call_delta(args, raw_chunk_supported);
825 if raw_chunk_supported {
826 translated.push(json!({
827 "type": "TOOL_CALL_CHUNK",
828 "toolCallId": tool_call_id,
829 "toolCallName": name,
830 "delta": args_delta,
831 "timestamp": timestamp,
832 }));
833 } else {
834 translated.push(json!({
835 "type": "TOOL_CALL_START",
836 "toolCallId": tool_call_id,
837 "toolCallName": name,
838 "timestamp": timestamp,
839 }));
840 translated.push(json!({
841 "type": "TOOL_CALL_ARGS",
842 "toolCallId": id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index)),
843 "delta": args_delta,
844 "timestamp": timestamp,
845 }));
846 translated.push(json!({
847 "type": "TOOL_CALL_END",
848 "toolCallId": id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index)),
849 "timestamp": timestamp,
850 }));
851 }
852 }
853 adk_core::Part::FunctionResponse { function_response, id } => {
854 let tool_call_id =
855 id.clone().unwrap_or_else(|| format!("{}-tool-result-{}", event.id, index));
856 let response_content = serde_json::to_string(&function_response.response)
857 .unwrap_or_else(|_| function_response.response.to_string());
858 translated.push(json!({
859 "type": "TOOL_CALL_RESULT",
860 "messageId": format!("msg-{}", tool_call_id),
861 "toolCallId": tool_call_id,
862 "toolCallName": function_response.name,
863 "content": response_content,
864 "role": "tool",
865 "timestamp": timestamp,
866 }));
867 }
868 _ => {}
869 }
870 }
871
872 translated
873}
874
875async fn extract_request_context(
881 extractor: Option<&dyn RequestContextExtractor>,
882 headers: &HeaderMap,
883) -> Result<Option<RequestContext>, RuntimeError> {
884 let Some(extractor) = extractor else {
885 return Ok(None);
886 };
887
888 let mut builder = axum::http::Request::builder();
890 for (name, value) in headers {
891 builder = builder.header(name, value);
892 }
893 let (parts, _) = builder
894 .body(())
895 .map_err(|e| {
896 (StatusCode::INTERNAL_SERVER_ERROR, format!("failed to build request parts: {e}"))
897 })?
898 .into_parts();
899
900 match extractor.extract(&parts).await {
901 Ok(ctx) => Ok(Some(ctx)),
902 Err(RequestContextError::MissingAuth) => {
903 Err((StatusCode::UNAUTHORIZED, "missing authorization".to_string()))
904 }
905 Err(RequestContextError::InvalidToken(msg)) => {
906 Err((StatusCode::UNAUTHORIZED, format!("invalid token: {msg}")))
907 }
908 Err(RequestContextError::ExtractionFailed(msg)) => {
909 Err((StatusCode::INTERNAL_SERVER_ERROR, format!("auth extraction failed: {msg}")))
910 }
911 }
912}
913
914fn bridge_params_with_identity(
915 app_name: &str,
916 user_id: &str,
917 session_id: &str,
918 params: Option<Value>,
919) -> Value {
920 let mut object = params.and_then(|value| value.as_object().cloned()).unwrap_or_default();
921 object.insert("appName".to_string(), Value::String(app_name.to_string()));
922 object.insert("userId".to_string(), Value::String(user_id.to_string()));
923 object.insert("sessionId".to_string(), Value::String(session_id.to_string()));
924 Value::Object(object)
925}
926
927fn deserialize_bridge_params<T: for<'de> Deserialize<'de>>(
928 app_name: &str,
929 user_id: &str,
930 session_id: &str,
931 params: Option<Value>,
932) -> Result<T, RuntimeError> {
933 serde_json::from_value(bridge_params_with_identity(app_name, user_id, session_id, params))
934 .map_err(|error| {
935 (StatusCode::BAD_REQUEST, format!("invalid protocol-native bridge payload: {}", error))
936 })
937}
938
939fn maybe_mark_mcp_ui_initialized(
940 app_name: &str,
941 user_id: &str,
942 session_id: &str,
943 initialized_notification: Option<&Value>,
944) -> Result<(), RuntimeError> {
945 let Some(value) = initialized_notification else {
946 return Ok(());
947 };
948 let method = value
949 .as_object()
950 .and_then(|object| object.get("method"))
951 .and_then(|value| value.as_str())
952 .unwrap_or_default();
953 if method == "ui/notifications/initialized" {
954 mark_mcp_ui_initialized(app_name, user_id, session_id)?;
955 }
956 Ok(())
957}
958
959fn apply_mcp_apps_runtime_envelope(
960 app_name: &str,
961 user_id: &str,
962 session_id: &str,
963 envelope: McpAppsRuntimeEnvelope,
964) -> Result<(), RuntimeError> {
965 match envelope.method.as_str() {
966 "ui/initialize" => {
967 let params = deserialize_bridge_params::<McpUiInitializeParams>(
968 app_name,
969 user_id,
970 session_id,
971 envelope.params,
972 )?;
973 initialize_mcp_ui_bridge(params)?;
974 Ok(())
975 }
976 "ui/message" => {
977 let params = deserialize_bridge_params::<McpUiMessageParams>(
978 app_name,
979 user_id,
980 session_id,
981 envelope.params,
982 )?;
983 message_mcp_ui_bridge(params)?;
984 Ok(())
985 }
986 "ui/update-model-context" => {
987 let params = deserialize_bridge_params::<McpUiUpdateModelContextParams>(
988 app_name,
989 user_id,
990 session_id,
991 envelope.params,
992 )?;
993 update_mcp_ui_bridge_model_context(params)?;
994 Ok(())
995 }
996 "ui/notifications/initialized" => {
997 mark_mcp_ui_initialized(app_name, user_id, session_id)?;
998 Ok(())
999 }
1000 method => Err((
1001 StatusCode::BAD_REQUEST,
1002 format!("unsupported MCP Apps runtime bridge method '{}'", method),
1003 )),
1004 }
1005}
1006
1007fn direct_ag_ui_events(event: &adk_core::Event, thread_id: &str, run_id: &str) -> Vec<String> {
1008 translate_ag_ui_event(event, thread_id, run_id)
1009 .into_iter()
1010 .filter_map(|item| serde_json::to_string(&item).ok())
1011 .collect()
1012}
1013
1014fn build_runtime_sse_stream<S>(
1015 mut event_stream: S,
1016 profile: UiProfile,
1017 transport: UiTransportMode,
1018 session_id: String,
1019 ag_ui_input: Option<AgUiRunInput>,
1020) -> std::pin::Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>
1021where
1022 S: Stream<Item = adk_core::Result<adk_core::Event>> + Send + 'static + Unpin,
1023{
1024 let selected_thread_id =
1025 ag_ui_input.as_ref().and_then(|input| input.thread_id.clone()).unwrap_or(session_id);
1026 let selected_run_input = ag_ui_input.clone();
1027 let selected_parent_run_id = ag_ui_input.as_ref().and_then(|input| input.parent_run_id.clone());
1028 let selected_initial_state = ag_ui_input.as_ref().and_then(|input| input.state.clone());
1029 let selected_messages_snapshot =
1030 ag_ui_input.as_ref().and_then(messages_snapshot_from_ag_ui_input);
1031 let selected_activity_events =
1032 ag_ui_input.as_ref().map(activity_events_from_ag_ui_input).unwrap_or_default();
1033
1034 Box::pin(async_stream::stream! {
1035 let native_ag_ui = profile == UiProfile::AgUi && transport == UiTransportMode::ProtocolNative;
1036 let mut started = false;
1037 let mut active_run_id = ag_ui_input.as_ref().and_then(|input| input.run_id.clone());
1038
1039 while let Some(item) = event_stream.next().await {
1040 match item {
1041 Ok(event) => {
1042 if native_ag_ui {
1043 let run_id = active_run_id
1044 .get_or_insert_with(|| event.invocation_id.clone())
1045 .clone();
1046 if !started {
1047 let mut started_event = json!({
1048 "type": "RUN_STARTED",
1049 "threadId": selected_thread_id,
1050 "runId": run_id,
1051 });
1052 if let Some(parent_run_id) = selected_parent_run_id.clone() {
1053 if let Some(object) = started_event.as_object_mut() {
1054 object.insert("parentRunId".to_string(), Value::String(parent_run_id));
1055 }
1056 }
1057 if let Some(run_input) = selected_run_input.clone() {
1058 if let Ok(value) = serde_json::to_value(run_input) {
1059 if let Some(object) = started_event.as_object_mut() {
1060 object.insert("input".to_string(), value);
1061 }
1062 }
1063 }
1064 yield Ok(Event::default().data(started_event.to_string()));
1065
1066 if let Some(snapshot) = selected_initial_state.clone() {
1067 yield Ok(Event::default().data(json!({
1068 "type": "STATE_SNAPSHOT",
1069 "snapshot": snapshot,
1070 }).to_string()));
1071 }
1072 if let Some(messages) = selected_messages_snapshot.clone() {
1073 yield Ok(Event::default().data(json!({
1074 "type": "MESSAGES_SNAPSHOT",
1075 "messages": messages,
1076 }).to_string()));
1077 }
1078 for activity_event in selected_activity_events.clone() {
1079 yield Ok(Event::default().data(activity_event.to_string()));
1080 }
1081 started = true;
1082 }
1083
1084 for payload in direct_ag_ui_events(&event, &selected_thread_id, &run_id) {
1085 yield Ok(Event::default().data(payload));
1086 }
1087 } else if let Some(payload) = serialize_runtime_event(&event, profile) {
1088 yield Ok(Event::default().data(payload));
1089 }
1090 }
1091 Err(error) => {
1092 if native_ag_ui {
1093 let run_id =
1094 active_run_id.unwrap_or_else(|| format!("run-{}", Uuid::new_v4()));
1095 if !started {
1096 yield Ok(Event::default().data(json!({
1097 "type": "RUN_STARTED",
1098 "threadId": selected_thread_id,
1099 "runId": run_id,
1100 }).to_string()));
1101 }
1102 yield Ok(Event::default().data(json!({
1103 "type": "RUN_ERROR",
1104 "threadId": selected_thread_id,
1105 "runId": run_id,
1106 "message": error.to_string(),
1107 }).to_string()));
1108 }
1109 return;
1110 }
1111 }
1112 }
1113
1114 if native_ag_ui {
1115 let run_id = active_run_id.unwrap_or_else(|| format!("run-{}", Uuid::new_v4()));
1116 if !started {
1117 yield Ok(Event::default().data(json!({
1118 "type": "RUN_STARTED",
1119 "threadId": selected_thread_id,
1120 "runId": run_id,
1121 }).to_string()));
1122 }
1123 yield Ok(Event::default().data(json!({
1124 "type": "RUN_FINISHED",
1125 "threadId": selected_thread_id,
1126 "runId": run_id,
1127 }).to_string()));
1128 }
1129 })
1130}
1131
1132pub async fn run_sse(
1133 State(controller): State<RuntimeController>,
1134 Path((app_name, user_id, session_id)): Path<(String, String, String)>,
1135 headers: HeaderMap,
1136 Json(req): Json<RunRequest>,
1137) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RuntimeError> {
1138 let ui_profile = resolve_ui_profile(&headers, infer_run_request_protocol(&req))?;
1139 let transport = resolve_ui_transport(&headers, req.ui_transport.as_deref())?;
1140 validate_transport_support(ui_profile, transport)?;
1141 let span = tracing::info_span!("run_sse", session_id = %session_id, app_name = %app_name, user_id = %user_id);
1142
1143 async move {
1144 log_profile_deprecation(ui_profile);
1145 info!(
1146 ui_protocol = %ui_profile.as_str(),
1147 ui_transport = ?transport,
1148 "resolved ui protocol profile for runtime request"
1149 );
1150
1151 let request_context = extract_request_context(
1155 controller.config.request_context_extractor.as_deref(),
1156 &headers,
1157 )
1158 .await?;
1159
1160 let effective_user_id = request_context.as_ref().map_or(user_id, |rc| rc.user_id.clone());
1166
1167 controller
1169 .config
1170 .session_service
1171 .get(adk_session::GetRequest {
1172 app_name: app_name.clone(),
1173 user_id: effective_user_id.clone(),
1174 session_id: session_id.clone(),
1175 num_recent_events: None,
1176 after: None,
1177 })
1178 .await
1179 .map_err(|_| (StatusCode::NOT_FOUND, "session not found".to_string()))?;
1180
1181 let agent = controller
1183 .config
1184 .agent_loader
1185 .load_agent(&app_name)
1186 .await
1187 .map_err(adk_err_to_runtime)?;
1188
1189 let runner = adk_runner::Runner::new(adk_runner::RunnerConfig {
1191 app_name: app_name.clone(),
1192 agent,
1193 session_service: controller.config.session_service.clone(),
1194 artifact_service: controller.config.artifact_service.clone(),
1195 memory_service: controller.config.memory_service.clone(),
1196 plugin_manager: None,
1197 run_config: None,
1198 compaction_config: controller.config.compaction_config.clone(),
1199 context_cache_config: controller.config.context_cache_config.clone(),
1200 cache_capable: controller.config.cache_capable.clone(),
1201 request_context,
1202 cancellation_token: None,
1203 })
1204 .map_err(adk_err_to_runtime)?;
1205
1206 let content = build_content_with_attachments(&req.new_message, &req.attachments)?;
1208
1209 if !req.attachments.is_empty() {
1211 info!(attachment_count = req.attachments.len(), "processing request with attachments");
1212 }
1213
1214 let typed_user_id =
1216 UserId::new(effective_user_id).map_err(|err| adk_err_to_runtime(err.into()))?;
1217 let typed_session_id =
1218 SessionId::new(session_id.clone()).map_err(|err| adk_err_to_runtime(err.into()))?;
1219 let event_stream = runner
1220 .run(typed_user_id, typed_session_id, content)
1221 .await
1222 .map_err(adk_err_to_runtime)?;
1223
1224 let sse_stream =
1226 build_runtime_sse_stream(event_stream, ui_profile, transport, session_id.clone(), None);
1227
1228 Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
1229 }
1230 .instrument(span)
1231 .await
1232}
1233
1234pub async fn run_sse_compat(
1237 State(controller): State<RuntimeController>,
1238 headers: HeaderMap,
1239 Json(req): Json<RunSseRequest>,
1240) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RuntimeError> {
1241 let ui_profile = resolve_ui_profile(&headers, infer_sse_request_protocol(&req))?;
1242 let transport = resolve_ui_transport(&headers, req.ui_transport.as_deref())?;
1243 validate_transport_support(ui_profile, transport)?;
1244 let app_name = req.app_name.clone();
1245 let user_id = req.user_id.clone();
1246 let session_id = req.session_id.clone();
1247 let ag_ui_input = ag_ui_input_from_request(&req);
1248 let mcp_apps_request = mcp_apps_request_from_request(&req);
1249 let mcp_apps_initialize = mcp_apps_initialize_from_request(&req);
1250
1251 info!(
1252 app_name = %app_name,
1253 user_id = %user_id,
1254 session_id = %session_id,
1255 ui_protocol = %ui_profile.as_str(),
1256 ui_transport = ?transport,
1257 "POST /run_sse request received"
1258 );
1259 log_profile_deprecation(ui_profile);
1260
1261 let request_context =
1265 extract_request_context(controller.config.request_context_extractor.as_deref(), &headers)
1266 .await?;
1267
1268 let effective_user_id = request_context.as_ref().map_or(user_id, |rc| rc.user_id.clone());
1274
1275 let resolved_new_message = req
1276 .new_message
1277 .clone()
1278 .or_else(|| ag_ui_input.as_ref().and_then(new_message_from_ag_ui_input))
1279 .ok_or_else(|| {
1280 (
1281 StatusCode::BAD_REQUEST,
1282 "newMessage is required unless protocol-native ag_ui input supplies a user message"
1283 .to_string(),
1284 )
1285 })?;
1286
1287 let content = build_content_from_parts(&resolved_new_message.parts)?;
1289
1290 let text_parts: Vec<_> =
1292 resolved_new_message.parts.iter().filter(|p| p.text.is_some()).collect();
1293 let data_parts: Vec<_> =
1294 resolved_new_message.parts.iter().filter(|p| p.inline_data.is_some()).collect();
1295 if !data_parts.is_empty() {
1296 info!(
1297 text_parts = text_parts.len(),
1298 inline_data_parts = data_parts.len(),
1299 "processing request with inline data"
1300 );
1301 }
1302
1303 let merged_state_delta = merge_runtime_state_delta(
1304 body_state_delta(req.state_delta.as_ref())?,
1305 ag_ui_input.as_ref().map(ag_ui_state_delta).unwrap_or_default(),
1306 );
1307
1308 let session_result = controller
1310 .config
1311 .session_service
1312 .get(adk_session::GetRequest {
1313 app_name: app_name.clone(),
1314 user_id: effective_user_id.clone(),
1315 session_id: session_id.clone(),
1316 num_recent_events: None,
1317 after: None,
1318 })
1319 .await;
1320
1321 if session_result.is_err() {
1323 controller
1324 .config
1325 .session_service
1326 .create(adk_session::CreateRequest {
1327 app_name: app_name.clone(),
1328 user_id: effective_user_id.clone(),
1329 session_id: Some(session_id.clone()),
1330 state: merged_state_delta.clone(),
1331 })
1332 .await
1333 .map_err(adk_err_to_runtime)?;
1334 } else {
1335 apply_state_delta_to_session(
1336 &controller.config.session_service,
1337 &app_name,
1338 &effective_user_id,
1339 &session_id,
1340 merged_state_delta.clone(),
1341 )
1342 .await?;
1343 }
1344
1345 if ui_profile == UiProfile::McpApps {
1346 if let Some(initialize) = mcp_apps_initialize {
1347 apply_mcp_apps_runtime_envelope(
1348 &app_name,
1349 &effective_user_id,
1350 &session_id,
1351 initialize,
1352 )?;
1353 }
1354 if let Some(request) = mcp_apps_request {
1355 apply_mcp_apps_runtime_envelope(&app_name, &effective_user_id, &session_id, request)?;
1356 }
1357 maybe_mark_mcp_ui_initialized(
1358 &app_name,
1359 &effective_user_id,
1360 &session_id,
1361 req.mcp_apps_initialized.as_ref(),
1362 )?;
1363 }
1364
1365 let agent =
1367 controller.config.agent_loader.load_agent(&app_name).await.map_err(adk_err_to_runtime)?;
1368
1369 let streaming_mode =
1371 if req.streaming { adk_core::StreamingMode::SSE } else { adk_core::StreamingMode::None };
1372
1373 let runner = adk_runner::Runner::new(adk_runner::RunnerConfig {
1374 app_name,
1375 agent,
1376 session_service: controller.config.session_service.clone(),
1377 artifact_service: controller.config.artifact_service.clone(),
1378 memory_service: controller.config.memory_service.clone(),
1379 plugin_manager: None,
1380 run_config: Some(adk_core::RunConfig { streaming_mode, ..adk_core::RunConfig::default() }),
1381 compaction_config: controller.config.compaction_config.clone(),
1382 context_cache_config: controller.config.context_cache_config.clone(),
1383 cache_capable: controller.config.cache_capable.clone(),
1384 request_context,
1385 cancellation_token: None,
1386 })
1387 .map_err(adk_err_to_runtime)?;
1388
1389 let typed_user_id =
1391 UserId::new(effective_user_id).map_err(|err| adk_err_to_runtime(err.into()))?;
1392 let typed_session_id =
1393 SessionId::new(session_id.clone()).map_err(|err| adk_err_to_runtime(err.into()))?;
1394 let event_stream =
1395 runner.run(typed_user_id, typed_session_id, content).await.map_err(adk_err_to_runtime)?;
1396
1397 let sse_stream = build_runtime_sse_stream(
1399 event_stream,
1400 ui_profile,
1401 transport,
1402 session_id.clone(),
1403 ag_ui_input,
1404 );
1405
1406 Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
1407}