Skip to main content

meerkat_mobkit/runtime/
console_ingress.rs

1//! Console ingress types and JSON request/response structures.
2
3use super::*;
4use crate::rpc::MOBKIT_CONTRACT_VERSION;
5
6/// Console-facing view of a single mob member.
7///
8/// Narrow projection of meerkat's roster enriched with cached diagnostics
9/// such as the current bridge session id when the console read model has one.
10/// Wire field names match the console JSON — `agent_identity`, `role`,
11/// `state` — so the admin UI reads them unchanged from the live snapshot.
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct ConsoleMember {
14    pub agent_identity: String,
15    pub role: String,
16    pub state: String,
17    #[serde(default)]
18    pub model_capabilities: ConsoleModelCapabilities,
19    #[serde(default, skip_serializing_if = "Option::is_none")]
20    pub runtime_mode: Option<String>,
21    #[serde(default, skip_serializing_if = "Option::is_none")]
22    pub session_id: Option<String>,
23    pub wired_to: Vec<String>,
24    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
25    pub labels: std::collections::BTreeMap<String, String>,
26}
27
28#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
29pub struct ConsoleModelCapabilities {
30    #[serde(default)]
31    pub image_input: bool,
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct ConsoleRestJsonRequest {
36    pub method: String,
37    pub path: String,
38    pub auth: Option<ConsoleAccessRequest>,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct ConsoleRestJsonResponse {
43    pub status: u16,
44    pub body: Value,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48pub struct ConsoleAgentLiveSnapshot {
49    pub agent_id: String,
50    pub member_id: String,
51    pub label: String,
52    pub kind: String,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub identity: Option<String>,
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub role: Option<String>,
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub state: Option<String>,
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub session_id: Option<String>,
61    #[serde(default)]
62    pub model_capabilities: ConsoleModelCapabilities,
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub response_phase: Option<String>,
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub watched: Option<bool>,
67    #[serde(
68        default,
69        skip_serializing_if = "Option::is_none",
70        rename = "alertLevel"
71    )]
72    pub alert_level: Option<String>,
73    #[serde(default, skip_serializing_if = "Option::is_none")]
74    pub degraded: Option<bool>,
75    #[serde(
76        default,
77        skip_serializing_if = "Option::is_none",
78        rename = "degradedReason"
79    )]
80    pub degraded_reason: Option<String>,
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84pub struct ConsoleLiveSnapshot {
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub runtime_id: Option<String>,
87    pub running: bool,
88    pub loaded_modules: Vec<String>,
89    #[serde(default)]
90    pub agents: Vec<ConsoleAgentLiveSnapshot>,
91    pub members: Vec<ConsoleMember>,
92    pub has_mob_runtime: bool,
93}
94
95impl ConsoleLiveSnapshot {
96    pub fn new(
97        runtime_id: Option<String>,
98        running: bool,
99        loaded_modules: Vec<String>,
100        agents: Vec<ConsoleAgentLiveSnapshot>,
101        members: Vec<ConsoleMember>,
102        has_mob_runtime: bool,
103    ) -> Self {
104        let mut seen = BTreeSet::new();
105        let mut deduped_modules = Vec::new();
106        for module_id in loaded_modules {
107            if seen.insert(module_id.clone()) {
108                deduped_modules.push(module_id);
109            }
110        }
111        let mut seen_agents = BTreeSet::new();
112        let mut deduped_agents = Vec::new();
113        for agent in agents {
114            if seen_agents.insert(agent.agent_id.clone()) {
115                deduped_agents.push(agent);
116            }
117        }
118        Self {
119            runtime_id,
120            running,
121            loaded_modules: deduped_modules,
122            agents: deduped_agents,
123            members,
124            has_mob_runtime,
125        }
126    }
127}
128
129pub fn handle_console_rest_json_route(
130    decisions: &RuntimeDecisionState,
131    request: &ConsoleRestJsonRequest,
132) -> ConsoleRestJsonResponse {
133    handle_console_rest_json_route_with_snapshot(decisions, request, None)
134}
135
136pub fn handle_console_rest_json_route_with_snapshot(
137    decisions: &RuntimeDecisionState,
138    request: &ConsoleRestJsonRequest,
139    live_snapshot: Option<&ConsoleLiveSnapshot>,
140) -> ConsoleRestJsonResponse {
141    let (base_path, query_params) = split_path_and_query(&request.path);
142    if request.method != "GET"
143        || (base_path != CONSOLE_MODULES_ROUTE && base_path != CONSOLE_EXPERIENCE_ROUTE)
144    {
145        return ConsoleRestJsonResponse {
146            status: 404,
147            body: serde_json::json!({"error":"not_found"}),
148        };
149    }
150
151    let resolved_auth = match resolve_console_auth(decisions, request.auth.as_ref(), &query_params)
152    {
153        Ok(auth) => auth,
154        Err(error) => {
155            return ConsoleRestJsonResponse {
156                status: 401,
157                body: serde_json::json!({
158                    "error":"unauthorized",
159                    "reason": console_auth_error_reason(&error),
160                }),
161            };
162        }
163    };
164
165    match resolved_auth {
166        Some(auth) => {
167            if let Err(error) =
168                enforce_console_route_access(&decisions.auth, &decisions.console, &auth)
169            {
170                return ConsoleRestJsonResponse {
171                    status: 401,
172                    body: serde_json::json!({
173                        "error":"unauthorized",
174                        "reason": auth_error_reason(&error),
175                    }),
176                };
177            }
178        }
179        None if decisions.console.require_app_auth => {
180            return ConsoleRestJsonResponse {
181                status: 401,
182                body: serde_json::json!({
183                    "error":"unauthorized",
184                    "reason":"missing_credentials",
185                }),
186            };
187        }
188        None => {}
189    }
190
191    let modules: Vec<String> = decisions
192        .modules
193        .iter()
194        .map(|module| module.id.clone())
195        .collect();
196    let live_snapshot = live_snapshot
197        .cloned()
198        .unwrap_or_else(|| default_console_live_snapshot(decisions));
199    let body = if base_path == CONSOLE_EXPERIENCE_ROUTE {
200        build_console_experience_contract(&modules, &live_snapshot, &decisions.console)
201    } else {
202        serde_json::json!({
203            "contract_version": MOBKIT_CONTRACT_VERSION,
204            "modules": modules
205        })
206    };
207    ConsoleRestJsonResponse { status: 200, body }
208}
209
210fn default_console_live_snapshot(decisions: &RuntimeDecisionState) -> ConsoleLiveSnapshot {
211    let loaded_modules = decisions
212        .modules
213        .iter()
214        .map(|module| module.id.clone())
215        .collect::<Vec<_>>();
216    let agents = loaded_modules
217        .iter()
218        .map(|module_id| ConsoleAgentLiveSnapshot {
219            agent_id: module_id.clone(),
220            member_id: module_id.clone(),
221            label: module_id.clone(),
222            kind: "module_agent".to_string(),
223            identity: None,
224            role: None,
225            state: Some("idle".to_string()),
226            session_id: None,
227            model_capabilities: ConsoleModelCapabilities::default(),
228            response_phase: None,
229            watched: None,
230            alert_level: None,
231            degraded: None,
232            degraded_reason: None,
233        })
234        .collect::<Vec<_>>();
235    ConsoleLiveSnapshot::new(
236        None,
237        !decisions.modules.is_empty(),
238        loaded_modules,
239        agents,
240        Vec::new(),
241        false,
242    )
243}
244
245fn build_console_experience_contract(
246    modules: &[String],
247    live_snapshot: &ConsoleLiveSnapshot,
248    console_policy: &ConsolePolicy,
249) -> Value {
250    let console_config = &console_policy.ui;
251    let is_aggregate_console = live_snapshot.runtime_id.as_deref() == Some("console-aggregator");
252    fn has_extended_agent_contract(agent: &ConsoleAgentLiveSnapshot) -> bool {
253        agent.role.is_some()
254            || agent.session_id.is_some()
255            || agent.model_capabilities != ConsoleModelCapabilities::default()
256            || agent.response_phase.is_some()
257            || agent.watched.is_some()
258            || agent.alert_level.is_some()
259            || agent.degraded.is_some()
260            || agent.degraded_reason.is_some()
261            || agent.kind != "module_agent"
262            || agent.member_id != agent.agent_id
263            || agent.label != agent.agent_id
264    }
265
266    let module_panels = modules
267        .iter()
268        .map(|module_id| {
269            serde_json::json!({
270                "panel_id": format!("module.{module_id}"),
271                "module_id": module_id,
272                "title": format!("{module_id} module"),
273                "route": format!("/console/modules/{module_id}"),
274                "capabilities": {
275                    "can_render": true,
276                    "can_subscribe_activity": true,
277                }
278            })
279        })
280        .collect::<Vec<_>>();
281
282    // P0 fix: Build sidebar from the full mob roster (members) when a mob
283    // runtime is present, so multi-instance profiles (e.g. 5 profiles → 15
284    // agents) enumerate every individual agent, not just profile-level IDs.
285    // Fall back to loaded_modules for module-only runtimes.
286    let has_roster_members = live_snapshot.has_mob_runtime && !live_snapshot.members.is_empty();
287    let sidebar_agents: Vec<Value> = if has_roster_members {
288        let mut sorted_members: Vec<&ConsoleMember> = live_snapshot.members.iter().collect();
289        sorted_members.sort_by(|a, b| a.agent_identity.cmp(&b.agent_identity));
290        sorted_members
291                .iter()
292                .map(|member| {
293                    let label = member
294                        .labels
295                        .get("display_name")
296                        .cloned()
297                        .unwrap_or_else(|| member.agent_identity.clone());
298                    let is_active = member.state == "active";
299                    let addressable = is_active
300                        && member
301                        .labels
302                        .get("addressable")
303                        .map(|v| v != "false")
304                        .unwrap_or(true);
305                    let watched = member
306                        .labels
307                        .get("console_watched")
308                        .map(|value| value == "true");
309                    let alert_level = member
310                        .labels
311                        .get("console_alert_level")
312                        .filter(|value| matches!(value.as_str(), "elevated" | "critical"))
313                        .cloned();
314                    let degraded = member
315                        .labels
316                        .get("console_degraded")
317                        .map(|value| value == "true");
318                    let degraded_reason = member.labels.get("console_degraded_reason").cloned();
319                    let singleton = member
320                        .labels
321                        .get("singleton")
322                        .map(|v| v == "true")
323                        .unwrap_or(false);
324                    let group = member
325                        .labels
326                        .get("group")
327                        .cloned()
328                        .unwrap_or_else(|| member.role.clone());
329                    serde_json::json!({
330                        "agent_id": member.agent_identity,
331                        "member_id": member.agent_identity,
332                        "identity": member.agent_identity,
333                        "label": label,
334                        "kind": "mob_agent",
335                        "role": member.role,
336                        "state": member.state,
337                        "model_capabilities": member.model_capabilities,
338                        "session_id": member.session_id,
339                        "wired_to": member.wired_to,
340                        "labels": member.labels,
341                        "group": group,
342                        "addressable": addressable,
343                        "watched": watched,
344                        "alertLevel": alert_level,
345                        "degraded": degraded,
346                        "degradedReason": degraded_reason,
347                        "affordances": {
348                            "addressable": addressable,
349                            "can_send_message": addressable,
350                            "can_retire": is_active && !is_aggregate_console && !singleton,
351                            "can_respawn": !is_aggregate_console,
352                            "runtime_mode": if is_aggregate_console { "console_aggregator" } else { "mob_agent" },
353                        },
354                    })
355                })
356                .collect()
357    } else {
358        live_snapshot
359            .loaded_modules
360            .iter()
361            .map(|module_id| {
362                serde_json::json!({
363                    "agent_id": module_id,
364                    "member_id": module_id,
365                    "label": module_id,
366                    "kind": "module_agent",
367                })
368            })
369            .collect()
370    };
371
372    let sidebar_agents: Vec<Value> = if has_roster_members {
373        sidebar_agents
374    } else if live_snapshot.agents.iter().any(has_extended_agent_contract) {
375        live_snapshot
376            .agents
377            .iter()
378            .map(|agent| {
379                let mut record = serde_json::Map::new();
380                record.insert(
381                    "agent_id".to_string(),
382                    Value::String(agent.agent_id.clone()),
383                );
384                record.insert(
385                    "member_id".to_string(),
386                    Value::String(agent.member_id.clone()),
387                );
388                record.insert("label".to_string(), Value::String(agent.label.clone()));
389                record.insert("kind".to_string(), Value::String(agent.kind.clone()));
390                if let Some(role) = &agent.role {
391                    record.insert("role".to_string(), Value::String(role.clone()));
392                }
393                if let Some(state) = &agent.state {
394                    record.insert("state".to_string(), Value::String(state.clone()));
395                }
396                if let Some(identity) = &agent.identity {
397                    record.insert("identity".to_string(), Value::String(identity.clone()));
398                }
399                if let Some(session_id) = &agent.session_id {
400                    record.insert("session_id".to_string(), Value::String(session_id.clone()));
401                }
402                record.insert(
403                    "model_capabilities".to_string(),
404                    serde_json::to_value(&agent.model_capabilities).unwrap_or(Value::Null),
405                );
406                if let Some(response_phase) = &agent.response_phase {
407                    record.insert(
408                        "response_phase".to_string(),
409                        Value::String(response_phase.clone()),
410                    );
411                }
412                if is_aggregate_console {
413                    record.insert("addressable".to_string(), Value::Bool(true));
414                    record.insert(
415                        "affordances".to_string(),
416                        serde_json::json!({
417                            "addressable": true,
418                            "can_send_message": true,
419                            "can_retire": false,
420                            "can_respawn": false,
421                            "runtime_mode": "console_aggregator",
422                        }),
423                    );
424                }
425                if let Some(watched) = agent.watched {
426                    record.insert("watched".to_string(), Value::Bool(watched));
427                }
428                if let Some(alert_level) = &agent.alert_level {
429                    record.insert("alertLevel".to_string(), Value::String(alert_level.clone()));
430                }
431                if let Some(degraded) = agent.degraded {
432                    record.insert("degraded".to_string(), Value::Bool(degraded));
433                }
434                if let Some(degraded_reason) = &agent.degraded_reason {
435                    record.insert(
436                        "degradedReason".to_string(),
437                        Value::String(degraded_reason.clone()),
438                    );
439                }
440                Value::Object(record)
441            })
442            .collect()
443    } else {
444        sidebar_agents
445    };
446
447    let has_mob = live_snapshot.has_mob_runtime && !is_aggregate_console;
448    let can_send_messages = has_mob || is_aggregate_console;
449    let can_spawn_members = has_mob && !is_aggregate_console;
450    let can_wire_members = has_mob && !is_aggregate_console;
451    let can_retire_members = has_mob && !is_aggregate_console;
452    let identity_status_rows = build_identity_status_rows(&sidebar_agents);
453
454    // P3: Build per-profile capability hints from roster data.
455    let profile_capabilities: BTreeMap<String, Value> = {
456        let mut profiles: BTreeMap<String, (usize, bool, bool)> = BTreeMap::new();
457        for member in &live_snapshot.members {
458            let entry = profiles
459                .entry(member.role.clone())
460                .or_insert((0, true, false));
461            entry.0 += 1; // instance_count
462            // addressable = all instances addressable
463            let member_addressable = member
464                .labels
465                .get("addressable")
466                .map(|v| v != "false")
467                .unwrap_or(true);
468            entry.1 = entry.1 && member_addressable;
469            // has_wiring = any instance wired
470            entry.2 = entry.2 || !member.wired_to.is_empty();
471        }
472        profiles
473            .into_iter()
474            .map(|(profile, (count, addressable, has_wiring))| {
475                (
476                    profile,
477                    serde_json::json!({
478                        "instance_count": count,
479                        "addressable": addressable,
480                        "has_wiring": has_wiring,
481                    }),
482                )
483            })
484            .collect()
485    };
486
487    let console_title = console_config.title.as_deref().unwrap_or("Mob Console");
488    let mut body = serde_json::json!({
489        "contract_version": MOBKIT_CONTRACT_VERSION,
490        "runtime_id": live_snapshot.runtime_id,
491        "console_config": console_config,
492        "runtime_capabilities": {
493            "can_spawn_members": can_spawn_members,
494            "can_send_messages": can_send_messages,
495            "can_wire_members": can_wire_members,
496            "can_retire_members": can_retire_members,
497            "available_spawn_modes": if can_spawn_members {
498                vec!["module", "role"]
499            } else {
500                vec!["module"]
501            },
502            "profile_capabilities": profile_capabilities,
503        },
504        "base_panel": {
505            "panel_id": "console.home",
506            "title": console_title,
507            "route": CONSOLE_EXPERIENCE_ROUTE,
508            "capabilities": {
509                "can_render": true,
510                "surface": "console",
511            }
512        },
513        "module_panels": module_panels,
514        "agent_sidebar": {
515            "panel_id": "console.agent_sidebar",
516            "title": "Agents",
517            "schema_version": "1",
518            "refresh": {
519                "mode": "poll",
520                "interval_ms": 5000,
521            },
522            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else if has_mob { "mobkit/list_members" } else { "mobkit/status" },
523            "refresh_policy": {
524                "mode": "pull",
525                "poll_interval_ms": 5000,
526            },
527            "selection_contract": {
528                "selected_agent_id_field": "agent_id",
529                "selected_member_id_field": "member_id",
530                "emits_scope": "agent",
531                "supported_scopes": ["mob", "agent"],
532            },
533            "list_item_contract": {
534                "fields": ["agent_id", "member_id", "identity", "label", "kind", "role", "state", "model_capabilities", "response_phase", "wired_to", "labels", "group", "addressable", "affordances", "watched", "alertLevel", "degraded", "degradedReason"],
535                "agent_id_field": "agent_id",
536                "member_id_field": "member_id",
537                "group_by_field": "group",
538                "well_known_labels": {
539                    "display_name": "human-readable label; overrides member_id in sidebar",
540                    "addressable": "set \"false\" to hide send-message actions for internal agents",
541                    "singleton": "set \"true\" to prevent retire (e.g. review, summarizer agents)",
542                    "group": "sidebar group name; overrides profile-based grouping",
543                },
544                "refresh_projection": "source_method returns ConsoleMember rows (agent_identity, role, state, model_capabilities, wired_to, labels). Clients must project: agent_id=agent_identity, member_id=agent_identity, label=labels.display_name||agent_identity, group=labels.group||role, addressable=labels.addressable!='false', model_capabilities.image_input default false, affordances derived from labels.singleton and addressable.",
545            },
546            "live_snapshot": {
547                "agents": sidebar_agents,
548            }
549        },
550        "identity_status": {
551            "panel_id": "console.identity_status",
552            "title": "Identity Status",
553            "schema_version": "1",
554            "refresh": {
555                "mode": "poll",
556                "interval_ms": 5000,
557            },
558            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else if has_mob { "mobkit/list_members" } else { "mobkit/status" },
559            "rows": identity_status_rows,
560        },
561        "activity_feed": {
562            "panel_id": "console.activity_feed",
563            "title": "Activity",
564            "schema_version": "1",
565            "refresh": {
566                "mode": "stream",
567                "topic": "all_events",
568                "update_semantics": "append",
569            },
570            "transport": "sse",
571            "source_route": "/console/timeline/stream",
572            "request_contract": {
573                "last_event_id_header": "optional Last-Event-ID checkpoint from prior event_id",
574            },
575            "event_contract": {
576                "envelope_fields": ["event_id", "interaction_id", "identity", "event_type", "timestamp_ms", "data"],
577                "event_type_path": "event_type",
578                "frame_format": "id: <event_id>\\nevent: <event_type>\\ndata: <event_json>\\n\\n",
579            },
580            "keep_alive": {
581                "interval_ms": SSE_KEEP_ALIVE_INTERVAL_MS,
582                "event": SSE_KEEP_ALIVE_EVENT_NAME,
583                "comment_frame": SSE_KEEP_ALIVE_COMMENT_FRAME,
584            },
585            "filter_presets": [
586                { "id": "all", "label": "All" },
587                { "id": "watched-only", "label": "Watched only", "watchedOnly": true },
588                { "id": "critical", "label": "Critical", "alertLevels": ["critical"] }
589            ],
590            "active_preset_id": "all"
591        },
592        "chat_inspector": {
593            "panel_id": "console.chat_inspector",
594            "title": "Chat Inspector",
595            "schema_version": "1",
596            "refresh": {
597                "mode": "stream",
598                "topic": "selected_identity",
599                "update_semantics": "append",
600            },
601            "send_method": "mobkit/console/send",
602            "observe_route": "/console/timeline/stream",
603            "transport": "rpc+sse",
604            "request_contract": {
605                "identity": "required target identity",
606                "content": "required user text to send",
607                "origin": "required caller identifier for audit and routing",
608            },
609            "response_contract": {
610                "interaction_id": "per-turn console correlation token",
611                "identity": "echoed target identity",
612            },
613            "event_contract": {
614                "envelope_fields": ["event_id", "interaction_id", "identity", "event_type", "timestamp_ms", "data"],
615                "event_type_path": "event_type",
616                "interaction_id_field": "interaction_id",
617            }
618        },
619        "topology": {
620            "panel_id": "console.topology",
621            "title": "Topology",
622            "schema_version": "1",
623            "refresh": {
624                "mode": "poll",
625                "interval_ms": 5000,
626            },
627            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else { "mobkit/status" },
628            "route_method": if is_aggregate_console { Value::Null } else { serde_json::json!("mobkit/routing/routes/list") },
629            "refresh_policy": {
630                "mode": "pull",
631                "poll_interval_ms": 5000,
632            },
633            "graph_contract": {
634                "node_id_field": "identity",
635                "edge_fields": ["wired_to"],
636            },
637            "live_snapshot": if live_snapshot.members.is_empty() {
638                // Module-only runtime: topology nodes are loaded module IDs.
639                serde_json::json!({
640                    "nodes": &live_snapshot.loaded_modules,
641                    "node_count": live_snapshot.loaded_modules.len(),
642                })
643            } else {
644                // Mob runtime: identity-native topology from members.
645                serde_json::json!({
646                    "nodes": live_snapshot.members.iter().map(|member| {
647                        let addressable = member.state == "active" && member
648                            .labels
649                            .get("addressable")
650                            .map(|value| value != "false")
651                            .unwrap_or(true);
652                        serde_json::json!({
653                            "identity": member.agent_identity,
654                            "label": member.labels.get("display_name").cloned().unwrap_or_else(|| member.agent_identity.clone()),
655                            "role": member.role,
656                            "state": member.state,
657                            "wired_to": member.wired_to,
658                            "addressable": addressable,
659                        })
660                    }).collect::<Vec<_>>(),
661                    "node_count": live_snapshot.members.len(),
662                })
663            }
664        },
665        "health_overview": {
666            "panel_id": "console.health_overview",
667            "title": "Health",
668            "schema_version": "1",
669            "refresh": {
670                "mode": "poll",
671                "interval_ms": 5000,
672            },
673            "source_method": if is_aggregate_console { "mobkit/console/list_identities" } else { "mobkit/status" },
674            "activity_source_method": if is_aggregate_console { Value::Null } else { serde_json::json!(EVENTS_SUBSCRIBE_METHOD) },
675            "activity_source_route": if is_aggregate_console { serde_json::json!("/console/timeline/stream") } else { Value::Null },
676            "refresh_policy": {
677                "mode": "pull_and_stream",
678                "poll_interval_ms": 5000,
679            },
680            "status_contract": {
681                "running_field": "running",
682                "loaded_modules_field": "loaded_modules",
683            },
684            "live_snapshot": {
685                "running": live_snapshot.running,
686                "loaded_modules": &live_snapshot.loaded_modules,
687                "loaded_module_count": &live_snapshot.loaded_modules.len(),
688                "identities": identity_status_rows,
689            }
690        },
691        "flows": if is_aggregate_console {
692            serde_json::json!({
693                "panel_id": "console.flows",
694                "title": "Flows",
695                "schema_version": "1",
696                "available": false,
697                "reason": "flow scheduling is not exposed by the console aggregator surface",
698            })
699        } else {
700            serde_json::json!({
701                "panel_id": "console.flows",
702                "title": "Flows",
703                "schema_version": "1",
704                "refresh": {
705                    "mode": "poll",
706                    "interval_ms": 10000,
707                },
708                "evaluate_method": "mobkit/scheduling/evaluate",
709                "dispatch_method": "mobkit/scheduling/dispatch",
710                "refresh_policy": {
711                    "mode": "pull",
712                    "poll_interval_ms": 10000,
713                },
714                "request_contract": {
715                    "schedules": "caller-supplied array of ScheduleDefinition (schedule_id, interval|cron, timezone, enabled)",
716                    "tick_ms": "evaluation timestamp in epoch milliseconds",
717                },
718                "evaluate_response_contract": {
719                    "tick_ms": "u64 - echoed evaluation tick",
720                    "due_triggers": "array of ScheduleTrigger {schedule_id, interval, timezone, due_tick_ms}",
721                },
722                "dispatch_response_contract": {
723                    "tick_ms": "u64 - echoed dispatch tick",
724                    "due_count": "usize - number of triggers that were due",
725                    "dispatched": "array of ScheduleDispatch {claim_key, schedule_id, interval, timezone, due_tick_ms, tick_ms, event_id, supervisor_signal?, runtime_injection?, runtime_injection_error?}",
726                    "skipped_claims": "array of schedule_id strings skipped due to idempotent claim",
727                },
728                "note": "Flows require caller-supplied schedule definitions; the runtime does not persist a flow registry. Clients must maintain their own schedule configs."
729            })
730        },
731        "session_history": if has_mob {
732            serde_json::json!({
733                "panel_id": "console.session_history",
734                "title": "Session History",
735                "schema_version": "1",
736                "refresh": {
737                    "mode": "poll",
738                    "interval_ms": 5000,
739                },
740                "source_method": "mobkit/console/query_timeline",
741                "transport": "rpc",
742                "available": true,
743                "request_contract": {
744                    "session_id": "required current session id for the identity/member",
745                    "offset": "optional message offset from the start of the transcript",
746                    "limit": "optional max messages to return",
747                },
748                "response_contract": {
749                    "success": "result is SessionHistoryPage {session_id, message_count, offset, limit, has_more, messages}",
750                }
751            })
752        } else {
753            serde_json::json!({
754                "panel_id": "console.session_history",
755                "title": "Session History",
756                "schema_version": "1",
757                "refresh": {
758                    "mode": "poll",
759                    "interval_ms": 5000,
760                },
761                "available": false,
762                "reason": "session history is projected through the console timeline",
763            })
764        }
765    });
766    if let Some(fetch_timeout_ms) = console_policy.fetch_timeout_ms {
767        body["console_policy"] = serde_json::json!({
768            "fetch_timeout_ms": fetch_timeout_ms,
769        });
770    }
771    body
772}
773
774fn build_identity_status_rows(sidebar_agents: &[Value]) -> Vec<Value> {
775    sidebar_agents
776        .iter()
777        .map(|agent| {
778            let identity = agent
779                .get("identity")
780                .and_then(Value::as_str)
781                .or_else(|| agent.get("member_id").and_then(Value::as_str))
782                .unwrap_or_default();
783            let display_name = agent
784                .get("label")
785                .and_then(Value::as_str)
786                .filter(|label| *label != identity);
787            let role = agent.get("role").and_then(Value::as_str);
788            let state = agent
789                .get("state")
790                .and_then(Value::as_str)
791                .unwrap_or("unknown");
792            let labels = agent
793                .get("labels")
794                .cloned()
795                .unwrap_or_else(|| serde_json::json!({}));
796            let addressability = if agent
797                .get("addressable")
798                .and_then(Value::as_bool)
799                .unwrap_or(true)
800            {
801                "addressable"
802            } else {
803                "internal_only"
804            };
805            let mut row = serde_json::json!({
806                "identity": identity,
807                "state": state,
808                "addressability": addressability,
809                "labels": labels,
810            });
811            if let Some(display_name) = display_name {
812                row["display_name"] = Value::String(display_name.to_string());
813            }
814            if let Some(role) = role {
815                row["role"] = Value::String(role.to_string());
816            }
817            if let Some(generation) = agent.get("generation").and_then(Value::as_u64) {
818                row["generation"] = Value::from(generation);
819            }
820            if let Some(checkpoint_version) =
821                agent.get("checkpoint_version").and_then(Value::as_u64)
822            {
823                row["checkpoint_version"] = Value::from(checkpoint_version);
824            }
825            if let Some(lease_healthy) = agent.get("lease_healthy").and_then(Value::as_bool) {
826                row["lease_healthy"] = Value::from(lease_healthy);
827            }
828            if let Some(response_phase) = agent.get("response_phase").and_then(Value::as_str) {
829                row["response_phase"] = Value::String(response_phase.to_string());
830            }
831            if let Some(model_capabilities) = agent.get("model_capabilities") {
832                row["model_capabilities"] = model_capabilities.clone();
833            }
834            if let Some(session_id) = agent.get("session_id").and_then(Value::as_str) {
835                row["session_id"] = Value::String(session_id.to_string());
836            }
837            row
838        })
839        .collect()
840}
841
842fn split_path_and_query(path: &str) -> (&str, BTreeMap<String, String>) {
843    let (base, query) = path.split_once('?').unwrap_or((path, ""));
844    let mut params = BTreeMap::new();
845    for part in query.split('&') {
846        if part.is_empty() {
847            continue;
848        }
849        let (k, v) = part.split_once('=').unwrap_or((part, ""));
850        if !k.is_empty() {
851            params.insert(k.to_string(), v.to_string());
852        }
853    }
854    (base, params)
855}
856
857fn resolve_console_auth(
858    decisions: &RuntimeDecisionState,
859    explicit_auth: Option<&ConsoleAccessRequest>,
860    query_params: &BTreeMap<String, String>,
861) -> Result<Option<ConsoleAccessRequest>, ConsoleAuthResolutionError> {
862    if let Some(auth) = explicit_auth {
863        return Ok(Some(auth.clone()));
864    }
865
866    if !decisions.console.require_app_auth {
867        return Ok(None);
868    }
869
870    // Check query-param auth_token (also used as the bearer-header injection
871    // point by the HTTP handler — see console_json_handler).
872    match query_params.get("auth_token") {
873        Some(token) => resolve_console_auth_from_token(decisions, token).map(Some),
874        None => Ok(None),
875    }
876}
877
878/// Extract a bearer token from an `Authorization: Bearer <token>` header value.
879pub fn extract_bearer_token_from_header(header_value: &str) -> Option<&str> {
880    let token = header_value.strip_prefix("Bearer ")?;
881    if token.is_empty() { None } else { Some(token) }
882}
883
884/// Validate a bearer token against the console's trusted OIDC config AND
885/// the email allowlist / provider policy (via `enforce_console_route_access`).
886/// Returns `true` only if the token is valid AND the caller is authorized.
887pub fn validate_console_token(decisions: &RuntimeDecisionState, token: &str) -> bool {
888    let auth = match resolve_console_auth_from_token(decisions, token) {
889        Ok(auth) => auth,
890        Err(_) => return false,
891    };
892    crate::decisions::enforce_console_route_access(&decisions.auth, &decisions.console, &auth)
893        .is_ok()
894}
895
896fn resolve_console_auth_from_token(
897    decisions: &RuntimeDecisionState,
898    token: &str,
899) -> Result<ConsoleAccessRequest, ConsoleAuthResolutionError> {
900    if decisions.trusted_oidc.audience.trim().is_empty() {
901        return Err(ConsoleAuthResolutionError::InvalidTrustedOidcConfig);
902    }
903
904    let discovery = parse_oidc_discovery_json(&decisions.trusted_oidc.discovery_json)
905        .map_err(|_| ConsoleAuthResolutionError::InvalidTrustedOidcConfig)?;
906    let jwks = parse_jwks_json(&decisions.trusted_oidc.jwks_json)
907        .map_err(|_| ConsoleAuthResolutionError::InvalidTrustedOidcConfig)?;
908    let header =
909        inspect_jwt_header(token).map_err(|_| ConsoleAuthResolutionError::InvalidTokenHeader)?;
910
911    if header.alg == "HS256"
912        && !hs256_allowed_for_development_issuer(&discovery.issuer, &discovery.jwks_uri)
913    {
914        return Err(ConsoleAuthResolutionError::Hs256NotAllowed);
915    }
916
917    let key = select_jwk_for_token(&jwks, header.kid.as_deref(), &header.alg)
918        .map_err(|_| ConsoleAuthResolutionError::JwksKeyNotFound)?;
919    let verification_key = build_jwt_verification_key(key, &header.alg)
920        .map_err(|_| ConsoleAuthResolutionError::InvalidJwksKeyMaterial)?;
921
922    let now_epoch_seconds = SystemTime::now()
923        .duration_since(UNIX_EPOCH)
924        .unwrap_or_default()
925        .as_secs();
926    let claims = validate_jwt_with_verification_key(
927        token,
928        &verification_key,
929        &JwtClaimsValidationConfig {
930            issuer: Some(discovery.issuer),
931            audience: Some(decisions.trusted_oidc.audience.clone()),
932            now_epoch_seconds,
933            leeway_seconds: 30,
934        },
935    )
936    .map_err(|_| ConsoleAuthResolutionError::InvalidToken)?;
937
938    let principal = claims
939        .email
940        .or(claims.subject)
941        .ok_or(ConsoleAuthResolutionError::MissingTokenIdentity)?;
942    let provider =
943        if claims.actor_type.as_deref() == Some("service") || principal.starts_with("svc:") {
944            AuthProvider::ServiceIdentity
945        } else {
946            match claims.provider.as_deref() {
947                Some("google_oauth") => AuthProvider::GoogleOAuth,
948                Some("github_oauth") => AuthProvider::GitHubOAuth,
949                Some("generic_oidc") => AuthProvider::GenericOidc,
950                _ => AuthProvider::GenericOidc,
951            }
952        };
953
954    Ok(ConsoleAccessRequest {
955        provider,
956        email: principal,
957    })
958}
959
960#[derive(Debug, Clone, PartialEq, Eq)]
961enum ConsoleAuthResolutionError {
962    InvalidTrustedOidcConfig,
963    InvalidTokenHeader,
964    JwksKeyNotFound,
965    InvalidJwksKeyMaterial,
966    InvalidToken,
967    MissingTokenIdentity,
968    Hs256NotAllowed,
969}
970
971fn console_auth_error_reason(error: &ConsoleAuthResolutionError) -> &'static str {
972    match error {
973        ConsoleAuthResolutionError::InvalidTrustedOidcConfig => "invalid_trusted_oidc_config",
974        ConsoleAuthResolutionError::InvalidTokenHeader => "invalid_token_header",
975        ConsoleAuthResolutionError::JwksKeyNotFound => "jwks_key_not_found",
976        ConsoleAuthResolutionError::InvalidJwksKeyMaterial => "invalid_jwks_key_material",
977        ConsoleAuthResolutionError::InvalidToken => "invalid_token",
978        ConsoleAuthResolutionError::MissingTokenIdentity => "missing_token_identity",
979        ConsoleAuthResolutionError::Hs256NotAllowed => "hs256_not_allowed",
980    }
981}
982
983fn hs256_allowed_for_development_issuer(issuer: &str, jwks_uri: &str) -> bool {
984    match (extract_uri_host(issuer), extract_uri_host(jwks_uri)) {
985        (Some(issuer_host), Some(jwks_host)) => {
986            is_development_host(issuer_host) && is_development_host(jwks_host)
987        }
988        _ => false,
989    }
990}
991
992fn extract_uri_host(uri: &str) -> Option<&str> {
993    let after_scheme = uri.split_once("://").map_or(uri, |(_, rest)| rest);
994    let authority_with_path = after_scheme.split('/').next()?;
995    let authority = authority_with_path
996        .rsplit('@')
997        .next()
998        .unwrap_or(authority_with_path);
999    if authority.is_empty() {
1000        return None;
1001    }
1002
1003    if let Some(stripped) = authority.strip_prefix('[') {
1004        let (ipv6_host, _) = stripped.split_once(']')?;
1005        return if ipv6_host.is_empty() {
1006            None
1007        } else {
1008            Some(ipv6_host)
1009        };
1010    }
1011
1012    let host = authority
1013        .split_once(':')
1014        .map_or(authority, |(hostname, _)| hostname);
1015    if host.is_empty() { None } else { Some(host) }
1016}
1017
1018fn is_development_host(host: &str) -> bool {
1019    let lowercase = host.to_ascii_lowercase();
1020    lowercase == "localhost"
1021        || lowercase == "127.0.0.1"
1022        || lowercase == "::1"
1023        || lowercase.ends_with(".localhost")
1024}
1025
1026fn auth_error_reason(error: &DecisionPolicyError) -> &'static str {
1027    match error {
1028        DecisionPolicyError::AuthProviderMismatch => "provider_mismatch",
1029        DecisionPolicyError::AuthProviderNotSupported => "provider_not_supported",
1030        DecisionPolicyError::EmailNotAllowlisted => "email_not_allowlisted",
1031        DecisionPolicyError::InvalidServiceIdentity => "invalid_service_identity",
1032        DecisionPolicyError::ServiceIdentityNotAllowlisted => "service_identity_not_allowlisted",
1033        _ => "policy_denied",
1034    }
1035}