Skip to main content

meerkat_mobkit/runtime/
console_ingress.rs

1//! Console ingress types and JSON request/response structures.
2
3use super::*;
4use crate::rpc::MOBKIT_CONTRACT_VERSION;
5
/// Console-facing view of a single mob member.
///
/// Narrow projection of meerkat's roster enriched with cached diagnostics
/// such as the current bridge session id when the console read model has one.
/// Wire field names match the console JSON — `agent_identity`, `role`,
/// `state` — so the admin UI reads them unchanged from the live snapshot.
///
/// `runtime_mode`, `session_id` and `labels` are left out of the serialized
/// form when they carry no value, and may be absent on input.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsoleMember {
    /// Identity of the member. The console experience contract reuses it as
    /// both `agent_id` and `member_id` for roster-backed sidebar rows.
    pub agent_identity: String,
    /// Role name; the sidebar falls back to it for grouping when no `group`
    /// label is set, and per-profile capability hints are keyed by it.
    pub role: String,
    /// Lifecycle state string. The console contract compares it against
    /// `"active"` when deciding whether a member is addressable or retirable.
    pub state: String,
    /// Model capability flags; falls back to `Default` when absent on input.
    #[serde(default)]
    pub model_capabilities: ConsoleModelCapabilities,
    /// Optional runtime mode string; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub runtime_mode: Option<String>,
    /// Current session id when one is known; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Wiring targets of this member. The topology panel advertises this
    /// field as its edge list. No serde default: it must be present on input.
    pub wired_to: Vec<String>,
    /// Free-form string labels. The console contract reads well-known keys
    /// such as `display_name`, `addressable`, `singleton` and `group`.
    /// Omitted from output when empty.
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub labels: std::collections::BTreeMap<String, String>,
}
27
/// Model capability flags attached to console members and agents.
///
/// The derived `Default` leaves every flag `false`, which is also what a
/// missing field deserializes to.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsoleModelCapabilities {
    /// Image-input capability flag; `false` when absent on input.
    #[serde(default)]
    pub image_input: bool,
}
33
/// JSON-serializable description of a console REST request, as consumed by
/// `handle_console_rest_json_route` and its `_with_snapshot` variant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsoleRestJsonRequest {
    /// HTTP method name. The route handler only serves `"GET"`.
    pub method: String,
    /// Request path, possibly with a query string; the handler splits the
    /// query off before matching the route.
    pub path: String,
    /// Explicit credentials, if any. The handler resolves auth from this
    /// value together with the parsed query parameters.
    pub auth: Option<ConsoleAccessRequest>,
}
40
/// Result of handling a [`ConsoleRestJsonRequest`]: a status code plus a JSON body.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsoleRestJsonResponse {
    /// HTTP-style status code; the route handler produces `200`, `401` or `404`.
    pub status: u16,
    /// JSON payload: the requested contract on success, or an object with an
    /// `error` key (and a `reason` for `401`) on failure.
    pub body: Value,
}
46
/// One agent row of a [`ConsoleLiveSnapshot`].
///
/// Every `Option` field is omitted from the serialized form when `None` and
/// may be absent on input. `alert_level` and `degraded_reason` use camelCase
/// wire names (`alertLevel`, `degradedReason`); all other fields keep their
/// snake_case names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsoleAgentLiveSnapshot {
    /// Agent identifier; `ConsoleLiveSnapshot::new` de-duplicates agents by it.
    pub agent_id: String,
    /// Member identifier; equals `agent_id` for default module-backed rows.
    pub member_id: String,
    /// Display label; equals `agent_id` for default module-backed rows.
    pub label: String,
    /// Row kind string, e.g. `"module_agent"` for default module-backed rows.
    pub kind: String,
    /// Optional identity string.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub identity: Option<String>,
    /// Optional role name.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
    /// Optional state string; default module-backed rows use `"idle"`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state: Option<String>,
    /// Optional session id.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Model capability flags; falls back to `Default` when absent on input.
    #[serde(default)]
    pub model_capabilities: ConsoleModelCapabilities,
    /// Optional response phase string.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub response_phase: Option<String>,
    /// Optional "watched" flag.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watched: Option<bool>,
    /// Optional alert level; serialized as `alertLevel`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        rename = "alertLevel"
    )]
    pub alert_level: Option<String>,
    /// Optional degraded flag.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub degraded: Option<bool>,
    /// Optional explanation for the degraded flag; serialized as `degradedReason`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        rename = "degradedReason"
    )]
    pub degraded_reason: Option<String>,
}
82
/// Point-in-time view of the runtime that the console contract is rendered from.
///
/// Prefer [`ConsoleLiveSnapshot::new`] when building one in code: it removes
/// duplicate modules and agents. The derived `Deserialize` impl fills the
/// fields directly and performs no such de-duplication.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsoleLiveSnapshot {
    /// Optional runtime identifier; omitted from output when `None`. The
    /// value `"console-aggregator"` switches the experience contract into
    /// its aggregate-console mode.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub runtime_id: Option<String>,
    /// Running flag, surfaced in the health overview panel.
    pub running: bool,
    /// Ids of the loaded modules.
    pub loaded_modules: Vec<String>,
    /// Agent rows; defaults to empty when absent on input.
    #[serde(default)]
    pub agents: Vec<ConsoleAgentLiveSnapshot>,
    /// Mob roster members.
    pub members: Vec<ConsoleMember>,
    /// Whether a mob runtime is present; roster-based sidebar rows are only
    /// built when this is `true` and `members` is non-empty.
    pub has_mob_runtime: bool,
}
94
95fn console_member_console_identity(member: &ConsoleMember) -> &str {
96    member
97        .labels
98        .get("agent_identity")
99        .filter(|value| !value.trim().is_empty())
100        .map_or(member.agent_identity.as_str(), String::as_str)
101}
102
103impl ConsoleLiveSnapshot {
104    pub fn new(
105        runtime_id: Option<String>,
106        running: bool,
107        loaded_modules: Vec<String>,
108        agents: Vec<ConsoleAgentLiveSnapshot>,
109        members: Vec<ConsoleMember>,
110        has_mob_runtime: bool,
111    ) -> Self {
112        let mut seen = BTreeSet::new();
113        let mut deduped_modules = Vec::new();
114        for module_id in loaded_modules {
115            if seen.insert(module_id.clone()) {
116                deduped_modules.push(module_id);
117            }
118        }
119        let mut seen_agents = BTreeSet::new();
120        let mut deduped_agents = Vec::new();
121        for agent in agents {
122            if seen_agents.insert(agent.agent_id.clone()) {
123                deduped_agents.push(agent);
124            }
125        }
126        Self {
127            runtime_id,
128            running,
129            loaded_modules: deduped_modules,
130            agents: deduped_agents,
131            members,
132            has_mob_runtime,
133        }
134    }
135}
136
137pub fn handle_console_rest_json_route(
138    decisions: &RuntimeDecisionState,
139    request: &ConsoleRestJsonRequest,
140) -> ConsoleRestJsonResponse {
141    handle_console_rest_json_route_with_snapshot(decisions, request, None)
142}
143
144pub fn handle_console_rest_json_route_with_snapshot(
145    decisions: &RuntimeDecisionState,
146    request: &ConsoleRestJsonRequest,
147    live_snapshot: Option<&ConsoleLiveSnapshot>,
148) -> ConsoleRestJsonResponse {
149    let (base_path, query_params) = split_path_and_query(&request.path);
150    if request.method != "GET"
151        || (base_path != CONSOLE_MODULES_ROUTE && base_path != CONSOLE_EXPERIENCE_ROUTE)
152    {
153        return ConsoleRestJsonResponse {
154            status: 404,
155            body: serde_json::json!({"error":"not_found"}),
156        };
157    }
158
159    let resolved_auth = match resolve_console_auth(decisions, request.auth.as_ref(), &query_params)
160    {
161        Ok(auth) => auth,
162        Err(error) => {
163            return ConsoleRestJsonResponse {
164                status: 401,
165                body: serde_json::json!({
166                    "error":"unauthorized",
167                    "reason": console_auth_error_reason(&error),
168                }),
169            };
170        }
171    };
172
173    match resolved_auth {
174        Some(auth) => {
175            if let Err(error) =
176                enforce_console_route_access(&decisions.auth, &decisions.console, &auth)
177            {
178                return ConsoleRestJsonResponse {
179                    status: 401,
180                    body: serde_json::json!({
181                        "error":"unauthorized",
182                        "reason": auth_error_reason(&error),
183                    }),
184                };
185            }
186        }
187        None if decisions.console.require_app_auth => {
188            return ConsoleRestJsonResponse {
189                status: 401,
190                body: serde_json::json!({
191                    "error":"unauthorized",
192                    "reason":"missing_credentials",
193                }),
194            };
195        }
196        None => {}
197    }
198
199    let modules: Vec<String> = decisions
200        .modules
201        .iter()
202        .map(|module| module.id.clone())
203        .collect();
204    let live_snapshot = live_snapshot
205        .cloned()
206        .unwrap_or_else(|| default_console_live_snapshot(decisions));
207    let body = if base_path == CONSOLE_EXPERIENCE_ROUTE {
208        build_console_experience_contract(&modules, &live_snapshot, &decisions.console)
209    } else {
210        serde_json::json!({
211            "contract_version": MOBKIT_CONTRACT_VERSION,
212            "modules": modules
213        })
214    };
215    ConsoleRestJsonResponse { status: 200, body }
216}
217
218fn default_console_live_snapshot(decisions: &RuntimeDecisionState) -> ConsoleLiveSnapshot {
219    let loaded_modules = decisions
220        .modules
221        .iter()
222        .map(|module| module.id.clone())
223        .collect::<Vec<_>>();
224    let agents = loaded_modules
225        .iter()
226        .map(|module_id| ConsoleAgentLiveSnapshot {
227            agent_id: module_id.clone(),
228            member_id: module_id.clone(),
229            label: module_id.clone(),
230            kind: "module_agent".to_string(),
231            identity: None,
232            role: None,
233            state: Some("idle".to_string()),
234            session_id: None,
235            model_capabilities: ConsoleModelCapabilities::default(),
236            response_phase: None,
237            watched: None,
238            alert_level: None,
239            degraded: None,
240            degraded_reason: None,
241        })
242        .collect::<Vec<_>>();
243    ConsoleLiveSnapshot::new(
244        None,
245        !decisions.modules.is_empty(),
246        loaded_modules,
247        agents,
248        Vec::new(),
249        false,
250    )
251}
252
253fn build_console_experience_contract(
254    modules: &[String],
255    live_snapshot: &ConsoleLiveSnapshot,
256    console_policy: &ConsolePolicy,
257) -> Value {
258    let console_config = &console_policy.ui;
259    let is_aggregate_console = live_snapshot.runtime_id.as_deref() == Some("console-aggregator");
260    fn has_extended_agent_contract(agent: &ConsoleAgentLiveSnapshot) -> bool {
261        agent.role.is_some()
262            || agent.session_id.is_some()
263            || agent.model_capabilities != ConsoleModelCapabilities::default()
264            || agent.response_phase.is_some()
265            || agent.watched.is_some()
266            || agent.alert_level.is_some()
267            || agent.degraded.is_some()
268            || agent.degraded_reason.is_some()
269            || agent.kind != "module_agent"
270            || agent.member_id != agent.agent_id
271            || agent.label != agent.agent_id
272    }
273
274    let module_panels = modules
275        .iter()
276        .map(|module_id| {
277            serde_json::json!({
278                "panel_id": format!("module.{module_id}"),
279                "module_id": module_id,
280                "title": format!("{module_id} module"),
281                "route": format!("/console/modules/{module_id}"),
282                "capabilities": {
283                    "can_render": true,
284                    "can_subscribe_activity": true,
285                }
286            })
287        })
288        .collect::<Vec<_>>();
289
290    // P0 fix: Build sidebar from the full mob roster (members) when a mob
291    // runtime is present, so multi-instance profiles (e.g. 5 profiles → 15
292    // agents) enumerate every individual agent, not just profile-level IDs.
293    // Fall back to loaded_modules for module-only runtimes.
294    let has_roster_members = live_snapshot.has_mob_runtime && !live_snapshot.members.is_empty();
295    let sidebar_agents: Vec<Value> = if has_roster_members {
296        let mut sorted_members: Vec<&ConsoleMember> = live_snapshot.members.iter().collect();
297        sorted_members.sort_by(|a, b| a.agent_identity.cmp(&b.agent_identity));
298        sorted_members
299                .iter()
300                .map(|member| {
301                    let console_identity = console_member_console_identity(member);
302                    let label = member
303                        .labels
304                        .get("display_name")
305                        .cloned()
306                        .unwrap_or_else(|| member.agent_identity.clone());
307                    let is_active = member.state == "active";
308                    let addressable = is_active
309                        && member
310                        .labels
311                        .get("addressable")
312                        .map(|v| v != "false")
313                        .unwrap_or(true);
314                    let watched = member
315                        .labels
316                        .get("console_watched")
317                        .map(|value| value == "true");
318                    let alert_level = member
319                        .labels
320                        .get("console_alert_level")
321                        .filter(|value| matches!(value.as_str(), "elevated" | "critical"))
322                        .cloned();
323                    let degraded = member
324                        .labels
325                        .get("console_degraded")
326                        .map(|value| value == "true");
327                    let degraded_reason = member.labels.get("console_degraded_reason").cloned();
328                    let singleton = member
329                        .labels
330                        .get("singleton")
331                        .map(|v| v == "true")
332                        .unwrap_or(false);
333                    let group = member
334                        .labels
335                        .get("group")
336                        .cloned()
337                        .unwrap_or_else(|| member.role.clone());
338                    serde_json::json!({
339                        "agent_id": member.agent_identity,
340                        "member_id": member.agent_identity,
341                        "identity": console_identity,
342                        "label": label,
343                        "kind": "mob_agent",
344                        "role": member.role,
345                        "state": member.state,
346                        "model_capabilities": member.model_capabilities,
347                        "session_id": member.session_id,
348                        "wired_to": member.wired_to,
349                        "labels": member.labels,
350                        "group": group,
351                        "addressable": addressable,
352                        "watched": watched,
353                        "alertLevel": alert_level,
354                        "degraded": degraded,
355                        "degradedReason": degraded_reason,
356                        "affordances": {
357                            "addressable": addressable,
358                            "can_send_message": addressable,
359                            "can_retire": is_active && !is_aggregate_console && !singleton,
360                            "can_respawn": !is_aggregate_console,
361                            "runtime_mode": if is_aggregate_console { "console_aggregator" } else { "mob_agent" },
362                        },
363                    })
364                })
365                .collect()
366    } else {
367        live_snapshot
368            .loaded_modules
369            .iter()
370            .map(|module_id| {
371                serde_json::json!({
372                    "agent_id": module_id,
373                    "member_id": module_id,
374                    "label": module_id,
375                    "kind": "module_agent",
376                })
377            })
378            .collect()
379    };
380
381    let sidebar_agents: Vec<Value> = if has_roster_members {
382        sidebar_agents
383    } else if live_snapshot.agents.iter().any(has_extended_agent_contract) {
384        live_snapshot
385            .agents
386            .iter()
387            .map(|agent| {
388                let mut record = serde_json::Map::new();
389                record.insert(
390                    "agent_id".to_string(),
391                    Value::String(agent.agent_id.clone()),
392                );
393                record.insert(
394                    "member_id".to_string(),
395                    Value::String(agent.member_id.clone()),
396                );
397                record.insert("label".to_string(), Value::String(agent.label.clone()));
398                record.insert("kind".to_string(), Value::String(agent.kind.clone()));
399                if let Some(role) = &agent.role {
400                    record.insert("role".to_string(), Value::String(role.clone()));
401                }
402                if let Some(state) = &agent.state {
403                    record.insert("state".to_string(), Value::String(state.clone()));
404                }
405                if let Some(identity) = &agent.identity {
406                    record.insert("identity".to_string(), Value::String(identity.clone()));
407                }
408                if let Some(session_id) = &agent.session_id {
409                    record.insert("session_id".to_string(), Value::String(session_id.clone()));
410                }
411                record.insert(
412                    "model_capabilities".to_string(),
413                    serde_json::to_value(&agent.model_capabilities).unwrap_or(Value::Null),
414                );
415                if let Some(response_phase) = &agent.response_phase {
416                    record.insert(
417                        "response_phase".to_string(),
418                        Value::String(response_phase.clone()),
419                    );
420                }
421                if is_aggregate_console {
422                    record.insert("addressable".to_string(), Value::Bool(true));
423                    record.insert(
424                        "affordances".to_string(),
425                        serde_json::json!({
426                            "addressable": true,
427                            "can_send_message": true,
428                            "can_retire": false,
429                            "can_respawn": false,
430                            "runtime_mode": "console_aggregator",
431                        }),
432                    );
433                }
434                if let Some(watched) = agent.watched {
435                    record.insert("watched".to_string(), Value::Bool(watched));
436                }
437                if let Some(alert_level) = &agent.alert_level {
438                    record.insert("alertLevel".to_string(), Value::String(alert_level.clone()));
439                }
440                if let Some(degraded) = agent.degraded {
441                    record.insert("degraded".to_string(), Value::Bool(degraded));
442                }
443                if let Some(degraded_reason) = &agent.degraded_reason {
444                    record.insert(
445                        "degradedReason".to_string(),
446                        Value::String(degraded_reason.clone()),
447                    );
448                }
449                Value::Object(record)
450            })
451            .collect()
452    } else {
453        sidebar_agents
454    };
455
456    let has_mob = live_snapshot.has_mob_runtime && !is_aggregate_console;
457    let can_send_messages = has_mob || is_aggregate_console;
458    let can_spawn_members = has_mob && !is_aggregate_console;
459    let can_wire_members = has_mob && !is_aggregate_console;
460    let can_retire_members = has_mob && !is_aggregate_console;
461    let identity_status_rows = build_identity_status_rows(&sidebar_agents);
462
463    // P3: Build per-profile capability hints from roster data.
464    let profile_capabilities: BTreeMap<String, Value> = {
465        let mut profiles: BTreeMap<String, (usize, bool, bool)> = BTreeMap::new();
466        for member in &live_snapshot.members {
467            let entry = profiles
468                .entry(member.role.clone())
469                .or_insert((0, true, false));
470            entry.0 += 1; // instance_count
471            // addressable = all instances addressable
472            let member_addressable = member
473                .labels
474                .get("addressable")
475                .map(|v| v != "false")
476                .unwrap_or(true);
477            entry.1 = entry.1 && member_addressable;
478            // has_wiring = any instance wired
479            entry.2 = entry.2 || !member.wired_to.is_empty();
480        }
481        profiles
482            .into_iter()
483            .map(|(profile, (count, addressable, has_wiring))| {
484                (
485                    profile,
486                    serde_json::json!({
487                        "instance_count": count,
488                        "addressable": addressable,
489                        "has_wiring": has_wiring,
490                    }),
491                )
492            })
493            .collect()
494    };
495
496    let console_title = console_config.title.as_deref().unwrap_or("Mob Console");
497    let mut body = serde_json::json!({
498        "contract_version": MOBKIT_CONTRACT_VERSION,
499        "runtime_id": live_snapshot.runtime_id,
500        "console_config": console_config,
501        "runtime_capabilities": {
502            "can_spawn_members": can_spawn_members,
503            "can_send_messages": can_send_messages,
504            "can_wire_members": can_wire_members,
505            "can_retire_members": can_retire_members,
506            "available_spawn_modes": if can_spawn_members {
507                vec!["module", "role"]
508            } else {
509                vec!["module"]
510            },
511            "profile_capabilities": profile_capabilities,
512        },
513        "base_panel": {
514            "panel_id": "console.home",
515            "title": console_title,
516            "route": CONSOLE_EXPERIENCE_ROUTE,
517            "capabilities": {
518                "can_render": true,
519                "surface": "console",
520            }
521        },
522        "module_panels": module_panels,
523        "agent_sidebar": {
524            "panel_id": "console.agent_sidebar",
525            "title": "Agents",
526            "schema_version": "1",
527            "refresh": {
528                "mode": "poll",
529                "interval_ms": 5000,
530            },
531            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else if has_mob { "mobkit/list_members" } else { "mobkit/status" },
532            "refresh_policy": {
533                "mode": "pull",
534                "poll_interval_ms": 5000,
535            },
536            "selection_contract": {
537                "selected_agent_id_field": "agent_id",
538                "selected_member_id_field": "member_id",
539                "emits_scope": "agent",
540                "supported_scopes": ["mob", "agent"],
541            },
542            "list_item_contract": {
543                "fields": ["agent_id", "member_id", "identity", "label", "kind", "role", "state", "model_capabilities", "response_phase", "wired_to", "labels", "group", "addressable", "affordances", "watched", "alertLevel", "degraded", "degradedReason"],
544                "agent_id_field": "agent_id",
545                "member_id_field": "member_id",
546                "group_by_field": "group",
547                "well_known_labels": {
548                    "display_name": "human-readable label; overrides member_id in sidebar",
549                    "addressable": "set \"false\" to hide send-message actions for internal agents",
550                    "singleton": "set \"true\" to prevent retire (e.g. review, summarizer agents)",
551                    "group": "sidebar group name; overrides profile-based grouping",
552                },
553                "refresh_projection": "source_method returns ConsoleMember rows (agent_identity, role, state, model_capabilities, wired_to, labels). Clients must project: agent_id=agent_identity, member_id=agent_identity, identity=labels.agent_identity||agent_identity, label=labels.display_name||agent_identity, group=labels.group||role, addressable=labels.addressable!='false', model_capabilities.image_input default false, affordances derived from labels.singleton and addressable.",
554            },
555            "live_snapshot": {
556                "agents": sidebar_agents,
557            }
558        },
559        "identity_status": {
560            "panel_id": "console.identity_status",
561            "title": "Identity Status",
562            "schema_version": "1",
563            "refresh": {
564                "mode": "poll",
565                "interval_ms": 5000,
566            },
567            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else if has_mob { "mobkit/list_members" } else { "mobkit/status" },
568            "rows": identity_status_rows,
569        },
570        "activity_feed": {
571            "panel_id": "console.activity_feed",
572            "title": "Activity",
573            "schema_version": "1",
574            "refresh": {
575                "mode": "stream",
576                "topic": "all_events",
577                "update_semantics": "append",
578            },
579            "transport": "sse",
580            "source_route": "/console/timeline/stream",
581            "request_contract": {
582                "last_event_id_header": "optional Last-Event-ID checkpoint from prior event_id",
583            },
584            "event_contract": {
585                "envelope_fields": ["event_id", "interaction_id", "identity", "event_type", "timestamp_ms", "data"],
586                "event_type_path": "event_type",
587                "frame_format": "id: <event_id>\\nevent: <event_type>\\ndata: <event_json>\\n\\n",
588            },
589            "keep_alive": {
590                "interval_ms": SSE_KEEP_ALIVE_INTERVAL_MS,
591                "event": SSE_KEEP_ALIVE_EVENT_NAME,
592                "comment_frame": SSE_KEEP_ALIVE_COMMENT_FRAME,
593            },
594            "filter_presets": [
595                { "id": "all", "label": "All" },
596                { "id": "watched-only", "label": "Watched only", "watchedOnly": true },
597                { "id": "critical", "label": "Critical", "alertLevels": ["critical"] }
598            ],
599            "active_preset_id": "all"
600        },
601        "chat_inspector": {
602            "panel_id": "console.chat_inspector",
603            "title": "Chat Inspector",
604            "schema_version": "1",
605            "refresh": {
606                "mode": "stream",
607                "topic": "selected_identity",
608                "update_semantics": "append",
609            },
610            "send_method": "mobkit/console/send",
611            "observe_route": "/console/timeline/stream",
612            "transport": "rpc+sse",
613            "request_contract": {
614                "identity": "required target identity",
615                "content": "required user text to send",
616                "origin": "required caller identifier for audit and routing",
617            },
618            "response_contract": {
619                "interaction_id": "per-turn console correlation token",
620                "identity": "echoed target identity",
621            },
622            "event_contract": {
623                "envelope_fields": ["event_id", "interaction_id", "identity", "event_type", "timestamp_ms", "data"],
624                "event_type_path": "event_type",
625                "interaction_id_field": "interaction_id",
626            }
627        },
628        "topology": {
629            "panel_id": "console.topology",
630            "title": "Topology",
631            "schema_version": "1",
632            "refresh": {
633                "mode": "poll",
634                "interval_ms": 5000,
635            },
636            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else { "mobkit/status" },
637            "route_method": if is_aggregate_console { Value::Null } else { serde_json::json!("mobkit/routing/routes/list") },
638            "refresh_policy": {
639                "mode": "pull",
640                "poll_interval_ms": 5000,
641            },
642            "graph_contract": {
643                "node_id_field": "identity",
644                "edge_fields": ["wired_to"],
645            },
646            "live_snapshot": if live_snapshot.members.is_empty() {
647                // Module-only runtime: topology nodes are loaded module IDs.
648                serde_json::json!({
649                    "nodes": &live_snapshot.loaded_modules,
650                    "node_count": live_snapshot.loaded_modules.len(),
651                })
652            } else {
653                // Mob runtime: identity-native topology from members.
654                serde_json::json!({
655                    "nodes": live_snapshot.members.iter().map(|member| {
656                        let addressable = member.state == "active" && member
657                            .labels
658                            .get("addressable")
659                            .map(|value| value != "false")
660                            .unwrap_or(true);
661                        serde_json::json!({
662                            "identity": member.agent_identity,
663                            "label": member.labels.get("display_name").cloned().unwrap_or_else(|| member.agent_identity.clone()),
664                            "role": member.role,
665                            "state": member.state,
666                            "wired_to": member.wired_to,
667                            "addressable": addressable,
668                        })
669                    }).collect::<Vec<_>>(),
670                    "node_count": live_snapshot.members.len(),
671                })
672            }
673        },
674        "health_overview": {
675            "panel_id": "console.health_overview",
676            "title": "Health",
677            "schema_version": "1",
678            "refresh": {
679                "mode": "poll",
680                "interval_ms": 5000,
681            },
682            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else { "mobkit/status" },
683            "activity_source_method": if is_aggregate_console { Value::Null } else { serde_json::json!(EVENTS_SUBSCRIBE_METHOD) },
684            "activity_source_route": if is_aggregate_console { serde_json::json!("/console/timeline/stream") } else { Value::Null },
685            "refresh_policy": {
686                "mode": "pull_and_stream",
687                "poll_interval_ms": 5000,
688            },
689            "status_contract": {
690                "running_field": "running",
691                "loaded_modules_field": "loaded_modules",
692            },
693            "live_snapshot": {
694                "running": live_snapshot.running,
695                "loaded_modules": &live_snapshot.loaded_modules,
696                "loaded_module_count": &live_snapshot.loaded_modules.len(),
697                "identities": identity_status_rows,
698            }
699        },
700        "flows": if is_aggregate_console {
701            serde_json::json!({
702                "panel_id": "console.flows",
703                "title": "Flows",
704                "schema_version": "1",
705                "available": false,
706                "reason": "flow scheduling is not exposed by the console aggregator surface",
707            })
708        } else {
709            serde_json::json!({
710                "panel_id": "console.flows",
711                "title": "Flows",
712                "schema_version": "1",
713                "refresh": {
714                    "mode": "poll",
715                    "interval_ms": 10000,
716                },
717                "evaluate_method": "mobkit/scheduling/evaluate",
718                "dispatch_method": "mobkit/scheduling/dispatch",
719                "refresh_policy": {
720                    "mode": "pull",
721                    "poll_interval_ms": 10000,
722                },
723                "request_contract": {
724                    "schedules": "caller-supplied array of ScheduleDefinition (schedule_id, interval|cron, timezone, enabled)",
725                    "tick_ms": "evaluation timestamp in epoch milliseconds",
726                },
727                "evaluate_response_contract": {
728                    "tick_ms": "u64 - echoed evaluation tick",
729                    "due_triggers": "array of ScheduleTrigger {schedule_id, interval, timezone, due_tick_ms}",
730                },
731                "dispatch_response_contract": {
732                    "tick_ms": "u64 - echoed dispatch tick",
733                    "due_count": "usize - number of triggers that were due",
734                    "dispatched": "array of ScheduleDispatch {claim_key, schedule_id, interval, timezone, due_tick_ms, tick_ms, event_id, supervisor_signal?, runtime_injection?, runtime_injection_error?}",
735                    "skipped_claims": "array of schedule_id strings skipped due to idempotent claim",
736                },
737                "note": "Flows require caller-supplied schedule definitions; the runtime does not persist a flow registry. Clients must maintain their own schedule configs."
738            })
739        },
740        "session_history": if has_mob {
741            serde_json::json!({
742                "panel_id": "console.session_history",
743                "title": "Session History",
744                "schema_version": "1",
745                "refresh": {
746                    "mode": "poll",
747                    "interval_ms": 5000,
748                },
749                "source_method": "mobkit/console/query_timeline",
750                "transport": "rpc",
751                "available": true,
752                "request_contract": {
753                    "session_id": "required current session id for the identity/member",
754                    "offset": "optional message offset from the start of the transcript",
755                    "limit": "optional max messages to return",
756                },
757                "response_contract": {
758                    "success": "result is SessionHistoryPage {session_id, message_count, offset, limit, has_more, messages}",
759                }
760            })
761        } else {
762            serde_json::json!({
763                "panel_id": "console.session_history",
764                "title": "Session History",
765                "schema_version": "1",
766                "refresh": {
767                    "mode": "poll",
768                    "interval_ms": 5000,
769                },
770                "available": false,
771                "reason": "session history is projected through the console timeline",
772            })
773        }
774    });
775    if let Some(fetch_timeout_ms) = console_policy.fetch_timeout_ms {
776        body["console_policy"] = serde_json::json!({
777            "fetch_timeout_ms": fetch_timeout_ms,
778        });
779    }
780    body
781}
782
783fn build_identity_status_rows(sidebar_agents: &[Value]) -> Vec<Value> {
784    sidebar_agents
785        .iter()
786        .map(|agent| {
787            let identity = agent
788                .get("identity")
789                .and_then(Value::as_str)
790                .or_else(|| agent.get("member_id").and_then(Value::as_str))
791                .unwrap_or_default();
792            let display_name = agent
793                .get("label")
794                .and_then(Value::as_str)
795                .filter(|label| *label != identity);
796            let role = agent.get("role").and_then(Value::as_str);
797            let state = agent
798                .get("state")
799                .and_then(Value::as_str)
800                .unwrap_or("unknown");
801            let labels = agent
802                .get("labels")
803                .cloned()
804                .unwrap_or_else(|| serde_json::json!({}));
805            let addressability = if agent
806                .get("addressable")
807                .and_then(Value::as_bool)
808                .unwrap_or(true)
809            {
810                "addressable"
811            } else {
812                "internal_only"
813            };
814            let mut row = serde_json::json!({
815                "identity": identity,
816                "state": state,
817                "addressability": addressability,
818                "labels": labels,
819            });
820            if let Some(display_name) = display_name {
821                row["display_name"] = Value::String(display_name.to_string());
822            }
823            if let Some(role) = role {
824                row["role"] = Value::String(role.to_string());
825            }
826            if let Some(generation) = agent.get("generation").and_then(Value::as_u64) {
827                row["generation"] = Value::from(generation);
828            }
829            if let Some(checkpoint_version) =
830                agent.get("checkpoint_version").and_then(Value::as_u64)
831            {
832                row["checkpoint_version"] = Value::from(checkpoint_version);
833            }
834            if let Some(lease_healthy) = agent.get("lease_healthy").and_then(Value::as_bool) {
835                row["lease_healthy"] = Value::from(lease_healthy);
836            }
837            if let Some(response_phase) = agent.get("response_phase").and_then(Value::as_str) {
838                row["response_phase"] = Value::String(response_phase.to_string());
839            }
840            if let Some(model_capabilities) = agent.get("model_capabilities") {
841                row["model_capabilities"] = model_capabilities.clone();
842            }
843            if let Some(session_id) = agent.get("session_id").and_then(Value::as_str) {
844                row["session_id"] = Value::String(session_id.to_string());
845            }
846            row
847        })
848        .collect()
849}
850
/// Splits a request path into its base path and its query parameters.
///
/// The split happens at the first `?`. Parameters are separated by `&`;
/// each one is divided at its first `=`, and a parameter without `=` maps
/// to an empty value. Empty segments and segments with an empty key are
/// dropped. When a key repeats, the last occurrence wins. No percent-decoding
/// is performed.
fn split_path_and_query(path: &str) -> (&str, BTreeMap<String, String>) {
    let (route, raw_query) = match path.split_once('?') {
        Some(halves) => halves,
        None => (path, ""),
    };

    let pairs = raw_query
        .split('&')
        .filter(|segment| !segment.is_empty())
        .map(|segment| segment.split_once('=').unwrap_or((segment, "")))
        .filter(|(key, _)| !key.is_empty());

    let mut params = BTreeMap::new();
    for (key, value) in pairs {
        params.insert(key.to_string(), value.to_string());
    }
    (route, params)
}
865
866fn resolve_console_auth(
867    decisions: &RuntimeDecisionState,
868    explicit_auth: Option<&ConsoleAccessRequest>,
869    query_params: &BTreeMap<String, String>,
870) -> Result<Option<ConsoleAccessRequest>, ConsoleAuthResolutionError> {
871    if let Some(auth) = explicit_auth {
872        return Ok(Some(auth.clone()));
873    }
874
875    if !decisions.console.require_app_auth {
876        return Ok(None);
877    }
878
879    // Check query-param auth_token (also used as the bearer-header injection
880    // point by the HTTP handler — see console_json_handler).
881    match query_params.get("auth_token") {
882        Some(token) => resolve_console_auth_from_token(decisions, token).map(Some),
883        None => Ok(None),
884    }
885}
886
/// Extract a bearer token from an `Authorization: Bearer <token>` header value.
///
/// The auth scheme is matched case-insensitively (HTTP auth schemes are
/// case-insensitive), surrounding whitespace on the header value is ignored,
/// and any run of spaces between the scheme and the token is skipped.
///
/// Returns `None` when the scheme is not `Bearer`, when the scheme is not
/// followed by a space, or when no token follows the scheme.
pub fn extract_bearer_token_from_header(header_value: &str) -> Option<&str> {
    const SCHEME: &str = "Bearer";

    let value = header_value.trim();
    // `get` returns `None` for short input or a non-char-boundary index, so
    // arbitrary header bytes can never cause a slicing panic here.
    let scheme = value.get(..SCHEME.len())?;
    if !scheme.eq_ignore_ascii_case(SCHEME) {
        return None;
    }

    // Safe to slice: the `get` above proved this index is a char boundary.
    let rest = &value[SCHEME.len()..];
    // Require at least one separating space so "Bearerabc" is not accepted.
    if !rest.starts_with(' ') {
        return None;
    }

    let token = rest.trim_start_matches(' ');
    if token.is_empty() { None } else { Some(token) }
}
892
893/// Validate a bearer token against the console's trusted OIDC config AND
894/// the email allowlist / provider policy (via `enforce_console_route_access`).
895/// Returns `true` only if the token is valid AND the caller is authorized.
896pub fn validate_console_token(decisions: &RuntimeDecisionState, token: &str) -> bool {
897    let auth = match resolve_console_auth_from_token(decisions, token) {
898        Ok(auth) => auth,
899        Err(_) => return false,
900    };
901    crate::decisions::enforce_console_route_access(&decisions.auth, &decisions.console, &auth)
902        .is_ok()
903}
904
905fn resolve_console_auth_from_token(
906    decisions: &RuntimeDecisionState,
907    token: &str,
908) -> Result<ConsoleAccessRequest, ConsoleAuthResolutionError> {
909    if decisions.trusted_oidc.audience.trim().is_empty() {
910        return Err(ConsoleAuthResolutionError::InvalidTrustedOidcConfig);
911    }
912
913    let discovery = parse_oidc_discovery_json(&decisions.trusted_oidc.discovery_json)
914        .map_err(|_| ConsoleAuthResolutionError::InvalidTrustedOidcConfig)?;
915    let jwks = parse_jwks_json(&decisions.trusted_oidc.jwks_json)
916        .map_err(|_| ConsoleAuthResolutionError::InvalidTrustedOidcConfig)?;
917    let header =
918        inspect_jwt_header(token).map_err(|_| ConsoleAuthResolutionError::InvalidTokenHeader)?;
919
920    if header.alg == "HS256"
921        && !hs256_allowed_for_development_issuer(&discovery.issuer, &discovery.jwks_uri)
922    {
923        return Err(ConsoleAuthResolutionError::Hs256NotAllowed);
924    }
925
926    let key = select_jwk_for_token(&jwks, header.kid.as_deref(), &header.alg)
927        .map_err(|_| ConsoleAuthResolutionError::JwksKeyNotFound)?;
928    let verification_key = build_jwt_verification_key(key, &header.alg)
929        .map_err(|_| ConsoleAuthResolutionError::InvalidJwksKeyMaterial)?;
930
931    let now_epoch_seconds = SystemTime::now()
932        .duration_since(UNIX_EPOCH)
933        .unwrap_or_default()
934        .as_secs();
935    let claims = validate_jwt_with_verification_key(
936        token,
937        &verification_key,
938        &JwtClaimsValidationConfig {
939            issuer: Some(discovery.issuer),
940            audience: Some(decisions.trusted_oidc.audience.clone()),
941            now_epoch_seconds,
942            leeway_seconds: 30,
943        },
944    )
945    .map_err(|_| ConsoleAuthResolutionError::InvalidToken)?;
946
947    let principal = claims
948        .email
949        .or(claims.subject)
950        .ok_or(ConsoleAuthResolutionError::MissingTokenIdentity)?;
951    let provider =
952        if claims.actor_type.as_deref() == Some("service") || principal.starts_with("svc:") {
953            AuthProvider::ServiceIdentity
954        } else {
955            match claims.provider.as_deref() {
956                Some("google_oauth") => AuthProvider::GoogleOAuth,
957                Some("github_oauth") => AuthProvider::GitHubOAuth,
958                Some("generic_oidc") => AuthProvider::GenericOidc,
959                _ => AuthProvider::GenericOidc,
960            }
961        };
962
963    Ok(ConsoleAccessRequest {
964        provider,
965        email: principal,
966    })
967}
968
/// Reasons a console bearer token could not be turned into an access request.
///
/// Produced by `resolve_console_auth_from_token`; mapped to stable reason
/// strings by `console_auth_error_reason`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConsoleAuthResolutionError {
    /// The trusted OIDC audience is blank, or the configured discovery
    /// document or JWKS could not be parsed.
    InvalidTrustedOidcConfig,
    /// The JWT header could not be inspected.
    InvalidTokenHeader,
    /// No JWK could be selected for the token's `kid` / algorithm.
    JwksKeyNotFound,
    /// A verification key could not be built from the selected JWK.
    InvalidJwksKeyMaterial,
    /// The token failed verification or claims validation.
    InvalidToken,
    /// The validated claims carry neither an email nor a subject.
    MissingTokenIdentity,
    /// The token uses HS256 but the issuer is not a development issuer.
    Hs256NotAllowed,
}
979
980fn console_auth_error_reason(error: &ConsoleAuthResolutionError) -> &'static str {
981    match error {
982        ConsoleAuthResolutionError::InvalidTrustedOidcConfig => "invalid_trusted_oidc_config",
983        ConsoleAuthResolutionError::InvalidTokenHeader => "invalid_token_header",
984        ConsoleAuthResolutionError::JwksKeyNotFound => "jwks_key_not_found",
985        ConsoleAuthResolutionError::InvalidJwksKeyMaterial => "invalid_jwks_key_material",
986        ConsoleAuthResolutionError::InvalidToken => "invalid_token",
987        ConsoleAuthResolutionError::MissingTokenIdentity => "missing_token_identity",
988        ConsoleAuthResolutionError::Hs256NotAllowed => "hs256_not_allowed",
989    }
990}
991
992fn hs256_allowed_for_development_issuer(issuer: &str, jwks_uri: &str) -> bool {
993    match (extract_uri_host(issuer), extract_uri_host(jwks_uri)) {
994        (Some(issuer_host), Some(jwks_host)) => {
995            is_development_host(issuer_host) && is_development_host(jwks_host)
996        }
997        _ => false,
998    }
999}
1000
/// Extracts the host component from a URI-like string.
///
/// An optional `scheme://` prefix is skipped. The authority ends at the
/// first `/`, `?` or `#` (RFC 3986 §3.2), so characters in a query or
/// fragment — in particular a stray `@` — can never be mistaken for part of
/// the authority. Userinfo (everything up to the last `@`) and a trailing
/// `:port` are stripped. Bracketed IPv6 literals are returned without the
/// brackets.
///
/// Returns `None` when the authority or host is empty, or when an IPv6
/// literal is missing its closing bracket.
fn extract_uri_host(uri: &str) -> Option<&str> {
    let after_scheme = uri.split_once("://").map_or(uri, |(_, rest)| rest);

    // Cut at the first path, query or fragment delimiter. Splitting on '/'
    // alone would let "evil.example#@localhost/" resolve to "localhost".
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);

    // Drop any "user:password@" prefix.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);
    if host_port.is_empty() {
        return None;
    }

    // Bracketed IPv6 literal: the host is whatever sits inside the brackets.
    if let Some(bracketed) = host_port.strip_prefix('[') {
        let (ipv6_host, _) = bracketed.split_once(']')?;
        return if ipv6_host.is_empty() {
            None
        } else {
            Some(ipv6_host)
        };
    }

    let host = host_port
        .split_once(':')
        .map_or(host_port, |(hostname, _)| hostname);
    if host.is_empty() { None } else { Some(host) }
}
1026
/// Reports whether `host` names a local development machine.
///
/// Matching is ASCII case-insensitive. Accepted hosts are `localhost`,
/// `127.0.0.1`, `::1`, and any name ending in `.localhost`.
fn is_development_host(host: &str) -> bool {
    let normalized = host.to_ascii_lowercase();
    matches!(normalized.as_str(), "localhost" | "127.0.0.1" | "::1")
        || normalized.ends_with(".localhost")
}
1034
1035fn auth_error_reason(error: &DecisionPolicyError) -> &'static str {
1036    match error {
1037        DecisionPolicyError::AuthProviderMismatch => "provider_mismatch",
1038        DecisionPolicyError::AuthProviderNotSupported => "provider_not_supported",
1039        DecisionPolicyError::EmailNotAllowlisted => "email_not_allowlisted",
1040        DecisionPolicyError::InvalidServiceIdentity => "invalid_service_identity",
1041        DecisionPolicyError::ServiceIdentityNotAllowlisted => "service_identity_not_allowlisted",
1042        _ => "policy_denied",
1043    }
1044}