Skip to main content

adk_server/rest/controllers/
runtime.rs

1use crate::ServerConfig;
2use crate::auth_bridge::{RequestContextError, RequestContextExtractor};
3use crate::rest::controllers::ui::{
4    McpUiInitializeParams, McpUiMessageParams, McpUiUpdateModelContextParams,
5    initialize_mcp_ui_bridge, mark_mcp_ui_initialized, message_mcp_ui_bridge,
6    update_mcp_ui_bridge_model_context,
7};
8use crate::ui_protocol::{
9    SUPPORTED_UI_PROTOCOLS, UI_PROTOCOL_CAPABILITIES, normalize_runtime_ui_protocol,
10};
11use adk_core::{RequestContext, SessionId, UserId};
12use axum::{
13    Json,
14    extract::{Path, State},
15    http::{HeaderMap, StatusCode},
16    response::sse::{Event, KeepAlive, Sse},
17};
18use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
19use futures::{StreamExt, stream::Stream};
20use serde::{Deserialize, Serialize};
21use serde_json::{Map, Value, json};
22use std::collections::HashMap;
23use std::convert::Infallible;
24use tracing::{Instrument, info, warn};
25use uuid::Uuid;
26
27fn default_streaming_true() -> bool {
28    true
29}
30
31const UI_PROTOCOL_HEADER: &str = "x-adk-ui-protocol";
32const UI_TRANSPORT_HEADER: &str = "x-adk-ui-transport";
33
34#[derive(Clone)]
35pub struct RuntimeController {
36    config: ServerConfig,
37}
38
39impl RuntimeController {
40    pub fn new(config: ServerConfig) -> Self {
41        Self { config }
42    }
43}
44
45/// Attachment structure for the legacy /run endpoint
46#[derive(Serialize, Deserialize, Debug)]
47pub struct Attachment {
48    pub name: String,
49    #[serde(rename = "type")]
50    pub mime_type: String,
51    pub base64: String,
52}
53
54#[derive(Serialize, Deserialize)]
55pub struct RunRequest {
56    pub new_message: String,
57    #[serde(default, alias = "uiProtocol")]
58    pub ui_protocol: Option<String>,
59    #[serde(default)]
60    pub protocol: Option<String>,
61    #[serde(default, alias = "ui_transport")]
62    pub ui_transport: Option<String>,
63    #[serde(default)]
64    pub attachments: Vec<Attachment>,
65}
66
67/// Request format for /run_sse (adk-go compatible)
68#[derive(Serialize, Deserialize, Debug)]
69#[serde(rename_all = "camelCase")]
70pub struct RunSseRequest {
71    pub app_name: String,
72    pub user_id: String,
73    pub session_id: String,
74    #[serde(default)]
75    pub new_message: Option<NewMessage>,
76    #[serde(default = "default_streaming_true")]
77    pub streaming: bool,
78    #[serde(default)]
79    pub state_delta: Option<Value>,
80    #[serde(default, alias = "ui_protocol")]
81    pub ui_protocol: Option<String>,
82    #[serde(default)]
83    pub protocol: Option<String>,
84    #[serde(default, alias = "ui_transport")]
85    pub ui_transport: Option<String>,
86    #[serde(default)]
87    pub input: Option<AgUiRunInput>,
88    #[serde(default)]
89    pub ag_ui_input: Option<AgUiRunInput>,
90    #[serde(default)]
91    pub ag_ui_compatibility_event: Option<Value>,
92    #[serde(default)]
93    pub protocol_envelope: Option<Value>,
94    #[serde(default)]
95    pub mcp_apps_request: Option<McpAppsRuntimeEnvelope>,
96    #[serde(default)]
97    pub mcp_apps_initialize: Option<McpAppsRuntimeEnvelope>,
98    #[serde(default)]
99    pub mcp_apps_initialized: Option<Value>,
100    #[serde(default)]
101    pub method: Option<String>,
102    #[serde(default)]
103    pub params: Option<Value>,
104}
105
106#[derive(Serialize, Deserialize, Debug, Clone, Default)]
107#[serde(rename_all = "camelCase")]
108pub struct AgUiInputMessage {
109    #[serde(default)]
110    pub id: Option<String>,
111    #[serde(default)]
112    pub role: Option<String>,
113    #[serde(default)]
114    pub name: Option<String>,
115    #[serde(default)]
116    pub activity_type: Option<String>,
117    #[serde(default)]
118    pub content: Option<Value>,
119    #[serde(default)]
120    pub replace: Option<bool>,
121    #[serde(default)]
122    pub patch: Option<Vec<Value>>,
123}
124
125#[derive(Serialize, Deserialize, Debug, Clone, Default)]
126#[serde(rename_all = "camelCase")]
127pub struct AgUiRunInput {
128    #[serde(default)]
129    pub thread_id: Option<String>,
130    #[serde(default)]
131    pub run_id: Option<String>,
132    #[serde(default)]
133    pub parent_run_id: Option<String>,
134    #[serde(default)]
135    pub state: Option<Value>,
136    #[serde(default)]
137    pub messages: Vec<AgUiInputMessage>,
138    #[serde(default)]
139    pub tools: Vec<Value>,
140    #[serde(default)]
141    pub context: Vec<Value>,
142    #[serde(default)]
143    pub forwarded_props: Option<Value>,
144}
145
146#[derive(Serialize, Deserialize, Debug, Clone)]
147#[serde(rename_all = "camelCase")]
148pub struct McpAppsRuntimeEnvelope {
149    pub method: String,
150    #[serde(default)]
151    pub params: Option<Value>,
152}
153
154#[derive(Serialize, Deserialize, Debug, Clone)]
155pub struct NewMessage {
156    pub role: String,
157    pub parts: Vec<MessagePart>,
158}
159
160#[derive(Serialize, Deserialize, Debug, Clone)]
161pub struct MessagePart {
162    #[serde(default)]
163    pub text: Option<String>,
164    #[serde(default, rename = "inlineData")]
165    pub inline_data: Option<InlineData>,
166}
167
168#[derive(Serialize, Deserialize, Debug, Clone)]
169#[serde(rename_all = "camelCase")]
170pub struct InlineData {
171    pub display_name: Option<String>,
172    pub data: String,
173    pub mime_type: String,
174}
175
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177enum UiProfile {
178    AdkUi,
179    A2ui,
180    AgUi,
181    McpApps,
182}
183
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185enum UiTransportMode {
186    LegacyWrapper,
187    ProtocolNative,
188}
189
190impl UiProfile {
191    fn as_str(self) -> &'static str {
192        match self {
193            Self::AdkUi => "adk_ui",
194            Self::A2ui => "a2ui",
195            Self::AgUi => "ag_ui",
196            Self::McpApps => "mcp_apps",
197        }
198    }
199}
200
201type RuntimeError = (StatusCode, String);
202
203/// Convert an `AdkError` into a `RuntimeError` using the structured error envelope.
204///
205/// Uses `AdkError::http_status_code()` for the HTTP status and
206/// `AdkError::to_problem_json()` for the response body. The problem JSON
207/// includes `retry_after_ms` when the error carries retry guidance.
208fn adk_err_to_runtime(err: adk_core::AdkError) -> RuntimeError {
209    let status =
210        StatusCode::from_u16(err.http_status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
211    let body = err.to_problem_json().to_string();
212    (status, body)
213}
214
215fn parse_ui_profile(raw: &str) -> Option<UiProfile> {
216    match normalize_runtime_ui_protocol(raw)? {
217        "adk_ui" => Some(UiProfile::AdkUi),
218        "a2ui" => Some(UiProfile::A2ui),
219        "ag_ui" => Some(UiProfile::AgUi),
220        "mcp_apps" => Some(UiProfile::McpApps),
221        _ => None,
222    }
223}
224
225fn resolve_ui_profile(
226    headers: &HeaderMap,
227    body_ui_protocol: Option<&str>,
228) -> Result<UiProfile, RuntimeError> {
229    let header_value = headers.get(UI_PROTOCOL_HEADER).and_then(|v| v.to_str().ok());
230    let candidate = header_value.or(body_ui_protocol);
231
232    let Some(raw) = candidate else {
233        return Ok(UiProfile::AdkUi);
234    };
235
236    parse_ui_profile(raw).ok_or_else(|| {
237        let supported = SUPPORTED_UI_PROTOCOLS.join(", ");
238        warn!(
239            requested = %raw,
240            header = %UI_PROTOCOL_HEADER,
241            "unsupported ui protocol requested"
242        );
243        (
244            StatusCode::BAD_REQUEST,
245            format!("Unsupported ui protocol '{}'. Supported profiles: {}", raw, supported),
246        )
247    })
248}
249
250fn parse_ui_transport(raw: &str) -> Option<UiTransportMode> {
251    match raw.trim().to_ascii_lowercase().as_str() {
252        "legacy" | "legacy_wrapper" => Some(UiTransportMode::LegacyWrapper),
253        "native" | "protocol_native" => Some(UiTransportMode::ProtocolNative),
254        _ => None,
255    }
256}
257
258fn resolve_ui_transport(
259    headers: &HeaderMap,
260    body_ui_transport: Option<&str>,
261) -> Result<UiTransportMode, RuntimeError> {
262    let header_value = headers.get(UI_TRANSPORT_HEADER).and_then(|v| v.to_str().ok());
263    let candidate = header_value.or(body_ui_transport);
264
265    let Some(raw) = candidate else {
266        return Ok(UiTransportMode::LegacyWrapper);
267    };
268
269    parse_ui_transport(raw).ok_or_else(|| {
270        warn!(
271            requested = %raw,
272            header = %UI_TRANSPORT_HEADER,
273            "unsupported ui transport requested"
274        );
275        (
276            StatusCode::BAD_REQUEST,
277            format!(
278                "Unsupported ui transport '{}'. Supported values: legacy_wrapper, protocol_native",
279                raw
280            ),
281        )
282    })
283}
284
285fn validate_transport_support(
286    profile: UiProfile,
287    transport: UiTransportMode,
288) -> Result<(), RuntimeError> {
289    if transport == UiTransportMode::ProtocolNative && profile != UiProfile::AgUi {
290        return Err((
291            StatusCode::BAD_REQUEST,
292            "protocol_native transport is currently available only for ag_ui; use the MCP Apps bridge endpoints for mcp_apps".to_string(),
293        ));
294    }
295    Ok(())
296}
297
298fn protocol_from_envelope(envelope: &Value) -> Option<&str> {
299    envelope.as_object().and_then(|object| object.get("protocol")).and_then(|value| value.as_str())
300}
301
302fn serialize_runtime_event(event: &adk_core::Event, profile: UiProfile) -> Option<String> {
303    if profile == UiProfile::AdkUi {
304        return serde_json::to_string(event).ok();
305    }
306
307    serde_json::to_string(&json!({
308        "ui_protocol": profile.as_str(),
309        "event": event
310    }))
311    .ok()
312}
313
314fn infer_sse_request_protocol(req: &RunSseRequest) -> Option<&str> {
315    req.ui_protocol
316        .as_deref()
317        .or(req.protocol.as_deref())
318        .or_else(|| req.protocol_envelope.as_ref().and_then(protocol_from_envelope))
319        .or_else(|| req.ag_ui_input.as_ref().map(|_| "ag_ui"))
320        .or_else(|| req.input.as_ref().map(|_| "ag_ui"))
321        .or_else(|| req.mcp_apps_request.as_ref().map(|_| "mcp_apps"))
322        .or_else(|| req.mcp_apps_initialize.as_ref().map(|_| "mcp_apps"))
323}
324
325fn infer_run_request_protocol(req: &RunRequest) -> Option<&str> {
326    req.ui_protocol.as_deref().or(req.protocol.as_deref())
327}
328
329fn ag_ui_input_from_request(req: &RunSseRequest) -> Option<AgUiRunInput> {
330    req.ag_ui_input.clone().or_else(|| req.input.clone()).or_else(|| {
331        let envelope = req.protocol_envelope.as_ref()?;
332        if protocol_from_envelope(envelope)? != "ag_ui" {
333            return None;
334        }
335        envelope
336            .as_object()
337            .and_then(|object| object.get("input"))
338            .and_then(|value| serde_json::from_value(value.clone()).ok())
339    })
340}
341
342fn mcp_apps_request_from_request(req: &RunSseRequest) -> Option<McpAppsRuntimeEnvelope> {
343    req.mcp_apps_request.clone().or_else(|| {
344        if let Some(method) = req.method.clone() {
345            return Some(McpAppsRuntimeEnvelope { method, params: req.params.clone() });
346        }
347        let envelope = req.protocol_envelope.as_ref()?;
348        if protocol_from_envelope(envelope)? != "mcp_apps" {
349            return None;
350        }
351        let object = envelope.as_object()?;
352        let method = object.get("method")?.as_str()?.to_string();
353        let params = object.get("params").cloned();
354        Some(McpAppsRuntimeEnvelope { method, params })
355    })
356}
357
358fn mcp_apps_initialize_from_request(req: &RunSseRequest) -> Option<McpAppsRuntimeEnvelope> {
359    req.mcp_apps_initialize.clone()
360}
361
362fn extract_text_segments(value: &Value) -> Vec<String> {
363    match value {
364        Value::String(text) => {
365            let trimmed = text.trim();
366            if trimmed.is_empty() { vec![] } else { vec![trimmed.to_string()] }
367        }
368        Value::Array(items) => items
369            .iter()
370            .flat_map(|item| {
371                if let Some(text) = item
372                    .as_object()
373                    .and_then(|object| object.get("text"))
374                    .and_then(|text| text.as_str())
375                {
376                    let trimmed = text.trim();
377                    if !trimmed.is_empty() {
378                        return vec![trimmed.to_string()];
379                    }
380                }
381                vec![]
382            })
383            .collect(),
384        Value::Object(object) => object
385            .get("text")
386            .and_then(|text| text.as_str())
387            .map(|text| text.trim().to_string())
388            .filter(|text| !text.is_empty())
389            .into_iter()
390            .collect(),
391        _ => vec![],
392    }
393}
394
395fn new_message_from_ag_ui_input(input: &AgUiRunInput) -> Option<NewMessage> {
396    let selected = input
397        .messages
398        .iter()
399        .rev()
400        .find(|message| message.role.as_deref().unwrap_or("user") == "user")
401        .or_else(|| input.messages.last())?;
402
403    let content = selected.content.as_ref()?;
404    let parts: Vec<MessagePart> = extract_text_segments(content)
405        .into_iter()
406        .map(|text| MessagePart { text: Some(text), inline_data: None })
407        .collect();
408    if parts.is_empty() {
409        return None;
410    }
411
412    Some(NewMessage { role: selected.role.clone().unwrap_or_else(|| "user".to_string()), parts })
413}
414
415fn activity_content_snapshot(value: Option<&Value>) -> Value {
416    match value.cloned() {
417        Some(Value::Object(object)) => Value::Object(object),
418        Some(other) => json!({ "value": other }),
419        None => json!({}),
420    }
421}
422
423fn activity_message_id(message: &AgUiInputMessage) -> String {
424    message.id.clone().unwrap_or_else(|| format!("activity-{}", Uuid::new_v4()))
425}
426
427fn activity_message_type(message: &AgUiInputMessage) -> String {
428    message
429        .activity_type
430        .clone()
431        .or_else(|| message.name.clone())
432        .unwrap_or_else(|| "CUSTOM".to_string())
433}
434
435fn activity_events_from_ag_ui_input(input: &AgUiRunInput) -> Vec<Value> {
436    input
437        .messages
438        .iter()
439        .filter(|message| message.role.as_deref() == Some("activity"))
440        .map(|message| {
441            let timestamp = chrono::Utc::now().timestamp_millis().max(0) as u64;
442            let message_id = activity_message_id(message);
443            let activity_type = activity_message_type(message);
444            if let Some(patch) = &message.patch {
445                json!({
446                    "type": "ACTIVITY_DELTA",
447                    "messageId": message_id,
448                    "activityType": activity_type,
449                    "patch": patch,
450                    "timestamp": timestamp,
451                })
452            } else {
453                let mut event = json!({
454                    "type": "ACTIVITY_SNAPSHOT",
455                    "messageId": message_id,
456                    "activityType": activity_type,
457                    "content": activity_content_snapshot(message.content.as_ref()),
458                    "timestamp": timestamp,
459                });
460                if let Some(replace) = message.replace {
461                    if let Some(object) = event.as_object_mut() {
462                        object.insert("replace".to_string(), Value::Bool(replace));
463                    }
464                }
465                event
466            }
467        })
468        .collect()
469}
470
471fn messages_snapshot_from_ag_ui_input(input: &AgUiRunInput) -> Option<Value> {
472    if input.messages.is_empty() {
473        return None;
474    }
475
476    let filtered: Vec<AgUiInputMessage> = input
477        .messages
478        .iter()
479        .filter(|message| !(message.role.as_deref() == Some("activity") && message.patch.is_some()))
480        .cloned()
481        .collect();
482    if filtered.is_empty() {
483        return None;
484    }
485
486    serde_json::to_value(filtered).ok()
487}
488
489fn object_entries_to_state_delta(object: &Map<String, Value>) -> HashMap<String, Value> {
490    object.iter().map(|(key, value)| (key.clone(), value.clone())).collect()
491}
492
493fn ag_ui_state_delta(input: &AgUiRunInput) -> HashMap<String, Value> {
494    let mut delta = HashMap::new();
495
496    if let Some(state) = input.state.clone() {
497        match state {
498            Value::Object(object) => {
499                delta.extend(object_entries_to_state_delta(&object));
500            }
501            value => {
502                delta.insert("temp:ag_ui_state".to_string(), value);
503            }
504        }
505    }
506
507    if !input.messages.is_empty() {
508        if let Ok(value) = serde_json::to_value(&input.messages) {
509            delta.insert("temp:ag_ui_messages".to_string(), value);
510        }
511    }
512    if !input.tools.is_empty() {
513        delta.insert("temp:ag_ui_tools".to_string(), Value::Array(input.tools.clone()));
514    }
515    if !input.context.is_empty() {
516        delta.insert("temp:ag_ui_context".to_string(), Value::Array(input.context.clone()));
517    }
518    if let Some(forwarded_props) = input.forwarded_props.clone() {
519        delta.insert("temp:ag_ui_forwarded_props".to_string(), forwarded_props);
520    }
521
522    delta
523}
524
525fn body_state_delta(value: Option<&Value>) -> Result<HashMap<String, Value>, RuntimeError> {
526    let Some(value) = value else {
527        return Ok(HashMap::new());
528    };
529    let object = value.as_object().ok_or_else(|| {
530        (StatusCode::BAD_REQUEST, "stateDelta must be a JSON object when provided".to_string())
531    })?;
532    Ok(object_entries_to_state_delta(object))
533}
534
535fn log_profile_deprecation(profile: UiProfile) {
536    if profile != UiProfile::AdkUi {
537        return;
538    }
539    let Some(spec) = UI_PROTOCOL_CAPABILITIES
540        .iter()
541        .find(|capability| capability.protocol == profile.as_str())
542        .and_then(|capability| capability.deprecation)
543    else {
544        return;
545    };
546
547    warn!(
548        protocol = %profile.as_str(),
549        stage = %spec.stage,
550        announced_on = %spec.announced_on,
551        sunset_target_on = ?spec.sunset_target_on,
552        replacements = ?spec.replacement_protocols,
553        "legacy ui protocol profile selected"
554    );
555}
556
557/// Build Content from message text and attachments
558fn build_content_with_attachments(
559    text: &str,
560    attachments: &[Attachment],
561) -> Result<adk_core::Content, RuntimeError> {
562    let mut content = adk_core::Content::new("user");
563
564    // Add the text part
565    content.parts.push(adk_core::Part::Text { text: text.to_string() });
566
567    // Add attachment parts
568    for attachment in attachments {
569        match BASE64_STANDARD.decode(&attachment.base64) {
570            Ok(data) => {
571                if data.len() > adk_core::MAX_INLINE_DATA_SIZE {
572                    return Err((
573                        StatusCode::PAYLOAD_TOO_LARGE,
574                        format!(
575                            "Attachment '{}' exceeds max inline size of {} bytes",
576                            attachment.name,
577                            adk_core::MAX_INLINE_DATA_SIZE
578                        ),
579                    ));
580                }
581                content.parts.push(adk_core::Part::InlineData {
582                    mime_type: attachment.mime_type.clone(),
583                    data,
584                });
585            }
586            Err(e) => {
587                return Err((
588                    StatusCode::BAD_REQUEST,
589                    format!("Invalid base64 data for attachment '{}': {}", attachment.name, e),
590                ));
591            }
592        }
593    }
594
595    Ok(content)
596}
597
598/// Build Content from message parts (for /run_sse endpoint)
599fn build_content_from_parts(parts: &[MessagePart]) -> Result<adk_core::Content, RuntimeError> {
600    let mut content = adk_core::Content::new("user");
601
602    for part in parts {
603        // Add text part if present
604        if let Some(text) = &part.text {
605            content.parts.push(adk_core::Part::Text { text: text.clone() });
606        }
607
608        // Add inline data part if present
609        if let Some(inline_data) = &part.inline_data {
610            match BASE64_STANDARD.decode(&inline_data.data) {
611                Ok(data) => {
612                    if data.len() > adk_core::MAX_INLINE_DATA_SIZE {
613                        return Err((
614                            StatusCode::PAYLOAD_TOO_LARGE,
615                            format!(
616                                "inline_data exceeds max inline size of {} bytes",
617                                adk_core::MAX_INLINE_DATA_SIZE
618                            ),
619                        ));
620                    }
621                    content.parts.push(adk_core::Part::InlineData {
622                        mime_type: inline_data.mime_type.clone(),
623                        data,
624                    });
625                }
626                Err(e) => {
627                    return Err((
628                        StatusCode::BAD_REQUEST,
629                        format!("Invalid base64 data in inline_data: {}", e),
630                    ));
631                }
632            }
633        }
634    }
635
636    Ok(content)
637}
638
639async fn apply_state_delta_to_session(
640    session_service: &std::sync::Arc<dyn adk_session::SessionService>,
641    app_name: &str,
642    user_id: &str,
643    session_id: &str,
644    state_delta: HashMap<String, Value>,
645) -> Result<(), RuntimeError> {
646    if state_delta.is_empty() {
647        return Ok(());
648    }
649
650    let identity = adk_core::AdkIdentity::new(
651        adk_core::AppName::try_from(app_name).map_err(|error| {
652            (
653                StatusCode::BAD_REQUEST,
654                format!("invalid app_name for state delta application: {}", error),
655            )
656        })?,
657        adk_core::UserId::try_from(user_id).map_err(|error| {
658            (
659                StatusCode::BAD_REQUEST,
660                format!("invalid user_id for state delta application: {}", error),
661            )
662        })?,
663        adk_core::SessionId::try_from(session_id).map_err(|error| {
664            (
665                StatusCode::BAD_REQUEST,
666                format!("invalid session_id for state delta application: {}", error),
667            )
668        })?,
669    );
670
671    let mut event = adk_core::Event::new(format!("ui-input-{}", Uuid::new_v4()));
672    event.author = "ui_protocol_bridge".to_string();
673    event.actions.state_delta = state_delta;
674    session_service
675        .append_event_for_identity(adk_session::AppendEventRequest { identity, event })
676        .await
677        .map_err(adk_err_to_runtime)
678}
679
680fn merge_runtime_state_delta(
681    body_delta: HashMap<String, Value>,
682    ag_ui_delta: HashMap<String, Value>,
683) -> HashMap<String, Value> {
684    let mut merged = body_delta;
685    merged.extend(ag_ui_delta);
686    merged
687}
688
689fn json_pointer_escape(segment: &str) -> String {
690    segment.replace('~', "~0").replace('/', "~1")
691}
692
693fn state_delta_to_json_patch(delta: &HashMap<String, Value>) -> Vec<Value> {
694    delta
695        .iter()
696        .map(|(key, value)| {
697            json!({
698                "op": "add",
699                "path": format!("/{}", json_pointer_escape(key)),
700                "value": value
701            })
702        })
703        .collect()
704}
705
706fn timestamp_millis(event: &adk_core::Event) -> u64 {
707    event.timestamp.timestamp_millis().max(0) as u64
708}
709
710fn serialize_ag_ui_tool_call_delta(args: &Value, allow_raw_string_delta: bool) -> String {
711    if allow_raw_string_delta {
712        if let Value::String(delta) = args {
713            return delta.clone();
714        }
715    }
716
717    serde_json::to_string(args).unwrap_or_else(|_| args.to_string())
718}
719
720fn translate_ag_ui_event(event: &adk_core::Event, thread_id: &str, run_id: &str) -> Vec<Value> {
721    let mut translated = Vec::new();
722    let timestamp = timestamp_millis(event);
723    let is_partial = event.llm_response.partial;
724
725    if !event.actions.state_delta.is_empty() {
726        translated.push(json!({
727            "type": "STATE_DELTA",
728            "delta": state_delta_to_json_patch(&event.actions.state_delta),
729            "timestamp": timestamp,
730        }));
731    }
732
733    if let Some(message) = event.llm_response.error_message.clone() {
734        translated.push(json!({
735            "type": "RUN_ERROR",
736            "threadId": thread_id,
737            "runId": run_id,
738            "message": message,
739            "code": event.llm_response.error_code,
740            "timestamp": timestamp,
741        }));
742    }
743
744    let Some(content) = &event.llm_response.content else {
745        return translated;
746    };
747
748    for (index, part) in content.parts.iter().enumerate() {
749        match part {
750            adk_core::Part::Text { text } if !text.trim().is_empty() => {
751                let message_id = format!("{}-text-{}", event.id, index);
752                if is_partial {
753                    translated.push(json!({
754                        "type": "TEXT_MESSAGE_CHUNK",
755                        "messageId": message_id,
756                        "role": "assistant",
757                        "delta": text,
758                        "timestamp": timestamp,
759                    }));
760                } else {
761                    translated.push(json!({
762                        "type": "TEXT_MESSAGE_START",
763                        "messageId": message_id,
764                        "role": "assistant",
765                        "timestamp": timestamp,
766                    }));
767                    translated.push(json!({
768                        "type": "TEXT_MESSAGE_CONTENT",
769                        "messageId": format!("{}-text-{}", event.id, index),
770                        "delta": text,
771                        "timestamp": timestamp,
772                    }));
773                    translated.push(json!({
774                        "type": "TEXT_MESSAGE_END",
775                        "messageId": format!("{}-text-{}", event.id, index),
776                        "timestamp": timestamp,
777                    }));
778                }
779            }
780            adk_core::Part::Thinking { thinking, .. } if !thinking.trim().is_empty() => {
781                let message_id = format!("{}-reasoning-{}", event.id, index);
782                if is_partial {
783                    translated.push(json!({
784                        "type": "REASONING_MESSAGE_CHUNK",
785                        "messageId": message_id,
786                        "delta": thinking,
787                        "timestamp": timestamp,
788                    }));
789                } else {
790                    let reasoning_id = format!("{}-reasoning-phase-{}", event.id, index);
791                    translated.push(json!({
792                        "type": "REASONING_START",
793                        "messageId": reasoning_id,
794                        "timestamp": timestamp,
795                    }));
796                    translated.push(json!({
797                        "type": "REASONING_MESSAGE_START",
798                        "messageId": message_id,
799                        "role": "assistant",
800                        "timestamp": timestamp,
801                    }));
802                    translated.push(json!({
803                        "type": "REASONING_MESSAGE_CONTENT",
804                        "messageId": format!("{}-reasoning-{}", event.id, index),
805                        "delta": thinking,
806                        "timestamp": timestamp,
807                    }));
808                    translated.push(json!({
809                        "type": "REASONING_MESSAGE_END",
810                        "messageId": format!("{}-reasoning-{}", event.id, index),
811                        "timestamp": timestamp,
812                    }));
813                    translated.push(json!({
814                        "type": "REASONING_END",
815                        "messageId": reasoning_id,
816                        "timestamp": timestamp,
817                    }));
818                }
819            }
820            adk_core::Part::FunctionCall { name, args, id, .. } => {
821                let tool_call_id =
822                    id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index));
823                let raw_chunk_supported = is_partial && matches!(args, Value::String(_));
824                let args_delta = serialize_ag_ui_tool_call_delta(args, raw_chunk_supported);
825                if raw_chunk_supported {
826                    translated.push(json!({
827                        "type": "TOOL_CALL_CHUNK",
828                        "toolCallId": tool_call_id,
829                        "toolCallName": name,
830                        "delta": args_delta,
831                        "timestamp": timestamp,
832                    }));
833                } else {
834                    translated.push(json!({
835                        "type": "TOOL_CALL_START",
836                        "toolCallId": tool_call_id,
837                        "toolCallName": name,
838                        "timestamp": timestamp,
839                    }));
840                    translated.push(json!({
841                        "type": "TOOL_CALL_ARGS",
842                        "toolCallId": id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index)),
843                        "delta": args_delta,
844                        "timestamp": timestamp,
845                    }));
846                    translated.push(json!({
847                        "type": "TOOL_CALL_END",
848                        "toolCallId": id.clone().unwrap_or_else(|| format!("{}-tool-call-{}", event.id, index)),
849                        "timestamp": timestamp,
850                    }));
851                }
852            }
853            adk_core::Part::FunctionResponse { function_response, id } => {
854                let tool_call_id =
855                    id.clone().unwrap_or_else(|| format!("{}-tool-result-{}", event.id, index));
856                let response_content = serde_json::to_string(&function_response.response)
857                    .unwrap_or_else(|_| function_response.response.to_string());
858                translated.push(json!({
859                    "type": "TOOL_CALL_RESULT",
860                    "messageId": format!("msg-{}", tool_call_id),
861                    "toolCallId": tool_call_id,
862                    "toolCallName": function_response.name,
863                    "content": response_content,
864                    "role": "tool",
865                    "timestamp": timestamp,
866                }));
867            }
868            _ => {}
869        }
870    }
871
872    translated
873}
874
875/// Extract [`RequestContext`] from the configured extractor, if present.
876///
877/// Constructs minimal HTTP request [`Parts`] from the provided headers so the
878/// extractor can inspect `Authorization` and other headers. Returns `None`
879/// when no extractor is configured (fall-through to existing behavior).
880async fn extract_request_context(
881    extractor: Option<&dyn RequestContextExtractor>,
882    headers: &HeaderMap,
883) -> Result<Option<RequestContext>, RuntimeError> {
884    let Some(extractor) = extractor else {
885        return Ok(None);
886    };
887
888    // Build minimal Parts from the headers
889    let mut builder = axum::http::Request::builder();
890    for (name, value) in headers {
891        builder = builder.header(name, value);
892    }
893    let (parts, _) = builder
894        .body(())
895        .map_err(|e| {
896            (StatusCode::INTERNAL_SERVER_ERROR, format!("failed to build request parts: {e}"))
897        })?
898        .into_parts();
899
900    match extractor.extract(&parts).await {
901        Ok(ctx) => Ok(Some(ctx)),
902        Err(RequestContextError::MissingAuth) => {
903            Err((StatusCode::UNAUTHORIZED, "missing authorization".to_string()))
904        }
905        Err(RequestContextError::InvalidToken(msg)) => {
906            Err((StatusCode::UNAUTHORIZED, format!("invalid token: {msg}")))
907        }
908        Err(RequestContextError::ExtractionFailed(msg)) => {
909            Err((StatusCode::INTERNAL_SERVER_ERROR, format!("auth extraction failed: {msg}")))
910        }
911    }
912}
913
914fn bridge_params_with_identity(
915    app_name: &str,
916    user_id: &str,
917    session_id: &str,
918    params: Option<Value>,
919) -> Value {
920    let mut object = params.and_then(|value| value.as_object().cloned()).unwrap_or_default();
921    object.insert("appName".to_string(), Value::String(app_name.to_string()));
922    object.insert("userId".to_string(), Value::String(user_id.to_string()));
923    object.insert("sessionId".to_string(), Value::String(session_id.to_string()));
924    Value::Object(object)
925}
926
927fn deserialize_bridge_params<T: for<'de> Deserialize<'de>>(
928    app_name: &str,
929    user_id: &str,
930    session_id: &str,
931    params: Option<Value>,
932) -> Result<T, RuntimeError> {
933    serde_json::from_value(bridge_params_with_identity(app_name, user_id, session_id, params))
934        .map_err(|error| {
935            (StatusCode::BAD_REQUEST, format!("invalid protocol-native bridge payload: {}", error))
936        })
937}
938
939fn maybe_mark_mcp_ui_initialized(
940    app_name: &str,
941    user_id: &str,
942    session_id: &str,
943    initialized_notification: Option<&Value>,
944) -> Result<(), RuntimeError> {
945    let Some(value) = initialized_notification else {
946        return Ok(());
947    };
948    let method = value
949        .as_object()
950        .and_then(|object| object.get("method"))
951        .and_then(|value| value.as_str())
952        .unwrap_or_default();
953    if method == "ui/notifications/initialized" {
954        mark_mcp_ui_initialized(app_name, user_id, session_id)?;
955    }
956    Ok(())
957}
958
959fn apply_mcp_apps_runtime_envelope(
960    app_name: &str,
961    user_id: &str,
962    session_id: &str,
963    envelope: McpAppsRuntimeEnvelope,
964) -> Result<(), RuntimeError> {
965    match envelope.method.as_str() {
966        "ui/initialize" => {
967            let params = deserialize_bridge_params::<McpUiInitializeParams>(
968                app_name,
969                user_id,
970                session_id,
971                envelope.params,
972            )?;
973            initialize_mcp_ui_bridge(params)?;
974            Ok(())
975        }
976        "ui/message" => {
977            let params = deserialize_bridge_params::<McpUiMessageParams>(
978                app_name,
979                user_id,
980                session_id,
981                envelope.params,
982            )?;
983            message_mcp_ui_bridge(params)?;
984            Ok(())
985        }
986        "ui/update-model-context" => {
987            let params = deserialize_bridge_params::<McpUiUpdateModelContextParams>(
988                app_name,
989                user_id,
990                session_id,
991                envelope.params,
992            )?;
993            update_mcp_ui_bridge_model_context(params)?;
994            Ok(())
995        }
996        "ui/notifications/initialized" => {
997            mark_mcp_ui_initialized(app_name, user_id, session_id)?;
998            Ok(())
999        }
1000        method => Err((
1001            StatusCode::BAD_REQUEST,
1002            format!("unsupported MCP Apps runtime bridge method '{}'", method),
1003        )),
1004    }
1005}
1006
1007fn direct_ag_ui_events(event: &adk_core::Event, thread_id: &str, run_id: &str) -> Vec<String> {
1008    translate_ag_ui_event(event, thread_id, run_id)
1009        .into_iter()
1010        .filter_map(|item| serde_json::to_string(&item).ok())
1011        .collect()
1012}
1013
1014fn build_runtime_sse_stream<S>(
1015    mut event_stream: S,
1016    profile: UiProfile,
1017    transport: UiTransportMode,
1018    session_id: String,
1019    ag_ui_input: Option<AgUiRunInput>,
1020) -> std::pin::Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>
1021where
1022    S: Stream<Item = adk_core::Result<adk_core::Event>> + Send + 'static + Unpin,
1023{
1024    let selected_thread_id =
1025        ag_ui_input.as_ref().and_then(|input| input.thread_id.clone()).unwrap_or(session_id);
1026    let selected_run_input = ag_ui_input.clone();
1027    let selected_parent_run_id = ag_ui_input.as_ref().and_then(|input| input.parent_run_id.clone());
1028    let selected_initial_state = ag_ui_input.as_ref().and_then(|input| input.state.clone());
1029    let selected_messages_snapshot =
1030        ag_ui_input.as_ref().and_then(messages_snapshot_from_ag_ui_input);
1031    let selected_activity_events =
1032        ag_ui_input.as_ref().map(activity_events_from_ag_ui_input).unwrap_or_default();
1033
1034    Box::pin(async_stream::stream! {
1035        let native_ag_ui = profile == UiProfile::AgUi && transport == UiTransportMode::ProtocolNative;
1036        let mut started = false;
1037        let mut active_run_id = ag_ui_input.as_ref().and_then(|input| input.run_id.clone());
1038
1039        while let Some(item) = event_stream.next().await {
1040            match item {
1041                Ok(event) => {
1042                    if native_ag_ui {
1043                        let run_id = active_run_id
1044                            .get_or_insert_with(|| event.invocation_id.clone())
1045                            .clone();
1046                        if !started {
1047                            let mut started_event = json!({
1048                                "type": "RUN_STARTED",
1049                                "threadId": selected_thread_id,
1050                                "runId": run_id,
1051                            });
1052                            if let Some(parent_run_id) = selected_parent_run_id.clone() {
1053                                if let Some(object) = started_event.as_object_mut() {
1054                                    object.insert("parentRunId".to_string(), Value::String(parent_run_id));
1055                                }
1056                            }
1057                            if let Some(run_input) = selected_run_input.clone() {
1058                                if let Ok(value) = serde_json::to_value(run_input) {
1059                                    if let Some(object) = started_event.as_object_mut() {
1060                                        object.insert("input".to_string(), value);
1061                                    }
1062                                }
1063                            }
1064                            yield Ok(Event::default().data(started_event.to_string()));
1065
1066                            if let Some(snapshot) = selected_initial_state.clone() {
1067                                yield Ok(Event::default().data(json!({
1068                                    "type": "STATE_SNAPSHOT",
1069                                    "snapshot": snapshot,
1070                                }).to_string()));
1071                            }
1072                            if let Some(messages) = selected_messages_snapshot.clone() {
1073                                yield Ok(Event::default().data(json!({
1074                                    "type": "MESSAGES_SNAPSHOT",
1075                                    "messages": messages,
1076                                }).to_string()));
1077                            }
1078                            for activity_event in selected_activity_events.clone() {
1079                                yield Ok(Event::default().data(activity_event.to_string()));
1080                            }
1081                            started = true;
1082                        }
1083
1084                        for payload in direct_ag_ui_events(&event, &selected_thread_id, &run_id) {
1085                            yield Ok(Event::default().data(payload));
1086                        }
1087                    } else if let Some(payload) = serialize_runtime_event(&event, profile) {
1088                        yield Ok(Event::default().data(payload));
1089                    }
1090                }
1091                Err(error) => {
1092                    if native_ag_ui {
1093                        let run_id =
1094                            active_run_id.unwrap_or_else(|| format!("run-{}", Uuid::new_v4()));
1095                        if !started {
1096                            yield Ok(Event::default().data(json!({
1097                                "type": "RUN_STARTED",
1098                                "threadId": selected_thread_id,
1099                                "runId": run_id,
1100                            }).to_string()));
1101                        }
1102                        yield Ok(Event::default().data(json!({
1103                            "type": "RUN_ERROR",
1104                            "threadId": selected_thread_id,
1105                            "runId": run_id,
1106                            "message": error.to_string(),
1107                        }).to_string()));
1108                    }
1109                    return;
1110                }
1111            }
1112        }
1113
1114        if native_ag_ui {
1115            let run_id = active_run_id.unwrap_or_else(|| format!("run-{}", Uuid::new_v4()));
1116            if !started {
1117                yield Ok(Event::default().data(json!({
1118                    "type": "RUN_STARTED",
1119                    "threadId": selected_thread_id,
1120                    "runId": run_id,
1121                }).to_string()));
1122            }
1123            yield Ok(Event::default().data(json!({
1124                "type": "RUN_FINISHED",
1125                "threadId": selected_thread_id,
1126                "runId": run_id,
1127            }).to_string()));
1128        }
1129    })
1130}
1131
1132pub async fn run_sse(
1133    State(controller): State<RuntimeController>,
1134    Path((app_name, user_id, session_id)): Path<(String, String, String)>,
1135    headers: HeaderMap,
1136    Json(req): Json<RunRequest>,
1137) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RuntimeError> {
1138    let ui_profile = resolve_ui_profile(&headers, infer_run_request_protocol(&req))?;
1139    let transport = resolve_ui_transport(&headers, req.ui_transport.as_deref())?;
1140    validate_transport_support(ui_profile, transport)?;
1141    let span = tracing::info_span!("run_sse", session_id = %session_id, app_name = %app_name, user_id = %user_id);
1142
1143    async move {
1144        log_profile_deprecation(ui_profile);
1145        info!(
1146            ui_protocol = %ui_profile.as_str(),
1147            ui_transport = ?transport,
1148            "resolved ui protocol profile for runtime request"
1149        );
1150
1151        // Extract request context from auth middleware bridge if configured.
1152        // This returns Err (401/500) when the extractor is present but auth
1153        // fails, ensuring authorization checks are never bypassed.
1154        let request_context = extract_request_context(
1155            controller.config.request_context_extractor.as_deref(),
1156            &headers,
1157        )
1158        .await?;
1159
1160        // Explicit authenticated user override: when an auth extractor is
1161        // configured and succeeds, the authenticated user_id takes precedence
1162        // over the path parameter. This prevents callers from impersonating
1163        // other users via the URL while keeping the path param as a fallback
1164        // for unauthenticated deployments (no extractor configured).
1165        let effective_user_id = request_context.as_ref().map_or(user_id, |rc| rc.user_id.clone());
1166
1167        // Validate session exists
1168        controller
1169            .config
1170            .session_service
1171            .get(adk_session::GetRequest {
1172                app_name: app_name.clone(),
1173                user_id: effective_user_id.clone(),
1174                session_id: session_id.clone(),
1175                num_recent_events: None,
1176                after: None,
1177            })
1178            .await
1179            .map_err(|_| (StatusCode::NOT_FOUND, "session not found".to_string()))?;
1180
1181        // Load agent
1182        let agent = controller
1183            .config
1184            .agent_loader
1185            .load_agent(&app_name)
1186            .await
1187            .map_err(adk_err_to_runtime)?;
1188
1189        // Create runner
1190        let runner = adk_runner::Runner::new(adk_runner::RunnerConfig {
1191            app_name: app_name.clone(),
1192            agent,
1193            session_service: controller.config.session_service.clone(),
1194            artifact_service: controller.config.artifact_service.clone(),
1195            memory_service: controller.config.memory_service.clone(),
1196            plugin_manager: None,
1197            run_config: None,
1198            compaction_config: controller.config.compaction_config.clone(),
1199            context_cache_config: controller.config.context_cache_config.clone(),
1200            cache_capable: controller.config.cache_capable.clone(),
1201            request_context,
1202            cancellation_token: None,
1203        })
1204        .map_err(adk_err_to_runtime)?;
1205
1206        // Build content with attachments
1207        let content = build_content_with_attachments(&req.new_message, &req.attachments)?;
1208
1209        // Log attachment info
1210        if !req.attachments.is_empty() {
1211            info!(attachment_count = req.attachments.len(), "processing request with attachments");
1212        }
1213
1214        // Run agent
1215        let typed_user_id =
1216            UserId::new(effective_user_id).map_err(|err| adk_err_to_runtime(err.into()))?;
1217        let typed_session_id =
1218            SessionId::new(session_id.clone()).map_err(|err| adk_err_to_runtime(err.into()))?;
1219        let event_stream = runner
1220            .run(typed_user_id, typed_session_id, content)
1221            .await
1222            .map_err(adk_err_to_runtime)?;
1223
1224        // Convert to SSE stream
1225        let sse_stream =
1226            build_runtime_sse_stream(event_stream, ui_profile, transport, session_id.clone(), None);
1227
1228        Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
1229    }
1230    .instrument(span)
1231    .await
1232}
1233
1234/// POST /run_sse - adk-go compatible endpoint
1235/// Accepts JSON body with appName, userId, sessionId, newMessage
1236pub async fn run_sse_compat(
1237    State(controller): State<RuntimeController>,
1238    headers: HeaderMap,
1239    Json(req): Json<RunSseRequest>,
1240) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RuntimeError> {
1241    let ui_profile = resolve_ui_profile(&headers, infer_sse_request_protocol(&req))?;
1242    let transport = resolve_ui_transport(&headers, req.ui_transport.as_deref())?;
1243    validate_transport_support(ui_profile, transport)?;
1244    let app_name = req.app_name.clone();
1245    let user_id = req.user_id.clone();
1246    let session_id = req.session_id.clone();
1247    let ag_ui_input = ag_ui_input_from_request(&req);
1248    let mcp_apps_request = mcp_apps_request_from_request(&req);
1249    let mcp_apps_initialize = mcp_apps_initialize_from_request(&req);
1250
1251    info!(
1252        app_name = %app_name,
1253        user_id = %user_id,
1254        session_id = %session_id,
1255        ui_protocol = %ui_profile.as_str(),
1256        ui_transport = ?transport,
1257        "POST /run_sse request received"
1258    );
1259    log_profile_deprecation(ui_profile);
1260
1261    // Extract request context from auth middleware bridge if configured.
1262    // This returns Err (401/500) when the extractor is present but auth
1263    // fails, ensuring authorization checks are never bypassed.
1264    let request_context =
1265        extract_request_context(controller.config.request_context_extractor.as_deref(), &headers)
1266            .await?;
1267
1268    // Explicit authenticated user override: when an auth extractor is
1269    // configured and succeeds, the authenticated user_id takes precedence
1270    // over the request body value. This prevents callers from impersonating
1271    // other users via the JSON payload while keeping the body param as a
1272    // fallback for unauthenticated deployments (no extractor configured).
1273    let effective_user_id = request_context.as_ref().map_or(user_id, |rc| rc.user_id.clone());
1274
1275    let resolved_new_message = req
1276        .new_message
1277        .clone()
1278        .or_else(|| ag_ui_input.as_ref().and_then(new_message_from_ag_ui_input))
1279        .ok_or_else(|| {
1280            (
1281                StatusCode::BAD_REQUEST,
1282                "newMessage is required unless protocol-native ag_ui input supplies a user message"
1283                    .to_string(),
1284            )
1285        })?;
1286
1287    // Build content from message parts (includes both text and inline_data)
1288    let content = build_content_from_parts(&resolved_new_message.parts)?;
1289
1290    // Log part info
1291    let text_parts: Vec<_> =
1292        resolved_new_message.parts.iter().filter(|p| p.text.is_some()).collect();
1293    let data_parts: Vec<_> =
1294        resolved_new_message.parts.iter().filter(|p| p.inline_data.is_some()).collect();
1295    if !data_parts.is_empty() {
1296        info!(
1297            text_parts = text_parts.len(),
1298            inline_data_parts = data_parts.len(),
1299            "processing request with inline data"
1300        );
1301    }
1302
1303    let merged_state_delta = merge_runtime_state_delta(
1304        body_state_delta(req.state_delta.as_ref())?,
1305        ag_ui_input.as_ref().map(ag_ui_state_delta).unwrap_or_default(),
1306    );
1307
1308    // Validate session exists or create it
1309    let session_result = controller
1310        .config
1311        .session_service
1312        .get(adk_session::GetRequest {
1313            app_name: app_name.clone(),
1314            user_id: effective_user_id.clone(),
1315            session_id: session_id.clone(),
1316            num_recent_events: None,
1317            after: None,
1318        })
1319        .await;
1320
1321    // If session doesn't exist, create it
1322    if session_result.is_err() {
1323        controller
1324            .config
1325            .session_service
1326            .create(adk_session::CreateRequest {
1327                app_name: app_name.clone(),
1328                user_id: effective_user_id.clone(),
1329                session_id: Some(session_id.clone()),
1330                state: merged_state_delta.clone(),
1331            })
1332            .await
1333            .map_err(adk_err_to_runtime)?;
1334    } else {
1335        apply_state_delta_to_session(
1336            &controller.config.session_service,
1337            &app_name,
1338            &effective_user_id,
1339            &session_id,
1340            merged_state_delta.clone(),
1341        )
1342        .await?;
1343    }
1344
1345    if ui_profile == UiProfile::McpApps {
1346        if let Some(initialize) = mcp_apps_initialize {
1347            apply_mcp_apps_runtime_envelope(
1348                &app_name,
1349                &effective_user_id,
1350                &session_id,
1351                initialize,
1352            )?;
1353        }
1354        if let Some(request) = mcp_apps_request {
1355            apply_mcp_apps_runtime_envelope(&app_name, &effective_user_id, &session_id, request)?;
1356        }
1357        maybe_mark_mcp_ui_initialized(
1358            &app_name,
1359            &effective_user_id,
1360            &session_id,
1361            req.mcp_apps_initialized.as_ref(),
1362        )?;
1363    }
1364
1365    // Load agent
1366    let agent =
1367        controller.config.agent_loader.load_agent(&app_name).await.map_err(adk_err_to_runtime)?;
1368
1369    // Create runner with streaming config from request
1370    let streaming_mode =
1371        if req.streaming { adk_core::StreamingMode::SSE } else { adk_core::StreamingMode::None };
1372
1373    let runner = adk_runner::Runner::new(adk_runner::RunnerConfig {
1374        app_name,
1375        agent,
1376        session_service: controller.config.session_service.clone(),
1377        artifact_service: controller.config.artifact_service.clone(),
1378        memory_service: controller.config.memory_service.clone(),
1379        plugin_manager: None,
1380        run_config: Some(adk_core::RunConfig { streaming_mode, ..adk_core::RunConfig::default() }),
1381        compaction_config: controller.config.compaction_config.clone(),
1382        context_cache_config: controller.config.context_cache_config.clone(),
1383        cache_capable: controller.config.cache_capable.clone(),
1384        request_context,
1385        cancellation_token: None,
1386    })
1387    .map_err(adk_err_to_runtime)?;
1388
1389    // Run agent with full content (text + inline data)
1390    let typed_user_id =
1391        UserId::new(effective_user_id).map_err(|err| adk_err_to_runtime(err.into()))?;
1392    let typed_session_id =
1393        SessionId::new(session_id.clone()).map_err(|err| adk_err_to_runtime(err.into()))?;
1394    let event_stream =
1395        runner.run(typed_user_id, typed_session_id, content).await.map_err(adk_err_to_runtime)?;
1396
1397    // Convert to SSE stream
1398    let sse_stream = build_runtime_sse_stream(
1399        event_stream,
1400        ui_profile,
1401        transport,
1402        session_id.clone(),
1403        ag_ui_input,
1404    );
1405
1406    Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
1407}