Skip to main content

meerkat_mobkit/
http_console.rs

1//! HTTP routes for the admin console REST API.
2
3use async_stream::stream;
4use axum::extract::{DefaultBodyLimit, Multipart, Path as AxumPath, Query, State};
5use axum::http::{HeaderMap, HeaderValue, StatusCode, Uri, header};
6use axum::response::sse::{Event, KeepAlive, Sse};
7use axum::response::{IntoResponse, Redirect};
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use base64::Engine;
11use futures::future::join_all;
12use meerkat_core::ContentInput;
13use meerkat_core::comms::TrustedPeerDescriptor;
14use meerkat_mob::MobState;
15use meerkat_mob::ids::{MeerkatId, MobId};
16use meerkat_mob::launch::MemberLaunchMode;
17use meerkat_mob::runtime::reconcile::MemberFilter;
18use meerkat_mob::{MobHandle, PeerTarget, ProfileName, SpawnMemberSpec};
19
20use crate::mob_handle_runtime::{
21    member_entry_to_json, model_capabilities_for_member_entry, model_capabilities_for_role,
22};
23use serde_json::{Value, json};
24use std::collections::{BTreeMap, BTreeSet};
25use std::convert::Infallible;
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use crate::blob_store::{BinaryBlobPayload, BinaryBlobStore, is_valid_blob_id_value};
30use crate::console_aggregator::{
31    AllowAllConsoleVisibilityPolicy, ConsoleCursor, ConsoleFrame, ConsoleLogResult,
32    ConsoleLogStore, ConsoleReplayUnavailable, ConsoleSendError, ConsoleSendRequest,
33    ConsoleTimelineEvent, ConsoleTimelineQuery, ConsoleVisibilityPolicy,
34    HideImplicitDelegateMembersConsoleVisibilityPolicy, MobKitConsoleAggregator,
35};
36use crate::contact_directory::ContactDirectory;
37use crate::http_sse::{DEFAULT_KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TEXT};
38use crate::mob_handle_runtime::{MEMBER_STATE_ACTIVE, MEMBER_STATE_RETIRING, MobRuntime};
39use crate::rpc::{JSONRPC_VERSION, JsonRpcError, JsonRpcRequest, JsonRpcResponse};
40use crate::runtime::MobkitRuntimeHandle;
41use crate::runtime::{
42    ConsoleAgentLiveSnapshot, ConsoleLiveSnapshot, ConsoleMember, ConsoleModelCapabilities,
43    ConsoleRestJsonRequest, DeliveryHistoryRequest, GatingDecideRequest, GatingDecision,
44    RuntimeDecisionState, extract_bearer_token_from_header,
45    handle_console_rest_json_route_with_snapshot, validate_console_token,
46};
47use crate::runtime::{MetadataScope, RuntimeMetadataTable, labels_to_json_value};
48use crate::unified_runtime::console_events::ConsoleEventStore;
49use crate::unified_runtime::mob_events::MobEventsStore;
50use crate::unified_runtime::{EventLogStore, EventQuery};
51
52#[derive(Clone)]
53pub struct ConsoleJsonState {
54    pub decisions: RuntimeDecisionState,
55    pub runtime: Option<MobRuntime>,
56    pub module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
57    pub contact_directory: Option<ContactDirectory>,
58    pub event_log: Option<std::sync::Arc<dyn EventLogStore>>,
59    /// Local gateway signing identity. Plumbed in so the console RPC
60    /// dispatch can answer `mobkit/peer_pubkey` and stamp non-inproc
61    /// `cross_mob/wire_local` descriptors with a real pubkey.
62    pub gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
63    pub(crate) identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
64    pub(crate) console_events: Option<ConsoleEventStore>,
65    pub(crate) console_aggregator: Option<MobKitConsoleAggregator>,
66    pub(crate) mob_events: Option<MobEventsStore>,
67    pub(crate) metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
68    pub(crate) visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
69    pub(crate) snapshot_read_model: ConsoleSnapshotReadModel,
70}
71
72#[derive(Clone, Default)]
73pub(crate) struct ConsoleSnapshotReadModel {
74    inner: Arc<tokio::sync::RwLock<ConsoleSnapshotReadModelState>>,
75    /// Mutex held by whichever task is currently running a refresh.
76    /// Background refreshes (from `refresh_soon`) skip when the lock
77    /// is contended; cold-cache request waiters acquire it via
78    /// `lock_owned().await`, which is the actual "the in-flight
79    /// refresh has finished" signal. See `prime_now`.
80    refresh_lock: Arc<tokio::sync::Mutex<()>>,
81    /// `true` once at least one refresh has populated `inner` with
82    /// real data. Snapshot reads gate on this so a cold cache never
83    /// returns an empty member list to the first request.
84    primed: Arc<std::sync::atomic::AtomicBool>,
85}
86
87#[derive(Clone, Default)]
88struct ConsoleSnapshotReadModelState {
89    running: Option<bool>,
90    session_id_by_identity: BTreeMap<String, String>,
91    session_owner_by_id: BTreeMap<String, String>,
92    /// Pre-projected primary-mob console members. The background refresh
93    /// populates this from `handle.list_all_members()` + projection; the
94    /// snapshot hot path just clones from here so it never touches
95    /// `MobHandle` async methods.
96    primary_members: Vec<ConsoleMember>,
97    /// Pre-projected delegate-mob member groups, one Vec per delegate mob,
98    /// each already carrying its host_identity / source_mob_id label
99    /// fixups. The snapshot hot path extends `members` with these instead
100    /// of walking delegate handles per-request.
101    delegate_member_groups: Vec<Vec<ConsoleMember>>,
102}
103
104impl ConsoleSnapshotReadModel {
105    /// Returns the current cached snapshot. On a cold cache (no
106    /// refresh has completed yet) the request thread drives the
107    /// first refresh inline — or, if a background refresh task
108    /// holds the lock, waits for it to finish before reading.
109    /// Either way, snapshot endpoints never see an empty member
110    /// list before the read model has been populated.
111    async fn snapshot(&self, runtime: &MobRuntime) -> ConsoleSnapshotReadModelState {
112        if !self.primed.load(std::sync::atomic::Ordering::Acquire) {
113            self.prime_now(runtime).await;
114        }
115        self.inner.read().await.clone()
116    }
117
118    /// Cold-cache priming. Acquires `refresh_lock` via the awaiting
119    /// (FIFO) path:
120    ///
121    /// - If no refresh task currently holds the lock, we acquire
122    ///   it immediately and run the refresh inline. Subsequent
123    ///   waiters that come in while we're running will queue
124    ///   behind us in the same lock.
125    /// - If a refresh task (spawned by `refresh_soon`) holds the
126    ///   lock, our `lock_owned().await` parks until the task
127    ///   drops the guard. By construction, the task only drops
128    ///   the guard *after* writing `inner` and setting `primed`.
129    ///   So when we acquire the lock, the cache is already
130    ///   populated and the second `primed` check returns early
131    ///   without redoing the work.
132    ///
133    /// No `Notify` is involved, so there's no lost-wake race to
134    /// reason about: the lock release is the signal, and `tokio`'s
135    /// Mutex enforces FIFO acquisition fairness so a `try_lock`
136    /// caller can't barge past a queued `lock_owned` waiter.
137    async fn prime_now(&self, runtime: &MobRuntime) {
138        if self.primed.load(std::sync::atomic::Ordering::Acquire) {
139            return;
140        }
141        let _guard = self.refresh_lock.clone().lock_owned().await;
142        if self.primed.load(std::sync::atomic::Ordering::Acquire) {
143            return;
144        }
145        let refreshed = collect_console_snapshot_read_model(runtime).await;
146        *self.inner.write().await = refreshed;
147        self.primed
148            .store(true, std::sync::atomic::Ordering::Release);
149        // _guard drops here, releasing the lock and waking the next
150        // queued cold-cache waiter (if any). They'll see `primed`
151        // true after acquiring and return early.
152    }
153
154    /// Fire-and-forget background refresh. If a refresh is already
155    /// in flight (lock contended) we skip — the in-flight one is
156    /// enough. The request hot path doesn't call this; it goes
157    /// through `prime_now` on cold cache so it always gets a
158    /// populated snapshot. `refresh_soon` exists to keep a hot
159    /// cache fresh over time without blocking response requests.
160    fn refresh_soon(&self, runtime: MobRuntime) {
161        let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else {
162            return;
163        };
164        let Ok(guard) = self.refresh_lock.clone().try_lock_owned() else {
165            return;
166        };
167        let inner = Arc::clone(&self.inner);
168        let primed = Arc::clone(&self.primed);
169        runtime_handle.spawn(async move {
170            let _guard = guard;
171            let refreshed = collect_console_snapshot_read_model(&runtime).await;
172            *inner.write().await = refreshed;
173            primed.store(true, std::sync::atomic::Ordering::Release);
174            // _guard drops; cold-cache waiters parked on
175            // `lock_owned().await` in `prime_now` wake here and
176            // observe `primed = true` after acquiring.
177        });
178    }
179}
180
181const CONSOLE_FRONTEND_INDEX_HTML: &str = include_str!("../console-dist/index.html");
182const CONSOLE_FRONTEND_APP_JS: &str = include_str!("../console-dist/console-app.js");
183const CONSOLE_FRONTEND_APP_CSS: &str = include_str!("../console-dist/console-app.css");
184const MAX_MULTIPART_IMAGE_BYTES: usize = 25 * 1024 * 1024;
185const MAX_MULTIPART_IMAGES: usize = 4;
186const MAX_MULTIPART_BODY_BYTES: usize =
187    (MAX_MULTIPART_IMAGE_BYTES * MAX_MULTIPART_IMAGES) + 1024 * 1024;
188
189pub fn console_json_router(decisions: RuntimeDecisionState) -> Router {
190    console_json_router_with_state(ConsoleJsonState {
191        decisions,
192        runtime: None,
193        module_runtime: None,
194        contact_directory: None,
195        event_log: None,
196        gateway_peer_keys: None,
197        identity_runtime: None,
198        console_events: None,
199        console_aggregator: None,
200        mob_events: None,
201        metadata_table: None,
202        visibility_policy: Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
203        snapshot_read_model: ConsoleSnapshotReadModel::default(),
204    })
205}
206
207pub fn console_json_router_with_aggregator(
208    decisions: RuntimeDecisionState,
209    console_aggregator: MobKitConsoleAggregator,
210) -> Router {
211    console_json_router_with_state(ConsoleJsonState {
212        decisions,
213        runtime: None,
214        module_runtime: None,
215        contact_directory: None,
216        event_log: None,
217        gateway_peer_keys: None,
218        identity_runtime: None,
219        console_events: None,
220        console_aggregator: Some(console_aggregator),
221        mob_events: None,
222        metadata_table: None,
223        visibility_policy: Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
224        snapshot_read_model: ConsoleSnapshotReadModel::default(),
225    })
226}
227
228pub fn console_json_router_with_runtime(
229    decisions: RuntimeDecisionState,
230    runtime: MobRuntime,
231    contact_directory: Option<ContactDirectory>,
232    event_log: Option<std::sync::Arc<dyn EventLogStore>>,
233) -> Router {
234    console_json_router_with_runtime_and_events(
235        decisions,
236        runtime,
237        None,
238        contact_directory,
239        event_log,
240        None,
241        None,
242        None,
243        None,
244        None,
245        None,
246    )
247}
248
249#[allow(clippy::too_many_arguments)]
250pub(crate) fn console_json_router_with_runtime_and_events(
251    decisions: RuntimeDecisionState,
252    runtime: MobRuntime,
253    module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
254    contact_directory: Option<ContactDirectory>,
255    event_log: Option<std::sync::Arc<dyn EventLogStore>>,
256    gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
257    console_events: Option<ConsoleEventStore>,
258    console_log_store: Option<std::sync::Arc<dyn ConsoleLogStore>>,
259    mob_events: Option<MobEventsStore>,
260    metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
261    identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
262) -> Router {
263    console_json_router_with_runtime_events_and_policy(
264        decisions,
265        runtime,
266        module_runtime,
267        contact_directory,
268        event_log,
269        gateway_peer_keys,
270        console_events,
271        console_log_store,
272        mob_events,
273        metadata_table,
274        identity_runtime,
275        Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
276    )
277}
278
279#[allow(clippy::too_many_arguments)]
280pub(crate) fn console_json_router_with_runtime_events_and_policy(
281    decisions: RuntimeDecisionState,
282    runtime: MobRuntime,
283    module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
284    contact_directory: Option<ContactDirectory>,
285    event_log: Option<std::sync::Arc<dyn EventLogStore>>,
286    gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
287    console_events: Option<ConsoleEventStore>,
288    console_log_store: Option<std::sync::Arc<dyn ConsoleLogStore>>,
289    mob_events: Option<MobEventsStore>,
290    metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
291    identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
292    visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
293) -> Router {
294    let console_aggregator = console_events.clone().map(|events| {
295        if let Some(store) = console_log_store {
296            let aggregator = MobKitConsoleAggregator::new(store);
297            aggregator.register_runtime_handles_with_policy(
298                "default",
299                "",
300                runtime.clone(),
301                events,
302                visibility_policy.clone(),
303            );
304            aggregator
305        } else {
306            let aggregator = MobKitConsoleAggregator::in_memory();
307            aggregator.register_runtime_handles_with_policy(
308                "default",
309                "",
310                runtime.clone(),
311                events,
312                visibility_policy.clone(),
313            );
314            aggregator
315        }
316    });
317    let snapshot_read_model = ConsoleSnapshotReadModel::default();
318    snapshot_read_model.refresh_soon(runtime.clone());
319    console_json_router_with_state(ConsoleJsonState {
320        decisions,
321        runtime: Some(runtime),
322        module_runtime,
323        contact_directory,
324        event_log,
325        gateway_peer_keys,
326        identity_runtime,
327        console_events,
328        console_aggregator,
329        mob_events,
330        metadata_table,
331        visibility_policy,
332        snapshot_read_model,
333    })
334}
335
336pub fn console_frontend_router() -> Router {
337    Router::new()
338        .route("/", get(|| async { Redirect::temporary("/console") }))
339        .route("/favicon.ico", get(|| async { StatusCode::NO_CONTENT }))
340        .route("/console", get(console_frontend_index_handler))
341        .route("/console/", get(console_frontend_index_handler))
342        .route(
343            "/console/assets/console-app.js",
344            get(console_frontend_app_js_handler),
345        )
346        .route(
347            "/console/assets/console-app.css",
348            get(console_frontend_app_css_handler),
349        )
350}
351
352fn console_json_router_with_state(state: ConsoleJsonState) -> Router {
353    let router = Router::new()
354        .route("/console/experience", get(console_json_handler))
355        .route("/console/modules", get(console_json_handler))
356        .route("/console/identities", get(console_identities_handler))
357        .route("/console/timeline", get(console_timeline_handler))
358        .route(
359            "/console/timeline/stream",
360            get(console_timeline_stream_handler),
361        )
362        .route(
363            "/console/identity/{identity}/stream",
364            get(console_identity_timeline_stream_handler),
365        )
366        .route("/console/send", post(console_send_handler))
367        .route("/console/rpc", post(console_rpc_handler))
368        .route(
369            "/console/rpc/multipart",
370            post(console_rpc_multipart_handler)
371                .layer(DefaultBodyLimit::max(MAX_MULTIPART_BODY_BYTES)),
372        )
373        .route("/blobs/{blob_id}", get(blob_get_handler));
374    router.with_state(state)
375}
376
377pub async fn console_json_handler(
378    State(state): State<ConsoleJsonState>,
379    headers: HeaderMap,
380    uri: Uri,
381) -> impl IntoResponse {
382    let mut path = uri
383        .path_and_query()
384        .map(|path_and_query| path_and_query.as_str().to_string())
385        .unwrap_or_else(|| uri.path().to_string());
386
387    // If the request carries a Bearer token and the URL doesn't already have
388    // an auth_token query param, inject it so the console-ingress auth
389    // resolver can validate it through the existing query-param path.
390    //
391    // JWT tokens use base64url characters (A-Za-z0-9_-.) plus optional '='
392    // padding. We percent-encode the bearer when injecting so opaque
393    // bearer tokens containing `&`, `=`, `+`, `%`, etc. (legal under
394    // RFC 6750 §2.1) survive the round trip — pre-fix, an `&` made
395    // injection skip and authentication fail. Substring detection of
396    // an existing `auth_token=` is now key-aware via form_urlencoded
397    // so `xauth_token=` doesn't masquerade as the real key.
398    let already_has_token = path
399        .split_once('?')
400        .map(|(_, q)| form_urlencoded::parse(q.as_bytes()).any(|(key, _)| key == "auth_token"))
401        .unwrap_or(false);
402    if !already_has_token
403        && let Some(bearer) = headers
404            .get(header::AUTHORIZATION)
405            .and_then(|v| v.to_str().ok())
406            .and_then(extract_bearer_token_from_header)
407    {
408        let encoded: String = form_urlencoded::byte_serialize(bearer.as_bytes()).collect();
409        let sep = if path.contains('?') { '&' } else { '?' };
410        path = format!("{path}{sep}auth_token={encoded}");
411    }
412
413    let config_module_ids: Vec<String> = state
414        .decisions
415        .modules
416        .iter()
417        .map(|m| m.id.clone())
418        .collect();
419    let live_snapshot = match &state.runtime {
420        Some(runtime) => {
421            state.snapshot_read_model.refresh_soon(runtime.clone());
422            Some(
423                build_live_snapshot(
424                    runtime,
425                    &config_module_ids,
426                    state.console_events.as_ref(),
427                    state.visibility_policy.as_ref(),
428                    &state.snapshot_read_model,
429                )
430                .await,
431            )
432        }
433        None => match &state.console_aggregator {
434            Some(aggregator) => build_aggregator_live_snapshot(aggregator, &config_module_ids)
435                .await
436                .ok(),
437            None => None,
438        },
439    }
440    .map(|mut snapshot| {
441        apply_console_visibility_policy(&mut snapshot, state.visibility_policy.as_ref());
442        snapshot
443    });
444
445    let response = handle_console_rest_json_route_with_snapshot(
446        &state.decisions,
447        &ConsoleRestJsonRequest {
448            method: "GET".to_string(),
449            path,
450            auth: None,
451        },
452        live_snapshot.as_ref(),
453    );
454    let status = StatusCode::from_u16(response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
455    (status, Json::<Value>(response.body))
456}
457
458pub async fn console_rpc_handler(
459    State(state): State<ConsoleJsonState>,
460    headers: HeaderMap,
461    uri: Uri,
462    Json(request): Json<Value>,
463) -> impl IntoResponse {
464    // Parse the request early so we can check the method for auth gating.
465    let parsed_request = match serde_json::from_value::<JsonRpcRequest>(request) {
466        Ok(req) => req,
467        Err(_) => {
468            return (
469                StatusCode::OK,
470                Json::<Value>(serde_json::json!({
471                    "jsonrpc": JSONRPC_VERSION,
472                    "id": Value::Null,
473                    "error": { "code": -32600, "message": "Invalid Request" }
474                })),
475            );
476        }
477    };
478
479    // Auth enforcement:
480    // - When require_app_auth is true: validate bearer token (OIDC + allowlist)
481    // - When require_app_auth is false: only allow read-only methods
482    //   (mutating operations require auth to be configured)
483    if !console_request_authorized(&state, &headers, &uri) {
484        return (
485            StatusCode::UNAUTHORIZED,
486            Json::<Value>(serde_json::json!({
487                "jsonrpc": JSONRPC_VERSION,
488                "id": parsed_request.id.unwrap_or(Value::Null),
489                "error": {
490                    "code": -32600,
491                    "message": "unauthorized: console rpc requires a valid auth token",
492                }
493            })),
494        );
495    }
496    // No auth configured: all methods allowed. The operator has explicitly
497    // opted out of authentication (require_app_auth = false), so the console
498    // is an open local deployment where every RPC method should work.
499
500    // By this point the request is always authorized:
501    // - require_app_auth=true: an invalid token already returned 401 above.
502    // - require_app_auth=false: all methods are permitted unconditionally.
503    // Either way, capabilities should reflect that all methods are available.
504    let is_authenticated = true;
505    let Some(runtime) = &state.runtime else {
506        let response_value = handle_console_aggregator_rpc(
507            state.console_aggregator.clone(),
508            parsed_request,
509            is_authenticated,
510        )
511        .await;
512        return (StatusCode::OK, Json::<Value>(response_value));
513    };
514
515    let response_value = Box::pin(handle_console_runtime_rpc(
516        runtime,
517        state.module_runtime.clone(),
518        state.contact_directory.as_ref(),
519        state.gateway_peer_keys.as_ref(),
520        state.console_events.clone(),
521        state.console_aggregator.clone(),
522        state.identity_runtime.clone(),
523        state.metadata_table.clone(),
524        state.mob_events.clone(),
525        parsed_request,
526        is_authenticated,
527    ))
528    .await;
529    (StatusCode::OK, Json::<Value>(response_value))
530}
531
532#[derive(Debug, serde::Deserialize)]
533struct ConsoleTimelineHttpQuery {
534    #[serde(default)]
535    identity: Option<String>,
536    #[serde(default)]
537    conversation_id: Option<String>,
538    #[serde(default)]
539    after: Option<String>,
540    #[serde(default)]
541    limit: Option<usize>,
542}
543
544async fn console_identities_handler(
545    State(state): State<ConsoleJsonState>,
546    headers: HeaderMap,
547    uri: Uri,
548) -> impl IntoResponse {
549    if !console_request_authorized(&state, &headers, &uri) {
550        return console_json_error(
551            StatusCode::UNAUTHORIZED,
552            "unauthorized",
553            "console identities require a valid auth token",
554        );
555    }
556    let Some(aggregator) = &state.console_aggregator else {
557        return console_json_error(
558            StatusCode::NOT_FOUND,
559            "unavailable",
560            "console aggregator unavailable",
561        );
562    };
563    let aggregator = aggregator.clone();
564    match aggregator.list_identities().await {
565        Ok(identities) => (
566            StatusCode::OK,
567            Json::<Value>(json!({ "identities": identities })),
568        )
569            .into_response(),
570        Err(err) => console_json_error(
571            StatusCode::INTERNAL_SERVER_ERROR,
572            "internal_error",
573            &err.to_string(),
574        ),
575    }
576}
577
578async fn console_timeline_handler(
579    State(state): State<ConsoleJsonState>,
580    headers: HeaderMap,
581    uri: Uri,
582    Query(query): Query<ConsoleTimelineHttpQuery>,
583) -> impl IntoResponse {
584    if !console_request_authorized(&state, &headers, &uri) {
585        return console_json_error(
586            StatusCode::UNAUTHORIZED,
587            "unauthorized",
588            "console timeline requires a valid auth token",
589        );
590    }
591    let Some(aggregator) = &state.console_aggregator else {
592        return console_json_error(
593            StatusCode::NOT_FOUND,
594            "unavailable",
595            "console aggregator unavailable",
596        );
597    };
598    let timeline_query = timeline_query_from_http(query, None);
599    match aggregator.query_timeline(timeline_query).await {
600        Ok(page) => (
601            StatusCode::OK,
602            Json::<Value>(serde_json::to_value(page).unwrap_or_else(|_| json!({ "frames": [] }))),
603        )
604            .into_response(),
605        Err(err) => {
606            console_json_error(StatusCode::CONFLICT, "replay_unavailable", &err.to_string())
607        }
608    }
609}
610
611async fn console_send_handler(
612    State(state): State<ConsoleJsonState>,
613    headers: HeaderMap,
614    uri: Uri,
615    Json(request): Json<ConsoleSendRequest>,
616) -> impl IntoResponse {
617    if !console_request_authorized(&state, &headers, &uri) {
618        return console_json_error(
619            StatusCode::UNAUTHORIZED,
620            "unauthorized",
621            "console send requires a valid auth token",
622        );
623    }
624    let Some(aggregator) = &state.console_aggregator else {
625        return console_json_error(
626            StatusCode::NOT_FOUND,
627            "unavailable",
628            "console aggregator unavailable",
629        );
630    };
631    if let Some(identity_runtime) = &state.identity_runtime {
632        return match console_send_identity_first(
633            aggregator,
634            identity_runtime,
635            state.console_events.as_ref(),
636            request,
637        )
638        .await
639        {
640            Ok(accepted) => (
641                StatusCode::OK,
642                Json::<Value>(
643                    serde_json::to_value(accepted).unwrap_or_else(|_| json!({ "accepted": true })),
644                ),
645            )
646                .into_response(),
647            Err(err) => console_send_error_response(err),
648        };
649    }
650    match aggregator.send(request).await {
651        Ok(accepted) => (
652            StatusCode::OK,
653            Json::<Value>(
654                serde_json::to_value(accepted).unwrap_or_else(|_| json!({ "accepted": true })),
655            ),
656        )
657            .into_response(),
658        Err(err) => console_send_error_response(err),
659    }
660}
661
662async fn console_send_identity_first(
663    aggregator: &MobKitConsoleAggregator,
664    identity_runtime: &crate::identity_first::IdentityRuntime,
665    console_events: Option<&ConsoleEventStore>,
666    mut request: ConsoleSendRequest,
667) -> Result<crate::console_aggregator::ConsoleInteractionAccepted, ConsoleSendError> {
668    let requested_identity = request.identity.clone();
669    let parsed_identity = crate::identity_first::AgentIdentity::parse(request.identity.as_str())
670        .map_err(|err| ConsoleSendError::InvalidRequest(format!("invalid identity: {err}")))?;
671    let content: ContentInput = serde_json::from_value(request.content.clone())
672        .map_err(|err| ConsoleSendError::InvalidContent(err.to_string()))?;
673    if let ContentInput::Text(text) = &content
674        && text.trim().is_empty()
675    {
676        return Err(ConsoleSendError::InvalidContent(
677            "content must be non-empty".to_string(),
678        ));
679    }
680    if let ContentInput::Blocks(blocks) = &content
681        && blocks.is_empty()
682    {
683        return Err(ConsoleSendError::InvalidContent(
684            "content blocks must be non-empty".to_string(),
685        ));
686    }
687
688    let (identity, status) = match identity_runtime.status(&parsed_identity).await {
689        Ok(status) => (parsed_identity, status),
690        Err(original_err) => {
691            let Some(canonical_identity) =
692                resolve_console_send_identity_alias(aggregator, &requested_identity).await
693            else {
694                return Err(identity_runtime_error_to_console_send_error(
695                    requested_identity.as_str(),
696                    original_err,
697                ));
698            };
699            let identity = crate::identity_first::AgentIdentity::parse(canonical_identity.as_str())
700                .map_err(|err| {
701                    ConsoleSendError::InvalidRequest(format!("invalid aliased identity: {err}"))
702                })?;
703            let status = identity_runtime.status(&identity).await.map_err(|_| {
704                identity_runtime_error_to_console_send_error(
705                    requested_identity.as_str(),
706                    original_err,
707                )
708            })?;
709            request.identity = canonical_identity;
710            (identity, status)
711        }
712    };
713    let session_id = status
714        .session_id
715        .as_ref()
716        .map(std::string::ToString::to_string);
717    let runtime_member_id = status
718        .agent_runtime_id
719        .as_ref()
720        .map(|id| id.as_str().to_string());
721    let accepted = aggregator
722        .reserve_identity_first_interaction(request.clone(), session_id.as_deref())
723        .await?;
724
725    if let Some(events) = console_events {
726        events
727            .reserve_interaction_value(
728                identity.as_str(),
729                runtime_member_id.as_deref(),
730                &accepted.interaction_id,
731                &request.origin,
732                request.content.clone(),
733            )
734            .await
735            .map_err(ConsoleSendError::State)?;
736    }
737
738    match identity_runtime.send(&identity, &content).await {
739        Ok(_) => Ok(accepted),
740        Err(err) => {
741            let _ = aggregator
742                .mark_interaction_delivery_failed(&accepted.input_frame_id)
743                .await;
744            if let Some(events) = console_events {
745                events
746                    .record_lifecycle(
747                        identity.as_str(),
748                        "interaction_failed",
749                        json!({
750                            "interaction_id": accepted.interaction_id,
751                            "origin": request.origin,
752                            "error": err.to_string(),
753                        }),
754                    )
755                    .await;
756            }
757            Err(identity_runtime_error_to_console_send_error(
758                identity.as_str(),
759                err,
760            ))
761        }
762    }
763}
764
765async fn resolve_console_send_identity_alias(
766    aggregator: &MobKitConsoleAggregator,
767    requested_identity: &str,
768) -> Option<String> {
769    let identities = aggregator.list_identities().await.ok()?;
770    identities
771        .into_iter()
772        .find(|record| record.runtime_member_id == requested_identity)
773        .map(|record| record.identity)
774}
775
776fn identity_runtime_error_to_console_send_error(
777    identity: &str,
778    err: crate::identity_first::IdentityRuntimeError,
779) -> ConsoleSendError {
780    match err {
781        crate::identity_first::IdentityRuntimeError::UnknownIdentity(_) => {
782            ConsoleSendError::UnknownIdentity(identity.to_string())
783        }
784        crate::identity_first::IdentityRuntimeError::NotAddressable(_) => {
785            ConsoleSendError::NotAddressable(identity.to_string())
786        }
787        crate::identity_first::IdentityRuntimeError::InvalidState { .. } => {
788            ConsoleSendError::Retired(identity.to_string())
789        }
790        other => ConsoleSendError::Dispatch(other.to_string()),
791    }
792}
793
794async fn console_timeline_stream_handler(
795    State(state): State<ConsoleJsonState>,
796    headers: HeaderMap,
797    uri: Uri,
798    Query(query): Query<ConsoleTimelineHttpQuery>,
799) -> impl IntoResponse {
800    if !console_request_authorized(&state, &headers, &uri) {
801        return console_json_error(
802            StatusCode::UNAUTHORIZED,
803            "unauthorized",
804            "console timeline stream requires a valid auth token",
805        );
806    }
807    let Some(aggregator) = &state.console_aggregator else {
808        return console_json_error(
809            StatusCode::NOT_FOUND,
810            "unavailable",
811            "console aggregator unavailable",
812        );
813    };
814    let aggregator = aggregator.clone();
815    let last_event_id = headers
816        .get("last-event-id")
817        .and_then(|value| value.to_str().ok())
818        .map(str::trim)
819        .filter(|value| !value.is_empty())
820        .map(ToString::to_string);
821    let timeline_query = timeline_query_from_http(query, last_event_id);
822    let mut rx = aggregator.subscribe();
823    let (snapshot_frames, snapshot_cursor) =
824        match query_timeline_snapshot(&aggregator, timeline_query.clone()).await {
825            Ok(snapshot) => snapshot,
826            Err(_) => {
827                let latest_cursor = aggregator.latest_cursor().await.ok().flatten();
828                let requested_cursor = timeline_query
829                    .after
830                    .as_ref()
831                    .map(ToString::to_string)
832                    .unwrap_or_default();
833                return (
834                    StatusCode::CONFLICT,
835                    Json::<Value>(
836                        serde_json::to_value(ConsoleReplayUnavailable {
837                            error: "replay_unavailable".to_string(),
838                            requested_cursor,
839                            latest_cursor,
840                        })
841                        .unwrap_or_else(|_| json!({ "error": "replay_unavailable" })),
842                    ),
843                )
844                    .into_response();
845            }
846        };
847    let identity = timeline_query.identity.clone();
848    let conversation_id = timeline_query.conversation_id.clone();
849    let snapshot_after = timeline_query.after.clone();
850    let stream = stream! {
851        if let Some(event) = sse_event_from_timeline_event(&ConsoleTimelineEvent::SnapshotStarted { after: snapshot_after }) {
852            yield Ok::<Event, Infallible>(event);
853        }
854        let mut latest_cursor = snapshot_cursor;
855        for frame in snapshot_frames {
856            latest_cursor = Some(frame.cursor.clone());
857            if let Some(event) = sse_event_from_timeline_event(&ConsoleTimelineEvent::ConsoleFrame { frame }) {
858                yield Ok::<Event, Infallible>(event);
859            }
860        }
861        if let Some(event) = sse_event_from_timeline_event(&ConsoleTimelineEvent::SnapshotComplete { cursor: latest_cursor.clone() }) {
862            yield Ok::<Event, Infallible>(event);
863        }
864        loop {
865            match rx.recv().await {
866                Ok(event) if timeline_event_matches(&event, identity.as_deref(), conversation_id.as_deref()) => {
867                    if !aggregator.timeline_event_visible(&event).await {
868                        continue;
869                    }
870                    if let Some(event_cursor) = timeline_event_cursor(&event)
871                        && let Some(current_cursor) = latest_cursor.as_ref()
872                        && !cursor_is_after(event_cursor, current_cursor)
873                    {
874                        continue;
875                    }
876                    if let Some(sse) = sse_event_from_timeline_event(&event) {
877                        if let Some(event_cursor) = timeline_event_cursor(&event) {
878                            latest_cursor = Some(event_cursor.clone());
879                        }
880                        yield Ok::<Event, Infallible>(sse);
881                    }
882                }
883                Ok(_) => {}
884                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
885                    let event = ConsoleTimelineEvent::ReplayUnavailable {
886                        requested_cursor: format!("lagged:{skipped}"),
887                        latest_cursor: None,
888                    };
889                    if let Some(sse) = sse_event_from_timeline_event(&event) {
890                        yield Ok::<Event, Infallible>(sse);
891                    }
892                }
893                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
894            }
895        }
896    };
897    Sse::new(stream)
898        .keep_alive(
899            KeepAlive::new()
900                .interval(DEFAULT_KEEP_ALIVE_INTERVAL)
901                .text(KEEP_ALIVE_TEXT),
902        )
903        .into_response()
904}
905
906async fn console_identity_timeline_stream_handler(
907    State(state): State<ConsoleJsonState>,
908    headers: HeaderMap,
909    uri: Uri,
910    AxumPath(identity): AxumPath<String>,
911    Query(mut query): Query<ConsoleTimelineHttpQuery>,
912) -> impl IntoResponse {
913    query.identity = Some(identity);
914    Box::pin(console_timeline_stream_handler(
915        State(state),
916        headers,
917        uri,
918        Query(query),
919    ))
920    .await
921    .into_response()
922}
923
924fn timeline_query_from_http(
925    query: ConsoleTimelineHttpQuery,
926    fallback_after: Option<String>,
927) -> ConsoleTimelineQuery {
928    let after = query.after.or(fallback_after).map(ConsoleCursor::from);
929    ConsoleTimelineQuery {
930        identity: query
931            .identity
932            .map(|value| value.trim().to_string())
933            .filter(|value| !value.is_empty()),
934        conversation_id: query
935            .conversation_id
936            .map(|value| value.trim().to_string())
937            .filter(|value| !value.is_empty()),
938        after,
939        limit: query.limit.unwrap_or(200),
940    }
941}
942
943async fn query_timeline_snapshot(
944    aggregator: &MobKitConsoleAggregator,
945    mut query: ConsoleTimelineQuery,
946) -> ConsoleLogResult<(Vec<ConsoleFrame>, Option<ConsoleCursor>)> {
947    const MAX_SNAPSHOT_PAGES: usize = 100;
948    const STORE_PAGE_LIMIT: usize = 1_000;
949    const DEFAULT_SNAPSHOT_LIMIT: usize = 200;
950    let mut frames = Vec::new();
951    let mut latest_cursor = query.after.clone();
952    if query.after.is_none() {
953        query.limit = if query.limit == 0 {
954            DEFAULT_SNAPSHOT_LIMIT
955        } else {
956            query.limit
957        }
958        .clamp(1, STORE_PAGE_LIMIT);
959        return query_fresh_timeline_snapshot(aggregator, query, STORE_PAGE_LIMIT).await;
960    }
961    query.limit = STORE_PAGE_LIMIT;
962    let query_identity = query.identity.clone();
963    for page_idx in 0..MAX_SNAPSHOT_PAGES {
964        let page = aggregator.store().query_frames(query.clone()).await?;
965        if page.frames.is_empty() {
966            break;
967        }
968        latest_cursor = page.next_cursor.clone();
969        let page_len = page.frames.len();
970        frames.extend(
971            visible_snapshot_frames(aggregator, page.frames, query_identity.as_deref()).await?,
972        );
973        query.after = latest_cursor.clone();
974        if page_len < STORE_PAGE_LIMIT {
975            break;
976        }
977        if page_idx + 1 == MAX_SNAPSHOT_PAGES {
978            return Err(Box::new(std::io::Error::other(
979                "timeline replay exceeded maximum snapshot pages",
980            )));
981        }
982    }
983    Ok((frames, latest_cursor))
984}
985
986async fn query_fresh_timeline_snapshot(
987    aggregator: &MobKitConsoleAggregator,
988    mut query: ConsoleTimelineQuery,
989    store_page_limit: usize,
990) -> ConsoleLogResult<(Vec<ConsoleFrame>, Option<ConsoleCursor>)> {
991    let requested_limit = query.limit;
992    query.limit = store_page_limit;
993    let query_identity = query.identity.clone();
994    let mut latest_cursor = None;
995    let mut tail = std::collections::VecDeque::with_capacity(requested_limit);
996    loop {
997        let page = aggregator.store().query_frames(query.clone()).await?;
998        if page.frames.is_empty() {
999            break;
1000        }
1001        latest_cursor = page.next_cursor.clone();
1002        let page_len = page.frames.len();
1003        for frame in
1004            visible_snapshot_frames(aggregator, page.frames, query_identity.as_deref()).await?
1005        {
1006            if tail.len() >= requested_limit {
1007                tail.pop_front();
1008            }
1009            tail.push_back(frame);
1010        }
1011        query.after = latest_cursor.clone();
1012        if page_len < query.limit {
1013            break;
1014        }
1015    }
1016    Ok((tail.into_iter().collect(), latest_cursor))
1017}
1018
1019async fn visible_snapshot_frames(
1020    aggregator: &MobKitConsoleAggregator,
1021    frames: Vec<ConsoleFrame>,
1022    identity: Option<&str>,
1023) -> ConsoleLogResult<Vec<ConsoleFrame>> {
1024    let mut visible = Vec::with_capacity(frames.len());
1025    for frame in frames {
1026        if aggregator
1027            .timeline_frame_visible_for_query(&frame, identity)
1028            .await
1029        {
1030            visible.push(frame);
1031        }
1032    }
1033    Ok(visible)
1034}
1035
1036fn console_json_error(status: StatusCode, error: &str, message: &str) -> axum::response::Response {
1037    (
1038        status,
1039        Json::<Value>(json!({
1040            "error": error,
1041            "message": message,
1042        })),
1043    )
1044        .into_response()
1045}
1046
1047fn console_send_error_response(err: ConsoleSendError) -> axum::response::Response {
1048    let (status, code) = match &err {
1049        ConsoleSendError::UnknownIdentity(_) => (StatusCode::NOT_FOUND, "unknown_identity"),
1050        ConsoleSendError::NotAddressable(_) => (StatusCode::CONFLICT, "not_addressable"),
1051        ConsoleSendError::Retired(_) => (StatusCode::CONFLICT, "retired"),
1052        ConsoleSendError::InvalidContent(_)
1053        | ConsoleSendError::InvalidHandlingMode(_)
1054        | ConsoleSendError::InvalidRequest(_) => (StatusCode::BAD_REQUEST, "invalid_request"),
1055        ConsoleSendError::IdempotencyConflict(_) => (StatusCode::CONFLICT, "idempotency_conflict"),
1056        ConsoleSendError::State(_) | ConsoleSendError::Dispatch(_) | ConsoleSendError::Log(_) => {
1057            (StatusCode::INTERNAL_SERVER_ERROR, "internal_error")
1058        }
1059    };
1060    console_json_error(status, code, &err.to_string())
1061}
1062
1063fn console_send_rpc_code(err: &ConsoleSendError) -> i64 {
1064    match err {
1065        ConsoleSendError::UnknownIdentity(_) => -32001,
1066        ConsoleSendError::NotAddressable(_) => -32002,
1067        ConsoleSendError::InvalidContent(_)
1068        | ConsoleSendError::InvalidHandlingMode(_)
1069        | ConsoleSendError::InvalidRequest(_) => -32602,
1070        ConsoleSendError::IdempotencyConflict(_) => -32009,
1071        ConsoleSendError::Retired(_) => -32004,
1072        ConsoleSendError::State(_) | ConsoleSendError::Dispatch(_) | ConsoleSendError::Log(_) => {
1073            -32000
1074        }
1075    }
1076}
1077
1078fn console_send_rpc_error(response_id: Value, err: ConsoleSendError) -> Value {
1079    response_value(
1080        response_id,
1081        None,
1082        Some(JsonRpcError {
1083            code: console_send_rpc_code(&err),
1084            message: err.to_string(),
1085            data: None,
1086        }),
1087    )
1088}
1089
1090fn timeline_event_matches(
1091    event: &ConsoleTimelineEvent,
1092    identity: Option<&str>,
1093    conversation_id: Option<&str>,
1094) -> bool {
1095    let frame = match event {
1096        ConsoleTimelineEvent::ConsoleFrame { frame }
1097        | ConsoleTimelineEvent::FrameUpdated { frame } => frame,
1098        ConsoleTimelineEvent::SnapshotStarted { .. }
1099        | ConsoleTimelineEvent::SnapshotComplete { .. }
1100        | ConsoleTimelineEvent::ReplayUnavailable { .. } => return true,
1101    };
1102    if identity.is_some_and(|value| frame.identity != value) {
1103        return false;
1104    }
1105    if conversation_id.is_some_and(|value| frame.conversation_id.as_deref() != Some(value)) {
1106        return false;
1107    }
1108    true
1109}
1110
1111fn timeline_event_cursor(event: &ConsoleTimelineEvent) -> Option<&ConsoleCursor> {
1112    match event {
1113        ConsoleTimelineEvent::ConsoleFrame { frame }
1114        | ConsoleTimelineEvent::FrameUpdated { frame } => Some(&frame.cursor),
1115        ConsoleTimelineEvent::SnapshotStarted { .. }
1116        | ConsoleTimelineEvent::SnapshotComplete { .. }
1117        | ConsoleTimelineEvent::ReplayUnavailable { .. } => None,
1118    }
1119}
1120
1121fn cursor_is_after(candidate: &ConsoleCursor, current: &ConsoleCursor) -> bool {
1122    match (candidate.seq(), current.seq()) {
1123        (Some(candidate), Some(current)) => candidate > current,
1124        _ => candidate > current,
1125    }
1126}
1127
1128fn sse_event_from_timeline_event(event: &ConsoleTimelineEvent) -> Option<Event> {
1129    let (event_name, id) = match event {
1130        ConsoleTimelineEvent::SnapshotStarted { .. } => ("snapshot_started", None),
1131        ConsoleTimelineEvent::ConsoleFrame { frame } => (
1132            if frame.kind == "frame_updated" {
1133                "frame_updated"
1134            } else {
1135                "console_frame"
1136            },
1137            Some(frame.cursor.to_string()),
1138        ),
1139        ConsoleTimelineEvent::FrameUpdated { frame } => {
1140            ("frame_updated", Some(frame.cursor.to_string()))
1141        }
1142        ConsoleTimelineEvent::SnapshotComplete { cursor } => (
1143            "snapshot_complete",
1144            cursor.as_ref().map(ToString::to_string),
1145        ),
1146        ConsoleTimelineEvent::ReplayUnavailable { .. } => ("replay_unavailable", None),
1147    };
1148    let data = match serde_json::to_string(event) {
1149        Ok(value) => value,
1150        Err(_) => return None,
1151    };
1152    let mut sse = Event::default().event(event_name).data(data);
1153    if let Some(id) = id {
1154        sse = sse.id(id);
1155    }
1156    Some(sse)
1157}
1158
1159pub async fn console_rpc_multipart_handler(
1160    State(state): State<ConsoleJsonState>,
1161    headers: HeaderMap,
1162    uri: Uri,
1163    mut multipart: Multipart,
1164) -> impl IntoResponse {
1165    if !console_request_authorized(&state, &headers, &uri) {
1166        return (
1167            StatusCode::UNAUTHORIZED,
1168            Json::<Value>(serde_json::json!({
1169                "jsonrpc": JSONRPC_VERSION,
1170                "id": Value::Null,
1171                "error": {
1172                    "code": -32600,
1173                    "message": "unauthorized: console rpc requires a valid auth token",
1174                }
1175            })),
1176        );
1177    }
1178
1179    let mut payload: Option<String> = None;
1180    let mut files: std::collections::BTreeMap<String, MultipartImageUpload> =
1181        std::collections::BTreeMap::new();
1182
1183    while let Some(mut field) = match multipart.next_field().await {
1184        Ok(field) => field,
1185        Err(err) => {
1186            return (
1187                StatusCode::BAD_REQUEST,
1188                Json::<Value>(json_rpc_error_value(
1189                    Value::Null,
1190                    -32602,
1191                    format!("invalid multipart body: {err}"),
1192                )),
1193            );
1194        }
1195    } {
1196        let name = field.name().unwrap_or("").to_string();
1197        if name == "payload" {
1198            if payload.is_some() {
1199                return (
1200                    StatusCode::BAD_REQUEST,
1201                    Json::<Value>(json_rpc_error_value(
1202                        Value::Null,
1203                        -32602,
1204                        "duplicate payload part",
1205                    )),
1206                );
1207            }
1208            payload = match field.text().await {
1209                Ok(text) => Some(text),
1210                Err(err) => {
1211                    return (
1212                        StatusCode::BAD_REQUEST,
1213                        Json::<Value>(json_rpc_error_value(
1214                            Value::Null,
1215                            -32602,
1216                            format!("invalid payload part: {err}"),
1217                        )),
1218                    );
1219                }
1220            };
1221            continue;
1222        }
1223
1224        let Some(upload_id) = name.strip_prefix("file:").filter(|id| !id.is_empty()) else {
1225            return (
1226                StatusCode::BAD_REQUEST,
1227                Json::<Value>(json_rpc_error_value(
1228                    Value::Null,
1229                    -32602,
1230                    format!("unexpected multipart field: {name}"),
1231                )),
1232            );
1233        };
1234        if files.len() >= MAX_MULTIPART_IMAGES {
1235            return (
1236                StatusCode::BAD_REQUEST,
1237                Json::<Value>(json_rpc_error_value(
1238                    Value::Null,
1239                    -32602,
1240                    format!("too many image attachments; max {MAX_MULTIPART_IMAGES}"),
1241                )),
1242            );
1243        }
1244        if files.contains_key(upload_id) {
1245            return (
1246                StatusCode::BAD_REQUEST,
1247                Json::<Value>(json_rpc_error_value(
1248                    Value::Null,
1249                    -32602,
1250                    format!("duplicate file part for upload_id {upload_id}"),
1251                )),
1252            );
1253        }
1254        let media_type = field
1255            .content_type()
1256            .map(str::to_string)
1257            .unwrap_or_else(|| "application/octet-stream".to_string());
1258        if !is_allowed_image_media_type(&media_type) {
1259            return (
1260                StatusCode::BAD_REQUEST,
1261                Json::<Value>(json_rpc_error_value(
1262                    Value::Null,
1263                    -32602,
1264                    format!("unsupported image media type: {media_type}"),
1265                )),
1266            );
1267        }
1268        let mut bytes = bytes::BytesMut::new();
1269        loop {
1270            let chunk = match field.chunk().await {
1271                Ok(chunk) => chunk,
1272                Err(err) => {
1273                    return (
1274                        StatusCode::BAD_REQUEST,
1275                        Json::<Value>(json_rpc_error_value(
1276                            Value::Null,
1277                            -32602,
1278                            format!("invalid file part {upload_id}: {err}"),
1279                        )),
1280                    );
1281                }
1282            };
1283            let Some(chunk) = chunk else {
1284                break;
1285            };
1286            if bytes.len() + chunk.len() > MAX_MULTIPART_IMAGE_BYTES {
1287                return (
1288                    StatusCode::BAD_REQUEST,
1289                    Json::<Value>(json_rpc_error_value(
1290                        Value::Null,
1291                        -32602,
1292                        format!("image attachment {upload_id} exceeds 25 MiB"),
1293                    )),
1294                );
1295            }
1296            bytes.extend_from_slice(&chunk);
1297        }
1298        files.insert(
1299            upload_id.to_string(),
1300            MultipartImageUpload {
1301                media_type,
1302                bytes: bytes.freeze(),
1303            },
1304        );
1305    }
1306
1307    let payload = match payload {
1308        Some(payload) => payload,
1309        None => {
1310            return (
1311                StatusCode::BAD_REQUEST,
1312                Json::<Value>(json_rpc_error_value(
1313                    Value::Null,
1314                    -32602,
1315                    "payload part required",
1316                )),
1317            );
1318        }
1319    };
1320    let mut parsed_request = match serde_json::from_str::<JsonRpcRequest>(&payload) {
1321        Ok(req) => req,
1322        Err(err) => {
1323            return (
1324                StatusCode::OK,
1325                Json::<Value>(json_rpc_error_value(
1326                    Value::Null,
1327                    -32600,
1328                    format!("Invalid Request: {err}"),
1329                )),
1330            );
1331        }
1332    };
1333    let response_id = parsed_request.id.clone().unwrap_or(Value::Null);
1334    match parsed_request.method.as_str() {
1335        "mobkit/console/send" => {
1336            let Some(aggregator) = &state.console_aggregator else {
1337                return (
1338                    StatusCode::OK,
1339                    Json::<Value>(invalid_params(
1340                        response_id,
1341                        "mobkit/console/send multipart requires a console aggregator",
1342                    )),
1343                );
1344            };
1345            let Some(identity) = parsed_request
1346                .params
1347                .get("identity")
1348                .and_then(Value::as_str)
1349            else {
1350                return (
1351                    StatusCode::OK,
1352                    Json::<Value>(invalid_params(response_id, "identity required")),
1353                );
1354            };
1355            let binary_blob_store = match aggregator.binary_blob_store_for_identity(identity).await
1356            {
1357                Ok(Some(store)) => store,
1358                Ok(None) => {
1359                    return (
1360                        StatusCode::OK,
1361                        Json::<Value>(invalid_params(
1362                            response_id,
1363                            "binary blob store unavailable for identity",
1364                        )),
1365                    );
1366                }
1367                Err(err) => {
1368                    return (
1369                        StatusCode::OK,
1370                        Json::<Value>(console_send_rpc_error(response_id, err)),
1371                    );
1372                }
1373            };
1374            if let Err(message) = externalize_image_upload_placeholders(
1375                &mut parsed_request.params,
1376                files,
1377                binary_blob_store,
1378            )
1379            .await
1380            {
1381                return (
1382                    StatusCode::OK,
1383                    Json::<Value>(invalid_params(response_id, message)),
1384                );
1385            }
1386        }
1387        "mobkit/blob/upload" => {
1388            let Some(runtime) = &state.runtime else {
1389                return (
1390                    StatusCode::NOT_FOUND,
1391                    Json::<Value>(json_rpc_error_value(
1392                        response_id,
1393                        -32600,
1394                        "mobkit/blob/upload multipart requires a unified runtime",
1395                    )),
1396                );
1397            };
1398            let Some(binary_blob_store) = runtime.binary_blob_store() else {
1399                return (
1400                    StatusCode::INTERNAL_SERVER_ERROR,
1401                    Json::<Value>(json_rpc_error_value(
1402                        response_id,
1403                        -32000,
1404                        "binary blob store unavailable",
1405                    )),
1406                );
1407            };
1408            let result = match externalize_single_image_upload(
1409                &parsed_request.params,
1410                files,
1411                binary_blob_store,
1412            )
1413            .await
1414            {
1415                Ok(result) => result,
1416                Err(message) => {
1417                    return (
1418                        StatusCode::OK,
1419                        Json::<Value>(invalid_params(response_id, message)),
1420                    );
1421                }
1422            };
1423            return (
1424                StatusCode::OK,
1425                Json::<Value>(response_value(response_id, Some(result), None)),
1426            );
1427        }
1428        _ => {
1429            return (
1430                StatusCode::OK,
1431                Json::<Value>(invalid_params(
1432                    response_id,
1433                    "multipart RPC supports mobkit/console/send and mobkit/blob/upload only",
1434                )),
1435            );
1436        }
1437    }
1438    let response_value = if parsed_request.method == "mobkit/console/send"
1439        && state.runtime.is_none()
1440    {
1441        handle_console_aggregator_rpc(state.console_aggregator.clone(), parsed_request, true).await
1442    } else {
1443        let Some(runtime) = &state.runtime else {
1444            return (
1445                StatusCode::NOT_FOUND,
1446                Json::<Value>(json_rpc_error_value(
1447                    response_id,
1448                    -32600,
1449                    "console rpc multipart requires a unified runtime",
1450                )),
1451            );
1452        };
1453        Box::pin(handle_console_runtime_rpc(
1454            runtime,
1455            state.module_runtime.clone(),
1456            state.contact_directory.as_ref(),
1457            state.gateway_peer_keys.as_ref(),
1458            state.console_events.clone(),
1459            state.console_aggregator.clone(),
1460            state.identity_runtime.clone(),
1461            state.metadata_table.clone(),
1462            state.mob_events.clone(),
1463            parsed_request,
1464            true,
1465        ))
1466        .await
1467    };
1468    (StatusCode::OK, Json::<Value>(response_value))
1469}
1470
1471pub async fn blob_get_handler(
1472    State(state): State<ConsoleJsonState>,
1473    headers: HeaderMap,
1474    uri: Uri,
1475    AxumPath(blob_id): AxumPath<String>,
1476) -> impl IntoResponse {
1477    if !console_request_authorized(&state, &headers, &uri) {
1478        return (
1479            StatusCode::UNAUTHORIZED,
1480            Json::<Value>(serde_json::json!({ "error": "unauthorized" })),
1481        )
1482            .into_response();
1483    }
1484    if !is_valid_blob_id_value(&blob_id) {
1485        return (
1486            StatusCode::BAD_REQUEST,
1487            Json::<Value>(serde_json::json!({ "error": "invalid_blob_id" })),
1488        )
1489            .into_response();
1490    }
1491    let blob_id = meerkat_core::BlobId::from(blob_id.as_str());
1492    let mut stores: Vec<std::sync::Arc<dyn BinaryBlobStore>> = Vec::new();
1493    if let Some(runtime) = &state.runtime
1494        && let Some(store) = runtime.binary_blob_store()
1495    {
1496        stores.push(store);
1497    }
1498    if let Some(aggregator) = &state.console_aggregator {
1499        stores.extend(aggregator.binary_blob_stores());
1500    }
1501    if stores.is_empty() {
1502        return (
1503            StatusCode::NOT_FOUND,
1504            Json::<Value>(serde_json::json!({ "error": "blob_store_unavailable" })),
1505        )
1506            .into_response();
1507    }
1508    for store in stores {
1509        match store.get_bytes(&blob_id).await {
1510            Ok(payload) => return blob_payload_response(payload),
1511            Err(meerkat_core::BlobStoreError::NotFound(_)) => continue,
1512            Err(err) => {
1513                return (
1514                    StatusCode::INTERNAL_SERVER_ERROR,
1515                    Json::<Value>(serde_json::json!({ "error": err.to_string() })),
1516                )
1517                    .into_response();
1518            }
1519        }
1520    }
1521    (
1522        StatusCode::NOT_FOUND,
1523        Json::<Value>(serde_json::json!({ "error": "blob_not_found" })),
1524    )
1525        .into_response()
1526}
1527
1528fn blob_payload_response(payload: BinaryBlobPayload) -> axum::response::Response {
1529    let mut response_headers = HeaderMap::new();
1530    let content_type = HeaderValue::from_str(&payload.media_type)
1531        .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream"));
1532    response_headers.insert(header::CONTENT_TYPE, content_type);
1533    if let Ok(content_length) = HeaderValue::from_str(&payload.size.to_string()) {
1534        response_headers.insert(header::CONTENT_LENGTH, content_length);
1535    }
1536    response_headers.insert(
1537        header::CACHE_CONTROL,
1538        HeaderValue::from_static("private, max-age=31536000, immutable"),
1539    );
1540    (StatusCode::OK, response_headers, payload.data).into_response()
1541}
1542
1543fn console_request_authorized(state: &ConsoleJsonState, headers: &HeaderMap, uri: &Uri) -> bool {
1544    if !state.decisions.console.require_app_auth {
1545        return true;
1546    }
1547    console_request_token(headers, uri)
1548        .is_some_and(|token| validate_console_token(&state.decisions, &token))
1549}
1550
1551fn console_request_token(headers: &HeaderMap, uri: &Uri) -> Option<String> {
1552    let bearer_token = headers
1553        .get(header::AUTHORIZATION)
1554        .and_then(|v| v.to_str().ok())
1555        .and_then(extract_bearer_token_from_header)
1556        .map(String::from);
1557    // Parse with form_urlencoded so percent-encoded tokens decode and
1558    // `xauth_token=` substring shadowing does NOT match the real key.
1559    let query_token = uri.query().and_then(|q| {
1560        form_urlencoded::parse(q.as_bytes())
1561            .find(|(key, _)| key == "auth_token")
1562            .map(|(_, value)| value.into_owned())
1563    });
1564    bearer_token.or(query_token)
1565}
1566
1567#[derive(Debug)]
1568struct MultipartImageUpload {
1569    media_type: String,
1570    bytes: bytes::Bytes,
1571}
1572
1573fn json_rpc_error_value(id: Value, code: i64, message: impl Into<String>) -> Value {
1574    serde_json::json!({
1575        "jsonrpc": JSONRPC_VERSION,
1576        "id": id,
1577        "error": {
1578            "code": code,
1579            "message": message.into(),
1580        }
1581    })
1582}
1583
1584fn is_allowed_image_media_type(media_type: &str) -> bool {
1585    matches!(
1586        media_type,
1587        "image/png" | "image/jpeg" | "image/webp" | "image/gif"
1588    )
1589}
1590
1591fn image_upload_part_name<'a>(
1592    object: &'a serde_json::Map<String, Value>,
1593    context: &str,
1594) -> Result<&'a str, String> {
1595    object
1596        .get("upload_id")
1597        .or_else(|| object.get("part_name"))
1598        .and_then(Value::as_str)
1599        .map(str::trim)
1600        .filter(|value| !value.is_empty())
1601        .ok_or_else(|| format!("{context}.upload_id or {context}.part_name is required"))
1602}
1603
1604async fn externalize_image_upload_placeholders(
1605    params: &mut Value,
1606    files: std::collections::BTreeMap<String, MultipartImageUpload>,
1607    blob_store: std::sync::Arc<dyn crate::blob_store::BinaryBlobStore>,
1608) -> Result<(), String> {
1609    let Some(content) = params.get_mut("content") else {
1610        return Err("multipart payload params.content is required".to_string());
1611    };
1612    let mut placeholders = std::collections::BTreeMap::<String, String>::new();
1613    collect_image_upload_placeholders(content, &mut placeholders)?;
1614    if placeholders.is_empty() {
1615        return Err(
1616            "multipart payload must contain at least one image_upload placeholder".to_string(),
1617        );
1618    }
1619    if placeholders.len() > MAX_MULTIPART_IMAGES {
1620        return Err(format!(
1621            "too many image_upload placeholders; max {MAX_MULTIPART_IMAGES}"
1622        ));
1623    }
1624    for upload_id in files.keys() {
1625        if !placeholders.contains_key(upload_id) {
1626            return Err(format!(
1627                "file part has no matching image_upload placeholder: {upload_id}"
1628            ));
1629        }
1630    }
1631    for upload_id in placeholders.keys() {
1632        if !files.contains_key(upload_id) {
1633            return Err(format!(
1634                "image_upload placeholder missing file part: {upload_id}"
1635            ));
1636        }
1637    }
1638
1639    let mut refs = std::collections::BTreeMap::<String, Value>::new();
1640    for (upload_id, file) in files {
1641        let declared_media_type = placeholders
1642            .get(&upload_id)
1643            .cloned()
1644            .unwrap_or_else(|| file.media_type.clone());
1645        if !is_allowed_image_media_type(&declared_media_type) {
1646            return Err(format!(
1647                "unsupported image media type in placeholder {upload_id}: {declared_media_type}"
1648            ));
1649        }
1650        if declared_media_type != file.media_type {
1651            return Err(format!(
1652                "media type mismatch for {upload_id}: placeholder {declared_media_type}, file {}",
1653                file.media_type
1654            ));
1655        }
1656        let blob_ref = blob_store
1657            .put_bytes(&file.media_type, file.bytes)
1658            .await
1659            .map_err(|err| format!("failed to store image {upload_id}: {err}"))?;
1660        refs.insert(
1661            upload_id,
1662            serde_json::json!({
1663                "type": "image",
1664                "media_type": blob_ref.media_type,
1665                "source": "blob",
1666                "blob_id": blob_ref.blob_id,
1667            }),
1668        );
1669    }
1670    replace_image_upload_placeholders(content, &refs)?;
1671    if let Some(object) = params.as_object_mut() {
1672        object.remove("message");
1673    }
1674    Ok(())
1675}
1676
1677async fn externalize_single_image_upload(
1678    params: &Value,
1679    files: std::collections::BTreeMap<String, MultipartImageUpload>,
1680    blob_store: std::sync::Arc<dyn crate::blob_store::BinaryBlobStore>,
1681) -> Result<Value, String> {
1682    let upload = params.get("upload").unwrap_or(params);
1683    if upload
1684        .get("type")
1685        .and_then(Value::as_str)
1686        .is_some_and(|kind| kind != "image_upload")
1687    {
1688        return Err("upload.type must be image_upload".to_string());
1689    }
1690    let upload_object = upload
1691        .as_object()
1692        .ok_or_else(|| "upload must be an object".to_string())?;
1693    let upload_id = image_upload_part_name(upload_object, "upload")?;
1694    let Some(file) = files.get(upload_id) else {
1695        return Err(format!(
1696            "image_upload placeholder missing file part: {upload_id}"
1697        ));
1698    };
1699    if files.len() != 1 {
1700        return Err("mobkit/blob/upload accepts exactly one file part".to_string());
1701    }
1702    let declared_media_type = upload
1703        .get("media_type")
1704        .and_then(Value::as_str)
1705        .unwrap_or(file.media_type.as_str());
1706    if !is_allowed_image_media_type(declared_media_type) {
1707        return Err(format!(
1708            "unsupported image media type in upload {upload_id}: {declared_media_type}"
1709        ));
1710    }
1711    if declared_media_type != file.media_type {
1712        return Err(format!(
1713            "media type mismatch for {upload_id}: placeholder {declared_media_type}, file {}",
1714            file.media_type
1715        ));
1716    }
1717    let size = file.bytes.len() as u64;
1718    let blob_ref = blob_store
1719        .put_bytes(&file.media_type, file.bytes.clone())
1720        .await
1721        .map_err(|err| format!("failed to store image {upload_id}: {err}"))?;
1722    Ok(json!({
1723        "blob_id": blob_ref.blob_id,
1724        "media_type": blob_ref.media_type,
1725        "size": size,
1726    }))
1727}
1728
1729fn collect_image_upload_placeholders(
1730    value: &Value,
1731    placeholders: &mut std::collections::BTreeMap<String, String>,
1732) -> Result<(), String> {
1733    match value {
1734        Value::Array(items) => {
1735            for item in items {
1736                collect_image_upload_placeholders(item, placeholders)?;
1737            }
1738        }
1739        Value::Object(object) => {
1740            if object.get("type").and_then(Value::as_str) == Some("image_upload") {
1741                let upload_id = image_upload_part_name(object, "image_upload")?;
1742                let media_type = object
1743                    .get("media_type")
1744                    .and_then(Value::as_str)
1745                    .map(str::trim)
1746                    .filter(|value| !value.is_empty())
1747                    .ok_or_else(|| format!("image_upload {upload_id} requires media_type"))?;
1748                if placeholders
1749                    .insert(upload_id.to_string(), media_type.to_string())
1750                    .is_some()
1751                {
1752                    return Err(format!("duplicate image_upload placeholder: {upload_id}"));
1753                }
1754            } else {
1755                for child in object.values() {
1756                    collect_image_upload_placeholders(child, placeholders)?;
1757                }
1758            }
1759        }
1760        _ => {}
1761    }
1762    Ok(())
1763}
1764
1765fn replace_image_upload_placeholders(
1766    value: &mut Value,
1767    refs: &std::collections::BTreeMap<String, Value>,
1768) -> Result<(), String> {
1769    match value {
1770        Value::Array(items) => {
1771            for item in items {
1772                replace_image_upload_placeholders(item, refs)?;
1773            }
1774        }
1775        Value::Object(object) => {
1776            if object.get("type").and_then(Value::as_str) == Some("image_upload") {
1777                let upload_id = image_upload_part_name(object, "image_upload")?;
1778                let replacement = refs
1779                    .get(upload_id)
1780                    .ok_or_else(|| format!("missing blob replacement for {upload_id}"))?;
1781                *value = replacement.clone();
1782            } else {
1783                for child in object.values_mut() {
1784                    replace_image_upload_placeholders(child, refs)?;
1785                }
1786            }
1787        }
1788        _ => {}
1789    }
1790    Ok(())
1791}
1792
1793fn response_value(id: Value, result: Option<Value>, error: Option<JsonRpcError>) -> Value {
1794    serde_json::to_value(JsonRpcResponse {
1795        jsonrpc: JSONRPC_VERSION.to_string(),
1796        id,
1797        result,
1798        error,
1799    })
1800    .unwrap_or_else(|_| {
1801        serde_json::json!({
1802            "jsonrpc": JSONRPC_VERSION,
1803            "id": Value::Null,
1804            "error": {
1805                "code": -32603,
1806                "message": "serialization failed",
1807            }
1808        })
1809    })
1810}
1811
1812fn invalid_params(id: Value, message: impl Into<String>) -> Value {
1813    response_value(
1814        id,
1815        None,
1816        Some(JsonRpcError {
1817            code: -32602,
1818            message: message.into(),
1819            data: None,
1820        }),
1821    )
1822}
1823
1824async fn member_entry_to_console_json(
1825    runtime: &MobRuntime,
1826    entry: &meerkat_mob::runtime::MobMemberListEntry,
1827) -> Value {
1828    let mut value = member_entry_to_json(entry);
1829    if let Some(object) = value.as_object_mut() {
1830        object.insert(
1831            "model_capabilities".to_string(),
1832            serde_json::to_value(model_capabilities_for_member_entry(
1833                runtime.handle().definition(),
1834                entry,
1835            ))
1836            .unwrap_or(Value::Null),
1837        );
1838    }
1839    value
1840}
1841
1842fn internal_error(id: Value, message: impl Into<String>) -> Value {
1843    response_value(
1844        id,
1845        None,
1846        Some(JsonRpcError {
1847            code: -32000,
1848            message: message.into(),
1849            data: None,
1850        }),
1851    )
1852}
1853
1854/// Render a stale-cursor failure as a JSON-RPC envelope with code
1855/// `-32010`, a typed error body the SDKs can parse into the
1856/// `MobEventsStaleError` exception, and a `data` field carrying both
1857/// cursors so callers can rewind to the current frontier.
1858fn stale_event_cursor_response(id: Value, after_cursor: u64, latest_cursor: u64) -> Value {
1859    response_value(
1860        id,
1861        None,
1862        Some(JsonRpcError {
1863            code: crate::rpc::MOB_EVENTS_STALE_CURSOR_CODE,
1864            message: format!(
1865                "stale mob event cursor: requested {after_cursor}, latest {latest_cursor}"
1866            ),
1867            data: Some(serde_json::json!({
1868                "error": "event_query_stale",
1869                "after_cursor": after_cursor,
1870                "latest_cursor": latest_cursor,
1871            })),
1872        }),
1873    )
1874}
1875
1876fn parse_console_helper_options(
1877    options_val: Option<&Value>,
1878) -> Result<meerkat_mob::HelperOptions, String> {
1879    crate::rpc::mob_methods::parse_helper_options(options_val)
1880}
1881
1882fn member_is_addressable(member: &meerkat_mob::runtime::MobMemberListEntry) -> bool {
1883    member
1884        .labels
1885        .get("addressable")
1886        .map(|value: &String| !value.eq_ignore_ascii_case("false"))
1887        .unwrap_or(true)
1888}
1889
1890fn member_addressability(member: &meerkat_mob::runtime::MobMemberListEntry) -> &'static str {
1891    if member_is_addressable(member) {
1892        "addressable"
1893    } else {
1894        "internal_only"
1895    }
1896}
1897
1898fn console_identity_status_json(
1899    member: &meerkat_mob::runtime::MobMemberListEntry,
1900    session_id: Option<String>,
1901    response_phase: Option<String>,
1902) -> Value {
1903    json!({
1904        "identity": member.agent_identity.to_string(),
1905        "state": member.state,
1906        "role": member.role.to_string(),
1907        "addressability": member_addressability(member),
1908        "display_name": member.labels.get("display_name"),
1909        "labels": member.labels,
1910        "agent_runtime_id": member.binding_atoms().0.to_string(),
1911        "session_id": session_id,
1912        "generation": Value::Null,
1913        "checkpoint_version": Value::Null,
1914        "lease_healthy": Value::Null,
1915        "lease": Value::Null,
1916        "response_phase": response_phase,
1917    })
1918}
1919
1920fn console_identity_inspect_json(
1921    member: &meerkat_mob::runtime::MobMemberListEntry,
1922    session_id: Option<String>,
1923    response_phase: Option<String>,
1924) -> Value {
1925    let peers: Vec<String> = member.wired_to.iter().map(ToString::to_string).collect();
1926    json!({
1927        "identity": member.agent_identity.to_string(),
1928        "state": member.state,
1929        "role": member.role.to_string(),
1930        "addressability": member_addressability(member),
1931        "display_name": member.labels.get("display_name"),
1932        "labels": member.labels,
1933        "lease_healthy": Value::Null,
1934        "lease": Value::Null,
1935        "continuity": {
1936            "generation": Value::Null,
1937            "checkpoint_version": Value::Null,
1938            "session_id": session_id,
1939            "agent_runtime_id": member.binding_atoms().0.to_string(),
1940        },
1941        "topology_peers": peers,
1942        "output_preview": Value::Null,
1943        "response_phase": response_phase,
1944    })
1945}
1946
1947/// Resolve a mob member by identity plus its current bridge session id.
1948///
1949/// Returns `None` if no member with the given identity exists.
1950async fn lookup_member_with_session(
1951    handle: &MobHandle,
1952    identity: &MeerkatId,
1953) -> Option<(meerkat_mob::runtime::MobMemberListEntry, Option<String>)> {
1954    let entries = handle.list_members_including_retiring().await;
1955    let entry = entries
1956        .into_iter()
1957        .find(|e| &e.agent_identity == identity)?;
1958    let session_id = handle
1959        .resolve_bridge_session_id(identity)
1960        .await
1961        .map(|s| s.to_string());
1962    Some((entry, session_id))
1963}
1964
1965#[allow(clippy::too_many_arguments)]
1966async fn handle_console_aggregator_rpc(
1967    console_aggregator: Option<MobKitConsoleAggregator>,
1968    request: JsonRpcRequest,
1969    is_authenticated: bool,
1970) -> Value {
1971    let response_id = request.id.clone().unwrap_or(Value::Null);
1972    match request.method.as_str() {
1973        "mobkit/capabilities" => response_value(
1974            response_id,
1975            Some(json!({
1976                "methods": [
1977                    "mobkit/capabilities",
1978                    "mobkit/console/list_identities",
1979                    "mobkit/console/inspect_identity",
1980                    "mobkit/console/query_timeline",
1981                    "mobkit/retire",
1982                    "mobkit/reset_all",
1983                    "mobkit/console/send",
1984                ],
1985                "authenticated": is_authenticated,
1986                "features": {
1987                    "console_aggregator": console_aggregator.is_some(),
1988                    "multi_runtime_console": console_aggregator.is_some(),
1989                }
1990            })),
1991            None,
1992        ),
1993        "mobkit/console/list_identities" => {
1994            let Some(aggregator) = &console_aggregator else {
1995                return console_aggregator_unavailable(response_id);
1996            };
1997            match aggregator.list_identities().await {
1998                Ok(identities) => {
1999                    response_value(response_id, Some(json!({ "identities": identities })), None)
2000                }
2001                Err(err) => internal_error(response_id, format!("list_identities failed: {err}")),
2002            }
2003        }
2004        "mobkit/console/inspect_identity" => {
2005            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2006                return invalid_params(response_id, "identity required");
2007            };
2008            let Some(aggregator) = &console_aggregator else {
2009                return console_aggregator_unavailable(response_id);
2010            };
2011            match aggregator.inspect_identity(identity).await {
2012                Ok(Some(inspection)) => response_value(
2013                    response_id,
2014                    Some(serde_json::to_value(inspection).unwrap_or(Value::Null)),
2015                    None,
2016                ),
2017                Ok(None) => response_value(
2018                    response_id,
2019                    None,
2020                    Some(JsonRpcError {
2021                        code: -32001,
2022                        message: format!("unknown identity: {identity}"),
2023                        data: None,
2024                    }),
2025                ),
2026                Err(err) => internal_error(response_id, format!("inspect_identity failed: {err}")),
2027            }
2028        }
2029        "mobkit/console/query_timeline" => {
2030            let query: ConsoleTimelineQuery = match serde_json::from_value(request.params.clone()) {
2031                Ok(query) => query,
2032                Err(err) => {
2033                    return invalid_params(response_id, format!("invalid query params: {err}"));
2034                }
2035            };
2036            let Some(aggregator) = &console_aggregator else {
2037                return console_aggregator_unavailable(response_id);
2038            };
2039            match aggregator.query_timeline(query).await {
2040                Ok(page) => response_value(
2041                    response_id,
2042                    Some(serde_json::to_value(page).unwrap_or(Value::Null)),
2043                    None,
2044                ),
2045                Err(err) => response_value(
2046                    response_id,
2047                    None,
2048                    Some(JsonRpcError {
2049                        code: -32010,
2050                        message: format!("query_timeline failed: {err}"),
2051                        data: Some(json!({ "kind": "replay_unavailable" })),
2052                    }),
2053                ),
2054            }
2055        }
2056        "mobkit/console/send" => {
2057            let send_request: ConsoleSendRequest =
2058                match serde_json::from_value(request.params.clone()) {
2059                    Ok(request) => request,
2060                    Err(err) => {
2061                        return invalid_params(response_id, format!("invalid send params: {err}"));
2062                    }
2063                };
2064            let Some(aggregator) = &console_aggregator else {
2065                return console_aggregator_unavailable(response_id);
2066            };
2067            match aggregator.send(send_request).await {
2068                Ok(accepted) => response_value(
2069                    response_id,
2070                    Some(serde_json::to_value(accepted).unwrap_or(Value::Null)),
2071                    None,
2072                ),
2073                Err(err) => response_value(
2074                    response_id,
2075                    None,
2076                    Some(JsonRpcError {
2077                        code: console_send_rpc_code(&err),
2078                        message: err.to_string(),
2079                        data: None,
2080                    }),
2081                ),
2082            }
2083        }
2084        "mobkit/retire" => {
2085            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2086                return invalid_params(response_id, "identity required");
2087            };
2088            let Some(aggregator) = &console_aggregator else {
2089                return console_aggregator_unavailable(response_id);
2090            };
2091            match aggregator.retire_identity(identity).await {
2092                Ok(true) => {
2093                    response_value(response_id, Some(json!({ "identity": identity })), None)
2094                }
2095                Ok(false) => response_value(
2096                    response_id,
2097                    None,
2098                    Some(JsonRpcError {
2099                        code: -32001,
2100                        message: format!("unknown identity: {identity}"),
2101                        data: None,
2102                    }),
2103                ),
2104                Err(err) => internal_error(response_id, format!("retire failed: {err}")),
2105            }
2106        }
2107        "mobkit/reset_all" => {
2108            let Some(aggregator) = &console_aggregator else {
2109                return console_aggregator_unavailable(response_id);
2110            };
2111            match aggregator.list_identities_fresh().await {
2112                Ok(identities) => {
2113                    let mut retired = Vec::new();
2114                    let mut failed = Vec::new();
2115                    for identity in identities {
2116                        match aggregator.retire_identity(&identity.identity).await {
2117                            Ok(true) => retired.push(identity.identity),
2118                            Ok(false) => failed.push(json!({
2119                                "identity": identity.identity,
2120                                "error": "unknown identity",
2121                            })),
2122                            Err(err) => failed.push(json!({
2123                                "identity": identity.identity,
2124                                "error": err.to_string(),
2125                            })),
2126                        }
2127                    }
2128                    if let Err(err) = aggregator.clear_timeline_frames().await {
2129                        failed.push(json!({
2130                            "identity": "_console_timeline",
2131                            "error": err.to_string(),
2132                        }));
2133                    }
2134                    response_value(
2135                        response_id,
2136                        Some(json!({
2137                            "retired": retired,
2138                            "failed": failed,
2139                        })),
2140                        None,
2141                    )
2142                }
2143                Err(err) => internal_error(response_id, format!("reset_all failed: {err}")),
2144            }
2145        }
2146        _ => response_value(
2147            response_id,
2148            None,
2149            Some(JsonRpcError {
2150                code: -32601,
2151                message: "Method not found".to_string(),
2152                data: None,
2153            }),
2154        ),
2155    }
2156}
2157
2158fn console_aggregator_unavailable(response_id: Value) -> Value {
2159    response_value(
2160        response_id,
2161        None,
2162        Some(JsonRpcError {
2163            code: -32004,
2164            message: "console aggregator unavailable".to_string(),
2165            data: None,
2166        }),
2167    )
2168}
2169
2170#[allow(clippy::too_many_arguments)]
2171async fn handle_console_runtime_rpc(
2172    runtime: &MobRuntime,
2173    module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
2174    contact_directory: Option<&ContactDirectory>,
2175    gateway_peer_keys: Option<&crate::auth::peer_keys::GatewayPeerKeys>,
2176    console_events: Option<ConsoleEventStore>,
2177    console_aggregator: Option<MobKitConsoleAggregator>,
2178    identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
2179    metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
2180    mob_events: Option<MobEventsStore>,
2181    request: JsonRpcRequest,
2182    is_authenticated: bool,
2183) -> Value {
2184    let response_id = request.id.clone().unwrap_or(Value::Null);
2185
2186    match request.method.as_str() {
2187        "mobkit/capabilities" => {
2188            let mut methods = vec![
2189                "mobkit/status",
2190                "mobkit/capabilities",
2191                "mobkit/list_members",
2192                "mobkit/get_member",
2193                "mobkit/find_members",
2194                "mobkit/member_status",
2195                "mobkit/collect_completed",
2196                "mobkit/blob/get",
2197                "mobkit/wait_ready",
2198                "mobkit/flow_status",
2199                "mobkit/list_flows",
2200                "mobkit/list_runs",
2201                "mobkit/console/list_identities",
2202                "mobkit/console/inspect_identity",
2203                "mobkit/console/query_timeline",
2204                "mobkit/mob_events/query",
2205                "mobkit/mob_events/subscribe",
2206                "mobkit/cross_mob/peer_info",
2207                "mobkit/cross_mob/directory",
2208                "mobkit/peer_pubkey",
2209            ];
2210            if module_runtime.is_some() {
2211                methods.extend_from_slice(&[
2212                    "mobkit/status_identity",
2213                    "mobkit/inspect_identity",
2214                    "mobkit/respawn",
2215                    "mobkit/reset",
2216                    "mobkit/routing/routes/list",
2217                    "mobkit/delivery/history",
2218                    "mobkit/gating/pending",
2219                    "mobkit/gating/audit",
2220                    "mobkit/gating/decide",
2221                ]);
2222            }
2223            if is_authenticated {
2224                methods.extend_from_slice(&[
2225                    "mobkit/retire",
2226                    "mobkit/reset_all",
2227                    "mobkit/console/send",
2228                    "mobkit/blob/upload",
2229                    "mobkit/ensure_member",
2230                    "mobkit/retire_member",
2231                    "mobkit/respawn_member",
2232                    "mobkit/force_cancel_member",
2233                    "mobkit/cancel_flow",
2234                    "mobkit/run_flow",
2235                    "mobkit/spawn_helper",
2236                    "mobkit/fork_helper",
2237                    "mobkit/attach_existing_session",
2238                    "mobkit/reconcile_edges",
2239                    "mobkit/cross_mob/wire_local",
2240                    "mobkit/cross_mob/unwire_local",
2241                ]);
2242            }
2243            if metadata_table.is_some() {
2244                methods.extend_from_slice(&["mobkit/mob_labels/get", "mobkit/run_labels/get"]);
2245                if is_authenticated {
2246                    methods.extend_from_slice(&[
2247                        "mobkit/mob_labels/set",
2248                        "mobkit/mob_labels/delete",
2249                        "mobkit/run_labels/set",
2250                        "mobkit/run_labels/delete",
2251                    ]);
2252                }
2253            }
2254            response_value(
2255                response_id,
2256                Some(serde_json::json!({
2257                    "contract_version": crate::rpc::MOBKIT_CONTRACT_VERSION,
2258                    "methods": methods,
2259                    // The console routes to MobRuntime directly and has no
2260                    // access to the module runtime, so loaded_modules is always [].
2261                    "loaded_modules": serde_json::json!([]),
2262                    "runtime_capabilities": {
2263                        "can_send_messages": is_authenticated,
2264                        "can_retire_members": is_authenticated,
2265                        "can_spawn_members": is_authenticated,
2266                    }
2267                })),
2268                None,
2269            )
2270        }
2271        "mobkit/status" => {
2272            let mob_state = runtime.handle().status().await.ok();
2273            response_value(
2274                response_id,
2275                Some(serde_json::json!({
2276                    "contract_version": crate::rpc::MOBKIT_CONTRACT_VERSION,
2277                    "running": matches!(mob_state, Some(MobState::Creating | MobState::Running)),
2278                    // Console routes to MobRuntime directly — no module runtime available.
2279                    // Return [] to keep StatusResult.loaded_modules schema-consistent.
2280                    "loaded_modules": serde_json::json!([]),
2281                })),
2282                None,
2283            )
2284        }
2285        "mobkit/console/list_identities" => {
2286            let Some(aggregator) = &console_aggregator else {
2287                return response_value(
2288                    response_id,
2289                    None,
2290                    Some(JsonRpcError {
2291                        code: -32004,
2292                        message: "console aggregator unavailable".to_string(),
2293                        data: None,
2294                    }),
2295                );
2296            };
2297            match aggregator.list_identities().await {
2298                Ok(identities) => {
2299                    response_value(response_id, Some(json!({ "identities": identities })), None)
2300                }
2301                Err(err) => internal_error(response_id, format!("list_identities failed: {err}")),
2302            }
2303        }
2304        "mobkit/console/inspect_identity" => {
2305            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2306                return invalid_params(response_id, "identity required");
2307            };
2308            let Some(aggregator) = &console_aggregator else {
2309                return response_value(
2310                    response_id,
2311                    None,
2312                    Some(JsonRpcError {
2313                        code: -32004,
2314                        message: "console aggregator unavailable".to_string(),
2315                        data: None,
2316                    }),
2317                );
2318            };
2319            match aggregator.inspect_identity(identity).await {
2320                Ok(Some(inspection)) => response_value(
2321                    response_id,
2322                    Some(serde_json::to_value(inspection).unwrap_or(Value::Null)),
2323                    None,
2324                ),
2325                Ok(None) => response_value(
2326                    response_id,
2327                    None,
2328                    Some(JsonRpcError {
2329                        code: -32001,
2330                        message: format!("unknown identity: {identity}"),
2331                        data: None,
2332                    }),
2333                ),
2334                Err(err) => internal_error(response_id, format!("inspect_identity failed: {err}")),
2335            }
2336        }
2337        "mobkit/console/query_timeline" => {
2338            let query: ConsoleTimelineQuery = match serde_json::from_value(request.params.clone()) {
2339                Ok(query) => query,
2340                Err(err) => {
2341                    return invalid_params(response_id, format!("invalid query params: {err}"));
2342                }
2343            };
2344            let Some(aggregator) = &console_aggregator else {
2345                return response_value(
2346                    response_id,
2347                    None,
2348                    Some(JsonRpcError {
2349                        code: -32004,
2350                        message: "console aggregator unavailable".to_string(),
2351                        data: None,
2352                    }),
2353                );
2354            };
2355            match aggregator.query_timeline(query).await {
2356                Ok(page) => response_value(
2357                    response_id,
2358                    Some(serde_json::to_value(page).unwrap_or(Value::Null)),
2359                    None,
2360                ),
2361                Err(err) => response_value(
2362                    response_id,
2363                    None,
2364                    Some(JsonRpcError {
2365                        code: -32010,
2366                        message: format!("query_timeline failed: {err}"),
2367                        data: Some(json!({ "kind": "replay_unavailable" })),
2368                    }),
2369                ),
2370            }
2371        }
2372        "mobkit/console/send" => {
2373            let send_request: ConsoleSendRequest =
2374                match serde_json::from_value(request.params.clone()) {
2375                    Ok(request) => request,
2376                    Err(err) => {
2377                        return invalid_params(response_id, format!("invalid send params: {err}"));
2378                    }
2379                };
2380            let Some(aggregator) = &console_aggregator else {
2381                return response_value(
2382                    response_id,
2383                    None,
2384                    Some(JsonRpcError {
2385                        code: -32004,
2386                        message: "console aggregator unavailable".to_string(),
2387                        data: None,
2388                    }),
2389                );
2390            };
2391            if let Some(identity_runtime) = &identity_runtime {
2392                return match console_send_identity_first(
2393                    aggregator,
2394                    identity_runtime,
2395                    console_events.as_ref(),
2396                    send_request,
2397                )
2398                .await
2399                {
2400                    Ok(accepted) => response_value(
2401                        response_id,
2402                        Some(serde_json::to_value(accepted).unwrap_or(Value::Null)),
2403                        None,
2404                    ),
2405                    Err(err) => response_value(
2406                        response_id,
2407                        None,
2408                        Some(JsonRpcError {
2409                            code: console_send_rpc_code(&err),
2410                            message: err.to_string(),
2411                            data: None,
2412                        }),
2413                    ),
2414                };
2415            }
2416            match aggregator.send(send_request).await {
2417                Ok(accepted) => response_value(
2418                    response_id,
2419                    Some(serde_json::to_value(accepted).unwrap_or(Value::Null)),
2420                    None,
2421                ),
2422                Err(err) => response_value(
2423                    response_id,
2424                    None,
2425                    Some(JsonRpcError {
2426                        code: console_send_rpc_code(&err),
2427                        message: err.to_string(),
2428                        data: None,
2429                    }),
2430                ),
2431            }
2432        }
2433        "mobkit/blob/get" => {
2434            let Some(blob_id) = request
2435                .params
2436                .get("blob_id")
2437                .or_else(|| request.params.get("id"))
2438                .and_then(Value::as_str)
2439            else {
2440                return invalid_params(response_id, "blob_id required");
2441            };
2442            if !is_valid_blob_id_value(blob_id) {
2443                return invalid_params(response_id, "invalid blob_id");
2444            }
2445            let Some(store) = runtime.binary_blob_store() else {
2446                return internal_error(response_id, "binary blob store unavailable");
2447            };
2448            match store.get_bytes(&meerkat_core::BlobId::from(blob_id)).await {
2449                Ok(payload) => response_value(
2450                    response_id,
2451                    Some(serde_json::json!({
2452                        "blob_id": payload.blob_id,
2453                        "media_type": payload.media_type,
2454                        "size": payload.size,
2455                        "data": base64::engine::general_purpose::STANDARD.encode(payload.data.as_ref()),
2456                    })),
2457                    None,
2458                ),
2459                Err(meerkat_core::BlobStoreError::NotFound(_)) => response_value(
2460                    response_id,
2461                    None,
2462                    Some(JsonRpcError {
2463                        code: -32001,
2464                        message: format!("blob not found: {blob_id}"),
2465                        data: Some(json!({ "kind": "not_found", "blob_id": blob_id })),
2466                    }),
2467                ),
2468                Err(err) => internal_error(response_id, format!("blob get failed: {err}")),
2469            }
2470        }
2471        "mobkit/list_members" => {
2472            let handle = runtime.handle();
2473            let entries = handle.list_members_including_retiring().await;
2474            let mut members = Vec::with_capacity(entries.len());
2475            for entry in &entries {
2476                members.push(member_entry_to_console_json(runtime, entry).await);
2477            }
2478            response_value(response_id, Some(Value::Array(members)), None)
2479        }
2480        "mobkit/get_member" => {
2481            let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
2482                return invalid_params(response_id, "member_id required");
2483            };
2484            let handle = runtime.handle();
2485            let identity = MeerkatId::from(member_id);
2486            let entries = handle.list_members_including_retiring().await;
2487            match entries.into_iter().find(|e| e.agent_identity == identity) {
2488                Some(entry) => response_value(
2489                    response_id,
2490                    Some(member_entry_to_console_json(runtime, &entry).await),
2491                    None,
2492                ),
2493                None => invalid_params(response_id, format!("member not found: {member_id}")),
2494            }
2495        }
2496        "mobkit/find_members" => {
2497            let Some(label_key) = request.params.get("label_key").and_then(Value::as_str) else {
2498                return invalid_params(response_id, "label_key required");
2499            };
2500            let Some(label_value) = request.params.get("label_value").and_then(Value::as_str)
2501            else {
2502                return invalid_params(response_id, "label_value required");
2503            };
2504            let handle = runtime.handle();
2505            let filter = MemberFilter {
2506                labels: std::collections::BTreeMap::from([(
2507                    label_key.to_string(),
2508                    label_value.to_string(),
2509                )]),
2510                role: None,
2511                state: None,
2512            };
2513            let entries = handle.list_members_matching(filter).await;
2514            let mut matches = Vec::with_capacity(entries.len());
2515            for entry in &entries {
2516                matches.push(member_entry_to_console_json(runtime, entry).await);
2517            }
2518            response_value(response_id, Some(Value::Array(matches)), None)
2519        }
2520        "mobkit/status_identity" => {
2521            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2522                return invalid_params(response_id, "identity required");
2523            };
2524            let handle = runtime.handle();
2525            let mid = MeerkatId::from(identity);
2526            let Some((member, session_id)) = lookup_member_with_session(&handle, &mid).await else {
2527                return invalid_params(response_id, format!("identity not found: {identity}"));
2528            };
2529            let phase = if let Some(store) = &console_events {
2530                store.response_phase_for_identity(identity).await
2531            } else {
2532                None
2533            };
2534            response_value(
2535                response_id,
2536                Some(console_identity_status_json(&member, session_id, phase)),
2537                None,
2538            )
2539        }
2540        "mobkit/inspect_identity" => {
2541            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2542                return invalid_params(response_id, "identity required");
2543            };
2544            let handle = runtime.handle();
2545            let mid = MeerkatId::from(identity);
2546            let Some((member, session_id)) = lookup_member_with_session(&handle, &mid).await else {
2547                return invalid_params(response_id, format!("identity not found: {identity}"));
2548            };
2549            let phase = if let Some(store) = &console_events {
2550                store.response_phase_for_identity(identity).await
2551            } else {
2552                None
2553            };
2554            response_value(
2555                response_id,
2556                Some(console_identity_inspect_json(&member, session_id, phase)),
2557                None,
2558            )
2559        }
2560        "mobkit/retire" => {
2561            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2562                return invalid_params(response_id, "identity required");
2563            };
2564            if let Some(aggregator) = &console_aggregator {
2565                return match aggregator.retire_identity(identity).await {
2566                    Ok(true) => {
2567                        if let Some(store) = &console_events {
2568                            store
2569                                .record_lifecycle(identity, "identity_retired", json!({}))
2570                                .await;
2571                        }
2572                        response_value(response_id, Some(json!({ "identity": identity })), None)
2573                    }
2574                    Ok(false) => response_value(
2575                        response_id,
2576                        None,
2577                        Some(JsonRpcError {
2578                            code: -32001,
2579                            message: format!("unknown identity: {identity}"),
2580                            data: None,
2581                        }),
2582                    ),
2583                    Err(err) => internal_error(response_id, format!("retire failed: {err}")),
2584                };
2585            }
2586            match runtime.handle().retire(MeerkatId::from(identity)).await {
2587                Ok(()) => {
2588                    if let Some(store) = &console_events {
2589                        store
2590                            .record_lifecycle(identity, "identity_retired", json!({}))
2591                            .await;
2592                    }
2593                    response_value(response_id, Some(json!({ "identity": identity })), None)
2594                }
2595                Err(err) => internal_error(response_id, format!("retire failed: {err}")),
2596            }
2597        }
2598        "mobkit/respawn" => {
2599            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2600                return invalid_params(response_id, "identity required");
2601            };
2602            let handle = runtime.handle();
2603            let mid = MeerkatId::from(identity);
2604            match handle.respawn(mid.clone(), None).await {
2605                Ok(_receipt) => {
2606                    if let Some(store) = &console_events {
2607                        store
2608                            .record_lifecycle(identity, "identity_respawned", json!({}))
2609                            .await;
2610                    }
2611                    let body = match lookup_member_with_session(&handle, &mid).await {
2612                        Some((entry, session_id)) => {
2613                            console_identity_status_json(&entry, session_id, None)
2614                        }
2615                        None => json!({ "identity": identity }),
2616                    };
2617                    response_value(response_id, Some(body), None)
2618                }
2619                Err(err) => internal_error(response_id, format!("respawn failed: {err}")),
2620            }
2621        }
2622        "mobkit/reset" => {
2623            let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2624                return invalid_params(response_id, "identity required");
2625            };
2626            let handle = runtime.handle();
2627            let mid = MeerkatId::from(identity);
2628            match handle.respawn(mid.clone(), None).await {
2629                Ok(_receipt) => {
2630                    if let Some(store) = &console_events {
2631                        store
2632                            .record_lifecycle(identity, "identity_reset", json!({}))
2633                            .await;
2634                    }
2635                    let body = match lookup_member_with_session(&handle, &mid).await {
2636                        Some((entry, session_id)) => {
2637                            console_identity_status_json(&entry, session_id, None)
2638                        }
2639                        None => json!({ "identity": identity }),
2640                    };
2641                    response_value(response_id, Some(body), None)
2642                }
2643                Err(err) => internal_error(response_id, format!("reset failed: {err}")),
2644            }
2645        }
2646        "mobkit/reset_all" => {
2647            match Box::pin(reset_all_live_console_agents(
2648                runtime,
2649                console_events.as_ref(),
2650                console_aggregator.as_ref(),
2651            ))
2652            .await
2653            {
2654                Ok(body) => response_value(response_id, Some(body), None),
2655                Err(err) => internal_error(response_id, format!("reset_all failed: {err}")),
2656            }
2657        }
2658        "mobkit/routing/routes/list" => {
2659            let Some(module_runtime) = &module_runtime else {
2660                return response_value(
2661                    response_id,
2662                    None,
2663                    Some(JsonRpcError {
2664                        code: -32601,
2665                        message: "Method not found".to_string(),
2666                        data: None,
2667                    }),
2668                );
2669            };
2670            let routes = module_runtime.lock().await.list_runtime_routes();
2671            response_value(response_id, Some(json!({ "routes": routes })), None)
2672        }
2673        "mobkit/delivery/history" => {
2674            let Some(module_runtime) = &module_runtime else {
2675                return response_value(
2676                    response_id,
2677                    None,
2678                    Some(JsonRpcError {
2679                        code: -32601,
2680                        message: "Method not found".to_string(),
2681                        data: None,
2682                    }),
2683                );
2684            };
2685            let limit = request
2686                .params
2687                .get("limit")
2688                .and_then(Value::as_u64)
2689                .unwrap_or(50) as usize;
2690            let history = module_runtime
2691                .lock()
2692                .await
2693                .delivery_history(DeliveryHistoryRequest {
2694                    recipient: None,
2695                    sink: None,
2696                    limit,
2697                });
2698            response_value(
2699                response_id,
2700                Some(serde_json::to_value(history).unwrap_or(Value::Null)),
2701                None,
2702            )
2703        }
2704        "mobkit/gating/pending" => {
2705            let Some(module_runtime) = &module_runtime else {
2706                return response_value(
2707                    response_id,
2708                    None,
2709                    Some(JsonRpcError {
2710                        code: -32601,
2711                        message: "Method not found".to_string(),
2712                        data: None,
2713                    }),
2714                );
2715            };
2716            let pending = module_runtime.lock().await.list_gating_pending();
2717            response_value(response_id, Some(json!({ "pending": pending })), None)
2718        }
2719        "mobkit/gating/audit" => {
2720            let Some(module_runtime) = &module_runtime else {
2721                return response_value(
2722                    response_id,
2723                    None,
2724                    Some(JsonRpcError {
2725                        code: -32601,
2726                        message: "Method not found".to_string(),
2727                        data: None,
2728                    }),
2729                );
2730            };
2731            let limit = request
2732                .params
2733                .get("limit")
2734                .and_then(Value::as_u64)
2735                .unwrap_or(50) as usize;
2736            let entries = module_runtime.lock().await.gating_audit_entries(limit);
2737            response_value(response_id, Some(json!({ "entries": entries })), None)
2738        }
2739        "mobkit/gating/decide" => {
2740            let Some(module_runtime) = &module_runtime else {
2741                return response_value(
2742                    response_id,
2743                    None,
2744                    Some(JsonRpcError {
2745                        code: -32601,
2746                        message: "Method not found".to_string(),
2747                        data: None,
2748                    }),
2749                );
2750            };
2751            let Some(pending_id) = request.params.get("pending_id").and_then(Value::as_str) else {
2752                return invalid_params(response_id, "pending_id required");
2753            };
2754            let Some(approver_id) = request.params.get("approver_id").and_then(Value::as_str)
2755            else {
2756                return invalid_params(response_id, "approver_id required");
2757            };
2758            let Some(raw_decision) = request.params.get("decision").and_then(Value::as_str) else {
2759                return invalid_params(response_id, "decision required");
2760            };
2761            let decision = match raw_decision {
2762                "approve" => GatingDecision::Approve,
2763                "reject" | "deny" => GatingDecision::Reject,
2764                "escalate" => GatingDecision::Escalate,
2765                _ => {
2766                    return invalid_params(
2767                        response_id,
2768                        format!("unsupported decision: {raw_decision}"),
2769                    );
2770                }
2771            };
2772            let reason = request
2773                .params
2774                .get("reason")
2775                .and_then(Value::as_str)
2776                .map(ToString::to_string);
2777            match module_runtime
2778                .lock()
2779                .await
2780                .decide_gating_action(GatingDecideRequest {
2781                    pending_id: pending_id.to_string(),
2782                    approver_id: approver_id.to_string(),
2783                    decision,
2784                    reason,
2785                }) {
2786                Ok(result) => response_value(
2787                    response_id,
2788                    Some(serde_json::to_value(result).unwrap_or(Value::Null)),
2789                    None,
2790                ),
2791                Err(err) => invalid_params(response_id, format!("gating decision failed: {err}")),
2792            }
2793        }
2794        "mobkit/ensure_member" => {
2795            let Some(role) = request.params.get("role").and_then(Value::as_str) else {
2796                return invalid_params(response_id, "role required");
2797            };
2798            let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
2799            else {
2800                return invalid_params(response_id, "agent_identity required");
2801            };
2802            let labels = match request.params.get("labels") {
2803                None | Some(Value::Null) => std::collections::BTreeMap::new(),
2804                Some(value) => match serde_json::from_value(value.clone()) {
2805                    Ok(map) => map,
2806                    Err(err) => {
2807                        return invalid_params(response_id, format!("invalid labels: {err}"));
2808                    }
2809                },
2810            };
2811            let context = request.params.get("context").cloned();
2812            let resume_session_id = match request.params.get("resume_session_id") {
2813                None => None,
2814                Some(Value::Null) => None,
2815                Some(v) => match v.as_str() {
2816                    Some(s) => match meerkat_core::types::SessionId::parse(s) {
2817                        Ok(sid) => Some(sid),
2818                        Err(_) => {
2819                            return invalid_params(
2820                                response_id,
2821                                format!("invalid resume_session_id: {s:?}"),
2822                            );
2823                        }
2824                    },
2825                    None => {
2826                        return invalid_params(
2827                            response_id,
2828                            "resume_session_id must be a string".to_string(),
2829                        );
2830                    }
2831                },
2832            };
2833            let additional_instructions = match request.params.get("additional_instructions") {
2834                None | Some(Value::Null) => None,
2835                Some(Value::Array(arr)) => {
2836                    let mut strs = Vec::with_capacity(arr.len());
2837                    for (i, entry) in arr.iter().enumerate() {
2838                        match entry.as_str() {
2839                            Some(s) => strs.push(s.to_string()),
2840                            None => {
2841                                return invalid_params(
2842                                    response_id,
2843                                    format!("additional_instructions[{i}] must be a string"),
2844                                );
2845                            }
2846                        }
2847                    }
2848                    if strs.is_empty() { None } else { Some(strs) }
2849                }
2850                Some(_) => {
2851                    return invalid_params(
2852                        response_id,
2853                        "additional_instructions must be an array of strings",
2854                    );
2855                }
2856            };
2857            let mut spec =
2858                SpawnMemberSpec::new(ProfileName::from(role), MeerkatId::from(agent_identity));
2859            if !labels.is_empty() {
2860                spec = spec.with_labels(labels);
2861            }
2862            if let Some(ctx) = context {
2863                spec = spec.with_context(ctx);
2864            }
2865            if let Some(sid) = resume_session_id {
2866                spec = spec.with_resume_bridge_session_id(sid);
2867            }
2868            if let Some(instructions) = additional_instructions {
2869                spec = spec.with_additional_instructions(instructions);
2870            }
2871            let handle = runtime.handle();
2872            let mid = spec.identity.clone();
2873            match handle.ensure_member(spec).await {
2874                Ok(_outcome) => {
2875                    let body = match lookup_member_with_session(&handle, &mid).await {
2876                        Some((entry, _sid)) => member_entry_to_json(&entry),
2877                        None => Value::Null,
2878                    };
2879                    response_value(response_id, Some(body), None)
2880                }
2881                Err(err) => internal_error(response_id, format!("ensure_member failed: {err}")),
2882            }
2883        }
2884        "mobkit/retire_member" => {
2885            let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
2886                return invalid_params(response_id, "member_id required");
2887            };
2888            if let Some(aggregator) = &console_aggregator {
2889                return match aggregator.retire_identity(member_id).await {
2890                    Ok(true) => response_value(
2891                        response_id,
2892                        Some(serde_json::json!({ "accepted": true })),
2893                        None,
2894                    ),
2895                    Ok(false) => response_value(
2896                        response_id,
2897                        None,
2898                        Some(JsonRpcError {
2899                            code: -32001,
2900                            message: format!("unknown identity: {member_id}"),
2901                            data: None,
2902                        }),
2903                    ),
2904                    Err(err) => internal_error(response_id, format!("retire_member failed: {err}")),
2905                };
2906            }
2907            match runtime.handle().retire(MeerkatId::from(member_id)).await {
2908                Ok(()) => response_value(
2909                    response_id,
2910                    Some(serde_json::json!({ "accepted": true })),
2911                    None,
2912                ),
2913                Err(err) => internal_error(response_id, format!("retire_member failed: {err}")),
2914            }
2915        }
2916        "mobkit/respawn_member" => {
2917            let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
2918                return invalid_params(response_id, "member_id required");
2919            };
2920            match runtime
2921                .handle()
2922                .respawn(MeerkatId::from(member_id), None)
2923                .await
2924            {
2925                Ok(_receipt) => response_value(
2926                    response_id,
2927                    Some(serde_json::json!({ "accepted": true })),
2928                    None,
2929                ),
2930                Err(err) => internal_error(response_id, format!("respawn_member failed: {err}")),
2931            }
2932        }
2933        "mobkit/reconcile_edges" => response_value(
2934            response_id,
2935            Some(serde_json::json!({
2936                "status": "noop",
2937                "reason": "console runtime routes directly to MobRuntime",
2938            })),
2939            None,
2940        ),
2941        "mobkit/mob_events/query" | "mobkit/mob_events/subscribe" => {
2942            let query: EventQuery = if request.params.is_null() {
2943                EventQuery::default()
2944            } else {
2945                match serde_json::from_value(request.params.clone()) {
2946                    Ok(q) => q,
2947                    Err(err) => {
2948                        return invalid_params(response_id, format!("invalid query params: {err}"));
2949                    }
2950                }
2951            };
2952            let Some(store) = mob_events.as_ref() else {
2953                return response_value(
2954                    response_id,
2955                    Some(serde_json::json!({
2956                        "events": [],
2957                        "next_after_seq": Value::Null,
2958                    })),
2959                    None,
2960                );
2961            };
2962            let events_view = runtime.handle().events();
2963            // Capture latest_cursor at handshake so the SSE continuation
2964            // URL still covers the empty-snapshot case without losing
2965            // events between the JSON-RPC response and the SSE connect.
2966            let latest_at_handshake = events_view.latest_cursor().await.unwrap_or(0);
2967            let result = crate::unified_runtime::mob_events::query_ledger_with_filter(
2968                &events_view,
2969                store,
2970                &query,
2971            )
2972            .await;
2973            match result {
2974                Ok(events) => {
2975                    let last_cursor = events.last().map(|event| event.cursor);
2976                    let body = if request.method == "mobkit/mob_events/subscribe" {
2977                        let subscribe_url = crate::unified_runtime::mob_events::build_subscribe_url(
2978                            &query,
2979                            last_cursor,
2980                            latest_at_handshake,
2981                        );
2982                        serde_json::json!({
2983                            "stream": "mob_events",
2984                            "events": events,
2985                            "next_after_seq": last_cursor,
2986                            "subscribe_url": subscribe_url,
2987                            "keep_alive": {
2988                                "interval_ms": 15_000_u64,
2989                                "event": "keep_alive",
2990                            },
2991                        })
2992                    } else {
2993                        serde_json::json!({
2994                            "events": events,
2995                            "next_after_seq": last_cursor,
2996                        })
2997                    };
2998                    response_value(response_id, Some(body), None)
2999                }
3000                Err(crate::unified_runtime::mob_events::MobEventsQueryError::Stale {
3001                    after_cursor,
3002                    latest_cursor,
3003                }) => stale_event_cursor_response(response_id, after_cursor, latest_cursor),
3004                Err(err) => internal_error(response_id, format!("mob_events query failed: {err}")),
3005            }
3006        }
3007        // 0.5 API methods
3008        "mobkit/member_status" => {
3009            let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
3010                return invalid_params(response_id, "member_id required");
3011            };
3012            match runtime
3013                .handle()
3014                .member_status(&MeerkatId::from(member_id))
3015                .await
3016            {
3017                Ok(snapshot) => response_value(
3018                    response_id,
3019                    Some(serde_json::to_value(&snapshot).unwrap_or(Value::Null)),
3020                    None,
3021                ),
3022                Err(err) => internal_error(response_id, format!("member_status failed: {err}")),
3023            }
3024        }
3025        "mobkit/force_cancel_member" => {
3026            let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
3027                return invalid_params(response_id, "member_id required");
3028            };
3029            match runtime
3030                .handle()
3031                .force_cancel_member(MeerkatId::from(member_id))
3032                .await
3033            {
3034                Ok(()) => response_value(
3035                    response_id,
3036                    Some(serde_json::json!({ "accepted": true })),
3037                    None,
3038                ),
3039                Err(err) => {
3040                    internal_error(response_id, format!("force_cancel_member failed: {err}"))
3041                }
3042            }
3043        }
3044        "mobkit/wait_ready" => {
3045            let timeout = request
3046                .params
3047                .get("timeout_ms")
3048                .and_then(Value::as_u64)
3049                .map(std::time::Duration::from_millis);
3050            match runtime.handle().wait_for_ready(timeout).await {
3051                Ok(ready) => {
3052                    let entries: Vec<Value> = ready
3053                        .into_iter()
3054                        .map(|(identity, snapshot)| {
3055                            serde_json::json!({
3056                                "agent_identity": identity.to_string(),
3057                                "snapshot": serde_json::to_value(&snapshot)
3058                                    .unwrap_or(Value::Null),
3059                            })
3060                        })
3061                        .collect();
3062                    response_value(
3063                        response_id,
3064                        Some(serde_json::json!({
3065                            "ready": entries,
3066                            "timeout": false,
3067                        })),
3068                        None,
3069                    )
3070                }
3071                Err(err) => {
3072                    let message = err.to_string();
3073                    if message.to_lowercase().contains("timeout") {
3074                        response_value(
3075                            response_id,
3076                            Some(serde_json::json!({
3077                                "ready": Vec::<Value>::new(),
3078                                "timeout": true,
3079                            })),
3080                            None,
3081                        )
3082                    } else {
3083                        internal_error(response_id, format!("wait_for_ready failed: {message}"))
3084                    }
3085                }
3086            }
3087        }
3088        "mobkit/collect_completed" => {
3089            let completed = runtime.handle().collect_completed().await;
3090            let entries: Vec<Value> = completed
3091                .into_iter()
3092                .map(|(mid, snapshot)| {
3093                    serde_json::json!({
3094                        "member_id": mid.to_string(),
3095                        "snapshot": serde_json::to_value(&snapshot).unwrap_or(Value::Null),
3096                    })
3097                })
3098                .collect();
3099            response_value(
3100                response_id,
3101                Some(serde_json::json!({ "completed": entries })),
3102                None,
3103            )
3104        }
3105        "mobkit/cancel_flow" => {
3106            let Some(run_id) = request.params.get("run_id").and_then(Value::as_str) else {
3107                return invalid_params(response_id, "run_id required");
3108            };
3109            let run_id: meerkat_mob::RunId = match run_id.parse() {
3110                Ok(id) => id,
3111                Err(_) => return invalid_params(response_id, "invalid run_id format"),
3112            };
3113            match runtime.handle().cancel_flow(run_id).await {
3114                Ok(()) => response_value(
3115                    response_id,
3116                    Some(serde_json::json!({ "accepted": true })),
3117                    None,
3118                ),
3119                Err(err) => internal_error(response_id, format!("cancel_flow failed: {err}")),
3120            }
3121        }
3122        "mobkit/flow_status" => {
3123            let Some(run_id) = request.params.get("run_id").and_then(Value::as_str) else {
3124                return invalid_params(response_id, "run_id required");
3125            };
3126            let run_id: meerkat_mob::RunId = match run_id.parse() {
3127                Ok(id) => id,
3128                Err(_) => return invalid_params(response_id, "invalid run_id format"),
3129            };
3130            match runtime.handle().flow_status(run_id).await {
3131                Ok(Some(mob_run)) => response_value(
3132                    response_id,
3133                    Some(serde_json::to_value(&mob_run).unwrap_or(Value::Null)),
3134                    None,
3135                ),
3136                Ok(None) => response_value(response_id, Some(Value::Null), None),
3137                Err(err) => internal_error(response_id, format!("flow_status failed: {err}")),
3138            }
3139        }
3140        "mobkit/list_flows" => {
3141            let flows: Vec<String> = runtime
3142                .handle()
3143                .list_flows()
3144                .into_iter()
3145                .map(|id| id.to_string())
3146                .collect();
3147            response_value(
3148                response_id,
3149                Some(serde_json::json!({ "flows": flows })),
3150                None,
3151            )
3152        }
3153        "mobkit/list_runs" => {
3154            let flow_id = request
3155                .params
3156                .get("flow_id")
3157                .and_then(Value::as_str)
3158                .filter(|value| !value.is_empty())
3159                .map(meerkat_mob::FlowId::from);
3160            match runtime.handle().list_runs(flow_id.as_ref()).await {
3161                Ok(runs) => response_value(
3162                    response_id,
3163                    Some(serde_json::json!({
3164                        "runs": serde_json::to_value(&runs).unwrap_or(Value::Null),
3165                    })),
3166                    None,
3167                ),
3168                Err(err) => internal_error(response_id, format!("list_runs failed: {err}")),
3169            }
3170        }
3171        "mobkit/run_flow" => {
3172            let Some(flow_id_str) = request.params.get("flow_id").and_then(Value::as_str) else {
3173                return invalid_params(response_id, "flow_id required");
3174            };
3175            if flow_id_str.is_empty() {
3176                return invalid_params(response_id, "flow_id required");
3177            }
3178            let flow_id = meerkat_mob::FlowId::from(flow_id_str);
3179            let flow_params = request.params.get("params").cloned().unwrap_or(Value::Null);
3180            match runtime.handle().run_flow(flow_id, flow_params).await {
3181                Ok(run_id) => response_value(
3182                    response_id,
3183                    Some(serde_json::json!({ "run_id": run_id.to_string() })),
3184                    None,
3185                ),
3186                Err(err) => invalid_params(response_id, format!("run_flow failed: {err}")),
3187            }
3188        }
3189        "mobkit/spawn_helper" => {
3190            let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
3191            else {
3192                return invalid_params(response_id, "agent_identity required");
3193            };
3194            let Some(task) = request.params.get("task").and_then(Value::as_str) else {
3195                return invalid_params(response_id, "task required");
3196            };
3197            let options = match parse_console_helper_options(request.params.get("options")) {
3198                Ok(opts) => opts,
3199                Err(msg) => return invalid_params(response_id, msg),
3200            };
3201            let handle = runtime.handle();
3202            match handle
3203                .spawn_helper(MeerkatId::from(agent_identity), task, options)
3204                .await
3205            {
3206                Ok(result) => {
3207                    // Meerkat 0.6 retires the helper before `spawn_helper`
3208                    // returns, so a post-hoc `resolve_bridge_session_id`
3209                    // call would come back `None`. We drop `session_id`
3210                    // from the response rather than emit a misleading null.
3211                    response_value(
3212                        response_id,
3213                        Some(serde_json::json!({
3214                            "output": result.output,
3215                            "tokens_used": result.tokens_used,
3216                        })),
3217                        None,
3218                    )
3219                }
3220                Err(err) => internal_error(response_id, format!("spawn_helper failed: {err}")),
3221            }
3222        }
3223        "mobkit/fork_helper" => {
3224            let Some(source) = request
3225                .params
3226                .get("source_member_id")
3227                .and_then(Value::as_str)
3228            else {
3229                return invalid_params(response_id, "source_member_id required");
3230            };
3231            let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
3232            else {
3233                return invalid_params(response_id, "agent_identity required");
3234            };
3235            let Some(task) = request.params.get("task").and_then(Value::as_str) else {
3236                return invalid_params(response_id, "task required");
3237            };
3238            let fork_context = match request.params.get("fork_context") {
3239                Some(v) if !v.is_null() => {
3240                    match serde_json::from_value::<meerkat_mob::launch::ForkContext>(v.clone()) {
3241                        Ok(ctx) => ctx,
3242                        Err(err) => {
3243                            return invalid_params(
3244                                response_id,
3245                                format!("invalid fork_context: {err}"),
3246                            );
3247                        }
3248                    }
3249                }
3250                _ => meerkat_mob::launch::ForkContext::default(),
3251            };
3252            let options = match parse_console_helper_options(request.params.get("options")) {
3253                Ok(opts) => opts,
3254                Err(msg) => return invalid_params(response_id, msg),
3255            };
3256            let handle = runtime.handle();
3257            match handle
3258                .fork_helper(
3259                    &MeerkatId::from(source),
3260                    MeerkatId::from(agent_identity),
3261                    task,
3262                    fork_context,
3263                    options,
3264                )
3265                .await
3266            {
3267                Ok(result) => {
3268                    // See `spawn_helper`: meerkat 0.6 retires the forked
3269                    // helper before returning, so session_id is omitted
3270                    // rather than silently null.
3271                    response_value(
3272                        response_id,
3273                        Some(serde_json::json!({
3274                            "output": result.output,
3275                            "tokens_used": result.tokens_used,
3276                        })),
3277                        None,
3278                    )
3279                }
3280                Err(err) => internal_error(response_id, format!("fork_helper failed: {err}")),
3281            }
3282        }
3283        "mobkit/attach_existing_session" => {
3284            let Some(role) = request.params.get("role").and_then(Value::as_str) else {
3285                return invalid_params(response_id, "role required");
3286            };
3287            let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
3288            else {
3289                return invalid_params(response_id, "agent_identity required");
3290            };
3291            let Some(session_id_str) = request.params.get("session_id").and_then(Value::as_str)
3292            else {
3293                return invalid_params(response_id, "session_id required");
3294            };
3295            let bridge_session_id = match meerkat_core::types::SessionId::parse(session_id_str) {
3296                Ok(s) => s,
3297                Err(_) => return invalid_params(response_id, "invalid session_id format"),
3298            };
3299            let mid = MeerkatId::from(agent_identity);
3300            let spec = SpawnMemberSpec::new(ProfileName::from(role), mid.clone())
3301                .with_launch_mode(MemberLaunchMode::Resume { bridge_session_id });
3302            let handle = runtime.handle();
3303            match handle.spawn_spec(spec).await {
3304                Ok(_) => match handle.member_status(&mid).await {
3305                    Ok(snapshot) => response_value(
3306                        response_id,
3307                        Some(serde_json::to_value(&snapshot).unwrap_or(Value::Null)),
3308                        None,
3309                    ),
3310                    Err(err) => internal_error(
3311                        response_id,
3312                        format!("attach_existing_session status lookup failed: {err}"),
3313                    ),
3314                },
3315                Err(err) => internal_error(
3316                    response_id,
3317                    format!("attach_existing_session failed: {err}"),
3318                ),
3319            }
3320        }
3321        "mobkit/cross_mob/wire_local" => {
3322            handle_console_wire_local(runtime, &request.params, response_id, true).await
3323        }
3324        "mobkit/cross_mob/unwire_local" => {
3325            handle_console_wire_local(runtime, &request.params, response_id, false).await
3326        }
3327        "mobkit/peer_pubkey" => match gateway_peer_keys {
3328            Some(keys) => response_value(
3329                response_id,
3330                Some(serde_json::json!({ "pubkey_b64": keys.pubkey_b64() })),
3331                None,
3332            ),
3333            None => response_value(
3334                response_id,
3335                None,
3336                Some(JsonRpcError {
3337                    code: -32004,
3338                    message: "gateway has no signing keypair configured".to_string(),
3339                    data: None,
3340                }),
3341            ),
3342        },
3343        "mobkit/cross_mob/peer_info" => {
3344            let member_id = request.params.get("member_id").and_then(Value::as_str);
3345            match member_id {
3346                Some(mid) if !mid.is_empty() => {
3347                    let handle = runtime.handle();
3348                    let mob_id = handle.mob_id().to_string();
3349                    let meerkat_id = MeerkatId::from(mid);
3350                    match handle.get_member(&meerkat_id).await {
3351                        Some(entry) => match entry.peer_id() {
3352                            Some(peer_id) => {
3353                                let comms_name = format!("{}/{}/{}", mob_id, entry.role, mid);
3354                                let address = format!("inproc://{comms_name}");
3355                                response_value(
3356                                    response_id,
3357                                    Some(serde_json::json!({
3358                                        "member_id": mid,
3359                                        "mob_id": mob_id,
3360                                        "comms_name": comms_name,
3361                                        "peer_id": peer_id,
3362                                        "address": address,
3363                                    })),
3364                                    None,
3365                                )
3366                            }
3367                            None => response_value(
3368                                response_id,
3369                                None,
3370                                Some(JsonRpcError {
3371                                    code: -32000,
3372                                    message: format!("member {mid:?} has no comms runtime"),
3373                                    data: None,
3374                                }),
3375                            ),
3376                        },
3377                        None => response_value(
3378                            response_id,
3379                            None,
3380                            Some(JsonRpcError {
3381                                code: -32000,
3382                                message: format!("member {mid:?} not found"),
3383                                data: None,
3384                            }),
3385                        ),
3386                    }
3387                }
3388                _ => invalid_params(response_id, "member_id required".to_string()),
3389            }
3390        }
3391        "mobkit/cross_mob/directory" => {
3392            let entries: Vec<Value> = contact_directory
3393                .map(|dir| {
3394                    dir.list()
3395                        .into_iter()
3396                        .filter_map(|e| serde_json::to_value(e).ok())
3397                        .collect()
3398                })
3399                .unwrap_or_default();
3400            response_value(
3401                response_id,
3402                Some(serde_json::json!({ "mobs": entries })),
3403                None,
3404            )
3405        }
3406        method
3407            if matches!(
3408                method,
3409                "mobkit/mob_labels/set"
3410                    | "mobkit/mob_labels/get"
3411                    | "mobkit/mob_labels/delete"
3412                    | "mobkit/run_labels/set"
3413                    | "mobkit/run_labels/get"
3414                    | "mobkit/run_labels/delete",
3415            ) =>
3416        {
3417            dispatch_label_method(
3418                method,
3419                metadata_table.as_deref(),
3420                runtime.handle().mob_id().as_str(),
3421                response_id,
3422                &request.params,
3423            )
3424            .await
3425        }
3426        _ => response_value(
3427            response_id,
3428            None,
3429            Some(JsonRpcError {
3430                code: -32601,
3431                message: "Method not found".to_string(),
3432                data: None,
3433            }),
3434        ),
3435    }
3436}
3437
3438/// Dispatch the six `mobkit/{mob,run}_labels/*` RPCs against a metadata table.
3439///
3440/// Single entrypoint shared by every label method; the dispatch arms in
3441/// `handle_console_runtime_rpc` simply delegate based on the matched method
3442/// name. Mirrors the unified-runtime handlers in `rpc::mob_methods` — both
3443/// transports project the same outcomes to the same wire shape.
3444async fn dispatch_label_method(
3445    method: &str,
3446    metadata_table: Option<&RuntimeMetadataTable>,
3447    mob_id: &str,
3448    response_id: Value,
3449    params: &Value,
3450) -> Value {
3451    let Some(table) = metadata_table else {
3452        return invalid_params(
3453            response_id,
3454            "metadata table not configured for this runtime",
3455        );
3456    };
3457
3458    let scope = match method {
3459        "mobkit/mob_labels/set" | "mobkit/mob_labels/get" | "mobkit/mob_labels/delete" => {
3460            MetadataScope::Mob(mob_id.to_string())
3461        }
3462        _ => match crate::runtime::parse_run_id_param(params) {
3463            Ok(run_id) => MetadataScope::Run(mob_id.to_string(), run_id.to_string()),
3464            Err(message) => return invalid_params(response_id, message),
3465        },
3466    };
3467
3468    let outcome = match method {
3469        "mobkit/mob_labels/set" | "mobkit/run_labels/set" => {
3470            crate::runtime::dispatch_labels_set(table, scope, params).await
3471        }
3472        "mobkit/mob_labels/get" | "mobkit/run_labels/get" => {
3473            crate::runtime::dispatch_labels_get(table, scope).await
3474        }
3475        "mobkit/mob_labels/delete" | "mobkit/run_labels/delete" => {
3476            crate::runtime::dispatch_labels_delete(table, scope).await
3477        }
3478        _ => unreachable!("dispatch_label_method called with non-label method: {method}"),
3479    };
3480
3481    match outcome {
3482        crate::runtime::LabelRpcResult::Accepted => response_value(
3483            response_id,
3484            Some(serde_json::json!({"accepted": true})),
3485            None,
3486        ),
3487        crate::runtime::LabelRpcResult::Labels(labels) => response_value(
3488            response_id,
3489            Some(serde_json::json!({"labels": labels_to_json_value(&labels)})),
3490            None,
3491        ),
3492        crate::runtime::LabelRpcResult::InvalidParams(message) => {
3493            invalid_params(response_id, message)
3494        }
3495    }
3496}
3497
3498/// Shared body for `mobkit/cross_mob/wire_local` and `unwire_local` over
3499/// the console transport. `wire = true` calls `MobHandle::wire`, `false`
3500/// calls `MobHandle::unwire`. Both share param parsing and response shape.
3501///
3502/// Non-inproc transports (`tcp://`, `uds://`) require a non-zero pubkey;
3503/// the caller may supply it via `remote_pubkey_b64` or rely on TOFU
3504/// flows configured at the contact-directory layer (which this handler
3505/// does not consult — it only sees the explicit params).
3506async fn handle_console_wire_local(
3507    runtime: &MobRuntime,
3508    params: &Value,
3509    response_id: Value,
3510    wire: bool,
3511) -> Value {
3512    let local = params.get("local_member_id").and_then(Value::as_str);
3513    let comms_name = params.get("remote_comms_name").and_then(Value::as_str);
3514    let peer_id = params.get("remote_peer_id").and_then(Value::as_str);
3515    let addr = params.get("remote_address").and_then(Value::as_str);
3516
3517    let remote_pubkey = match params.get("remote_pubkey_b64") {
3518        None => None,
3519        Some(v) if v.is_null() => None,
3520        Some(v) => match v.as_str() {
3521            Some(s) if !s.is_empty() => match crate::auth::peer_keys::decode_pubkey_b64(s) {
3522                Ok(bytes) => Some(bytes),
3523                Err(err) => {
3524                    return invalid_params(response_id, format!("remote_pubkey_b64: {err}"));
3525                }
3526            },
3527            _ => None,
3528        },
3529    };
3530
3531    let (local_id, cname, pid, address) = match (local, comms_name, peer_id, addr) {
3532        (Some(l), Some(c), Some(p), Some(a))
3533            if !l.is_empty() && !c.is_empty() && !p.is_empty() && !a.is_empty() =>
3534        {
3535            (l, c, p, a)
3536        }
3537        _ => {
3538            return invalid_params(
3539                response_id,
3540                "local_member_id, remote_comms_name, remote_peer_id, and remote_address required",
3541            );
3542        }
3543    };
3544
3545    let is_inproc = address.starts_with("inproc://");
3546    let spec_result = match (is_inproc, remote_pubkey) {
3547        (true, None) => TrustedPeerDescriptor::test_only_unsigned(cname, pid, address),
3548        (true, Some(bytes)) => {
3549            TrustedPeerDescriptor::unsigned_with_pubkey(cname, pid, bytes, address)
3550        }
3551        (false, None) => {
3552            return invalid_params(
3553                response_id,
3554                "remote_pubkey_b64 is required for non-inproc transports",
3555            );
3556        }
3557        (false, Some(bytes)) => {
3558            if bytes == [0u8; 32] {
3559                return invalid_params(
3560                    response_id,
3561                    "remote_pubkey_b64 must be non-zero for non-inproc transports",
3562                );
3563            }
3564            TrustedPeerDescriptor::unsigned_with_pubkey(cname, pid, bytes, address)
3565        }
3566    };
3567
3568    let spec = match spec_result {
3569        Ok(spec) => spec,
3570        Err(err) => {
3571            return invalid_params(response_id, format!("invalid peer spec: {err}"));
3572        }
3573    };
3574
3575    let result = if wire {
3576        runtime
3577            .handle()
3578            .wire(MeerkatId::from(local_id), PeerTarget::External(spec))
3579            .await
3580    } else {
3581        runtime
3582            .handle()
3583            .unwire(MeerkatId::from(local_id), PeerTarget::External(spec))
3584            .await
3585    };
3586
3587    let action = if wire { "wire_local" } else { "unwire_local" };
3588    match result {
3589        Ok(()) => response_value(
3590            response_id,
3591            Some(serde_json::json!({
3592                "accepted": true,
3593                "local_member_id": local_id,
3594                "remote_comms_name": cname,
3595            })),
3596            None,
3597        ),
3598        Err(err) => internal_error(response_id, format!("cross_mob/{action} failed: {err}")),
3599    }
3600}
3601
3602async fn build_live_snapshot(
3603    runtime: &MobRuntime,
3604    config_module_ids: &[String],
3605    console_events: Option<&ConsoleEventStore>,
3606    visibility_policy: &dyn ConsoleVisibilityPolicy,
3607    read_model: &ConsoleSnapshotReadModel,
3608) -> ConsoleLiveSnapshot {
3609    let read_model_state = read_model.snapshot(runtime).await;
3610    let running = read_model_state.running.unwrap_or(true);
3611    // Hot path: clone the pre-projected members from the cached read
3612    // model. NO `handle.*` async calls happen here — the background
3613    // refresh task is the only thing that walks the mob roster, so
3614    // snapshot requests never contend with spawn/retire activity.
3615    // First request on a cold cache pays one synchronous refresh via
3616    // `snapshot(runtime).await` above; subsequent requests just clone.
3617    let mut members = read_model_state.primary_members.clone();
3618    if visibility_policy.include_implicit_delegate_members() {
3619        for group in &read_model_state.delegate_member_groups {
3620            members.extend(group.iter().cloned());
3621        }
3622    }
3623    dedupe_console_members_by_identity(&mut members);
3624
3625    // Use configured module IDs when available because topology and health
3626    // surfaces describe loaded modules, not live mob members.
3627    // Fall back to member IDs only for pure mob runtimes with no module config.
3628    let loaded_modules = if config_module_ids.is_empty() {
3629        let mut mods: Vec<String> = members
3630            .iter()
3631            .filter(|member| member.state != MEMBER_STATE_RETIRING)
3632            .map(|member| member.agent_identity.clone())
3633            .collect();
3634        mods.sort();
3635        mods
3636    } else {
3637        let mut mods = config_module_ids.to_vec();
3638        mods.sort();
3639        mods
3640    };
3641
3642    let agents = members
3643        .iter()
3644        .map(|member| async move {
3645            let label = member
3646                .labels
3647                .get("display_name")
3648                .cloned()
3649                .unwrap_or_else(|| member.agent_identity.clone());
3650            let watched = member
3651                .labels
3652                .get("console_watched")
3653                .map(|value: &String| value == "true");
3654            let alert_level = member
3655                .labels
3656                .get("console_alert_level")
3657                .filter(|value: &&String| matches!(value.as_str(), "elevated" | "critical"))
3658                .cloned();
3659            let degraded = member
3660                .labels
3661                .get("console_degraded")
3662                .map(|value: &String| value == "true");
3663            let degraded_reason = member.labels.get("console_degraded_reason").cloned();
3664            let response_phase = match console_events {
3665                Some(store) => {
3666                    store
3667                        .response_phase_for_identity(&member.agent_identity)
3668                        .await
3669                }
3670                None => None,
3671            };
3672            ConsoleAgentLiveSnapshot {
3673                agent_id: member.agent_identity.clone(),
3674                member_id: member.agent_identity.clone(),
3675                label,
3676                kind: "meerkat".to_string(),
3677                identity: Some(member.agent_identity.clone()),
3678                role: Some(member.role.clone()),
3679                state: Some(member.state.clone()),
3680                session_id: member.session_id.clone(),
3681                model_capabilities: member.model_capabilities.clone(),
3682                response_phase,
3683                watched,
3684                alert_level,
3685                degraded,
3686                degraded_reason,
3687            }
3688        })
3689        .collect::<Vec<_>>();
3690    let mut agents = join_all(agents).await;
3691    agents.sort_by(|left, right| left.label.cmp(&right.label));
3692    ConsoleLiveSnapshot::new(
3693        Some(runtime.handle().mob_id().to_string()),
3694        running,
3695        loaded_modules,
3696        agents,
3697        members,
3698        true,
3699    )
3700}
3701
3702async fn collect_console_snapshot_read_model(
3703    runtime: &MobRuntime,
3704) -> ConsoleSnapshotReadModelState {
3705    let handle = runtime.handle();
3706    let mut state = ConsoleSnapshotReadModelState {
3707        running: Some(matches!(
3708            handle.status().await.ok(),
3709            Some(MobState::Creating | MobState::Running)
3710        )),
3711        ..ConsoleSnapshotReadModelState::default()
3712    };
3713    collect_console_session_index_for_handle(&handle, &mut state).await;
3714
3715    // Snapshot + project the primary mob into the cache. Done here
3716    // under the background refresh lock so per-request
3717    // `build_live_snapshot` calls never need to enter MobHandle async
3718    // methods. The session-id index in `state` was populated above by
3719    // `collect_console_session_index_for_handle`.
3720    let (primary_members, _primary_owner_index) =
3721        project_console_members_from_handle(&handle, None, None, &state).await;
3722    state.primary_members = primary_members;
3723
3724    let Some(mcp_state) = runtime.agent_mob_mcp_state() else {
3725        return state;
3726    };
3727    let primary_mob_id = handle.mob_id().to_string();
3728    let mut processed = BTreeSet::from([primary_mob_id]);
3729    let mut delegate_groups: Vec<Vec<ConsoleMember>> = Vec::new();
3730    loop {
3731        let mut progressed = false;
3732        for (mob_id, _mob_state) in mcp_state.mob_list().await {
3733            if processed.contains(mob_id.as_str()) {
3734                continue;
3735            }
3736            let Ok(delegate_handle) = mcp_state.handle_for(&mob_id).await else {
3737                continue;
3738            };
3739            let Some(owner_session_id) = delegate_handle.definition().owner_bridge_session_index()
3740            else {
3741                processed.insert(mob_id.to_string());
3742                continue;
3743            };
3744            let Some(host_identity) = state.session_owner_by_id.get(owner_session_id).cloned()
3745            else {
3746                continue;
3747            };
3748            collect_console_session_index_for_handle(&delegate_handle, &mut state).await;
3749            let (delegate_members, _delegate_owner_index) = project_console_members_from_handle(
3750                &delegate_handle,
3751                Some(&host_identity),
3752                Some(mob_id.as_str()),
3753                &state,
3754            )
3755            .await;
3756            delegate_groups.push(delegate_members);
3757            processed.insert(mob_id.to_string());
3758            progressed = true;
3759        }
3760        if !progressed {
3761            break;
3762        }
3763    }
3764    state.delegate_member_groups = delegate_groups;
3765    state
3766}
3767
3768async fn collect_console_session_index_for_handle(
3769    handle: &MobHandle,
3770    state: &mut ConsoleSnapshotReadModelState,
3771) {
3772    for entry in handle.list_members_including_retiring().await {
3773        let identity = entry.agent_identity.to_string();
3774        let Some(session_id) = handle
3775            .resolve_bridge_session_id(&entry.agent_identity)
3776            .await
3777            .map(|session_id| session_id.to_string())
3778        else {
3779            state.session_id_by_identity.remove(&identity);
3780            continue;
3781        };
3782        state
3783            .session_owner_by_id
3784            .insert(session_id.clone(), identity.clone());
3785        state.session_id_by_identity.insert(identity, session_id);
3786    }
3787}
3788
3789fn apply_console_visibility_policy(
3790    snapshot: &mut ConsoleLiveSnapshot,
3791    visibility_policy: &dyn ConsoleVisibilityPolicy,
3792) {
3793    let mut hidden = BTreeSet::new();
3794    snapshot.members.retain(|member| {
3795        let visible = visibility_policy.member_visible(member);
3796        if !visible {
3797            hidden.insert(member.agent_identity.clone());
3798        }
3799        visible
3800    });
3801    snapshot
3802        .agents
3803        .retain(|agent| !hidden.contains(&agent.agent_id));
3804    snapshot
3805        .loaded_modules
3806        .retain(|module_id| !hidden.contains(module_id));
3807}
3808
3809async fn reset_all_live_console_agents(
3810    runtime: &MobRuntime,
3811    console_events: Option<&ConsoleEventStore>,
3812    console_aggregator: Option<&MobKitConsoleAggregator>,
3813) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
3814    let read_model = ConsoleSnapshotReadModel::default();
3815    *read_model.inner.write().await = collect_console_snapshot_read_model(runtime).await;
3816    // Mark the freshly-built model primed so `build_live_snapshot` doesn't
3817    // try to re-prime; this is a one-shot read for the reset path.
3818    read_model
3819        .primed
3820        .store(true, std::sync::atomic::Ordering::Release);
3821    let snapshot = build_live_snapshot(
3822        runtime,
3823        &[],
3824        console_events,
3825        &AllowAllConsoleVisibilityPolicy,
3826        &read_model,
3827    )
3828    .await;
3829    let mut main_identities = BTreeSet::new();
3830    let mut delegate_members = BTreeSet::new();
3831    for member in snapshot.members {
3832        if member.state == MEMBER_STATE_RETIRING {
3833            continue;
3834        }
3835        if let Some(source_mob_id) = member.labels.get("source_mob_id").cloned() {
3836            delegate_members.insert((source_mob_id, member.agent_identity));
3837        } else {
3838            main_identities.insert(member.agent_identity);
3839        }
3840    }
3841    let current_main_identities = main_identities.clone();
3842    let baseline_specs = runtime.baseline_member_specs().await;
3843    let baseline_identities = baseline_specs
3844        .iter()
3845        .map(|spec| spec.identity.to_string())
3846        .collect::<BTreeSet<_>>();
3847    main_identities.extend(baseline_identities.iter().cloned());
3848
3849    let mut retired_delegates = Vec::new();
3850    let mut reset_main = Vec::new();
3851    let mut failures = Vec::new();
3852
3853    if let Some(state) = runtime.agent_mob_mcp_state() {
3854        for (mob_id, identity) in delegate_members {
3855            match state.handle_for(&MobId::from(mob_id.as_str())).await {
3856                Ok(handle) => match handle.retire(MeerkatId::from(identity.as_str())).await {
3857                    Ok(()) => retired_delegates.push(json!({
3858                        "identity": identity,
3859                        "mob_id": mob_id,
3860                    })),
3861                    Err(err) => failures.push(json!({
3862                        "identity": identity,
3863                        "mob_id": mob_id,
3864                        "error": err.to_string(),
3865                    })),
3866                },
3867                Err(err) => failures.push(json!({
3868                    "identity": identity,
3869                    "mob_id": mob_id,
3870                    "error": err.to_string(),
3871                })),
3872            }
3873        }
3874    } else if let Some(aggregator) = console_aggregator {
3875        let identities = delegate_members
3876            .into_iter()
3877            .map(|(_, identity)| identity)
3878            .collect::<BTreeSet<_>>();
3879        for identity in identities {
3880            match aggregator.retire_identity(&identity).await {
3881                Ok(true) => retired_delegates.push(json!({ "identity": identity })),
3882                Ok(false) => failures.push(json!({
3883                    "identity": identity,
3884                    "error": "unknown identity",
3885                })),
3886                Err(err) => failures.push(json!({
3887                    "identity": identity,
3888                    "error": err.to_string(),
3889                })),
3890            }
3891        }
3892    }
3893
3894    let handle = runtime.handle();
3895    for spec in baseline_specs {
3896        let identity = spec.identity.to_string();
3897        if current_main_identities.contains(&identity) {
3898            continue;
3899        }
3900        match handle.ensure_member(spec).await {
3901            Ok(_outcome) => {
3902                if let Some(store) = console_events {
3903                    store
3904                        .record_lifecycle(
3905                            &identity,
3906                            "identity_reset",
3907                            json!({ "scope": "reset_all", "restored": true }),
3908                        )
3909                        .await;
3910                }
3911                reset_main.push(identity);
3912            }
3913            Err(err) => failures.push(json!({
3914                "identity": identity,
3915                "error": err.to_string(),
3916            })),
3917        }
3918    }
3919    for identity in main_identities {
3920        if baseline_identities.contains(&identity) && !current_main_identities.contains(&identity) {
3921            continue;
3922        }
3923        if baseline_identities.contains(&identity) {
3924            match handle
3925                .respawn(MeerkatId::from(identity.as_str()), None)
3926                .await
3927            {
3928                Ok(_receipt) => {
3929                    if let Some(store) = console_events {
3930                        store
3931                            .record_lifecycle(
3932                                &identity,
3933                                "identity_reset",
3934                                json!({ "scope": "reset_all" }),
3935                            )
3936                            .await;
3937                    }
3938                    reset_main.push(identity);
3939                }
3940                Err(err) => failures.push(json!({
3941                    "identity": identity,
3942                    "error": err.to_string(),
3943                })),
3944            }
3945        } else {
3946            match handle.retire(MeerkatId::from(identity.as_str())).await {
3947                Ok(()) => {
3948                    if let Some(store) = console_events {
3949                        store
3950                            .record_lifecycle(
3951                                &identity,
3952                                "identity_retired",
3953                                json!({ "scope": "reset_all", "dynamic": true }),
3954                            )
3955                            .await;
3956                    }
3957                    retired_delegates.push(json!({ "identity": identity }));
3958                }
3959                Err(err) => failures.push(json!({
3960                    "identity": identity,
3961                    "error": err.to_string(),
3962                })),
3963            }
3964        }
3965    }
3966
3967    let startup_history = if let Some(aggregator) = console_aggregator {
3968        aggregator.clear_timeline_frames().await?;
3969        Some(
3970            wait_for_reset_startup_history(
3971                aggregator,
3972                baseline_identities.iter().cloned().collect(),
3973                Duration::from_mins(1),
3974            )
3975            .await?,
3976        )
3977    } else {
3978        None
3979    };
3980
3981    Ok(json!({
3982        "reset": reset_main,
3983        "retired_delegates": retired_delegates,
3984        "failed": failures,
3985        "startup_history": startup_history,
3986    }))
3987}
3988
3989async fn wait_for_reset_startup_history(
3990    aggregator: &MobKitConsoleAggregator,
3991    identities: BTreeSet<String>,
3992    timeout: Duration,
3993) -> ConsoleLogResult<Value> {
3994    if identities.is_empty() {
3995        return Ok(json!({
3996            "timeout": false,
3997            "ready": Vec::<String>::new(),
3998            "pending": Vec::<String>::new(),
3999        }));
4000    }
4001
4002    let deadline = Instant::now() + timeout;
4003    let mut pending = identities;
4004    let mut ready = BTreeSet::new();
4005    while !pending.is_empty() {
4006        for identity in pending.clone() {
4007            let page = aggregator
4008                .query_timeline(ConsoleTimelineQuery {
4009                    identity: Some(identity.clone()),
4010                    limit: 1000,
4011                    ..ConsoleTimelineQuery::default()
4012                })
4013                .await?;
4014            let startup_completed = page.frames.iter().any(|frame| {
4015                matches!(
4016                    frame.kind.as_str(),
4017                    "interaction_complete" | "turn_completed"
4018                )
4019            });
4020            if startup_completed {
4021                pending.remove(&identity);
4022                ready.insert(identity);
4023            }
4024        }
4025
4026        if pending.is_empty() {
4027            break;
4028        }
4029        if Instant::now() >= deadline {
4030            return Ok(json!({
4031                "timeout": true,
4032                "ready": ready.into_iter().collect::<Vec<_>>(),
4033                "pending": pending.into_iter().collect::<Vec<_>>(),
4034            }));
4035        }
4036        tokio::time::sleep(Duration::from_millis(250)).await;
4037    }
4038
4039    Ok(json!({
4040        "timeout": false,
4041        "ready": ready.into_iter().collect::<Vec<_>>(),
4042        "pending": Vec::<String>::new(),
4043    }))
4044}
4045
4046fn dedupe_console_members_by_identity(members: &mut Vec<ConsoleMember>) {
4047    let mut seen_member_ids = BTreeSet::new();
4048    members.retain(|member| seen_member_ids.insert(member.agent_identity.clone()));
4049}
4050
4051async fn project_console_members_from_handle(
4052    handle: &MobHandle,
4053    host_identity: Option<&str>,
4054    source_mob_id: Option<&str>,
4055    read_model: &ConsoleSnapshotReadModelState,
4056) -> (Vec<ConsoleMember>, BTreeMap<String, String>) {
4057    let entries = handle.list_all_members().await;
4058    let mut members = Vec::with_capacity(entries.len());
4059    let mut session_owner_by_id = BTreeMap::new();
4060    for entry in &entries {
4061        let identity = entry.agent_identity.to_string();
4062        let session_id = read_model.session_id_by_identity.get(&identity).cloned();
4063        if let Some(session_id) = session_id.as_ref() {
4064            session_owner_by_id.insert(session_id.clone(), identity.clone());
4065        }
4066        let model_capabilities =
4067            model_capabilities_for_role(handle.definition(), entry.role.as_str());
4068        let mut labels = entry.labels.clone();
4069        if let Some(host_identity) = host_identity {
4070            labels
4071                .entry("delegate_host_identity".to_string())
4072                .or_insert_with(|| host_identity.to_string());
4073            labels
4074                .entry("group".to_string())
4075                .or_insert_with(|| "Coordinators".to_string());
4076        }
4077        if let Some(source_mob_id) = source_mob_id {
4078            labels
4079                .entry("source_mob_id".to_string())
4080                .or_insert_with(|| source_mob_id.to_string());
4081        }
4082        let mut wired_to: Vec<String> = entry.wired_to.iter().map(ToString::to_string).collect();
4083        if let Some(host_identity) = host_identity
4084            && !wired_to.iter().any(|peer| peer == host_identity)
4085        {
4086            wired_to.push(host_identity.to_string());
4087        }
4088        members.push(ConsoleMember {
4089            agent_identity: identity,
4090            role: entry.role.to_string(),
4091            state: match entry.state {
4092                meerkat_mob::MemberState::Active => MEMBER_STATE_ACTIVE.to_string(),
4093                meerkat_mob::MemberState::Retiring => MEMBER_STATE_RETIRING.to_string(),
4094            },
4095            model_capabilities,
4096            runtime_mode: Some(entry.runtime_mode.to_string()),
4097            session_id,
4098            wired_to,
4099            labels,
4100        });
4101    }
4102    (members, session_owner_by_id)
4103}
4104
4105async fn build_aggregator_live_snapshot(
4106    aggregator: &MobKitConsoleAggregator,
4107    config_module_ids: &[String],
4108) -> Result<ConsoleLiveSnapshot, Box<dyn std::error::Error + Send + Sync>> {
4109    let identities = aggregator.list_identities().await?;
4110    let mut members = Vec::with_capacity(identities.len());
4111    for identity in &identities {
4112        let mut labels = identity.labels.clone();
4113        labels
4114            .entry("display_name".to_string())
4115            .or_insert_with(|| identity.display_name.clone());
4116        labels
4117            .entry("addressable".to_string())
4118            .or_insert_with(|| identity.addressable.to_string());
4119        let wired_to = aggregator
4120            .inspect_identity(&identity.identity)
4121            .await
4122            .ok()
4123            .flatten()
4124            .map(|inspection| inspection.peers)
4125            .unwrap_or_default();
4126        members.push(ConsoleMember {
4127            agent_identity: identity.identity.clone(),
4128            role: labels
4129                .get("role")
4130                .cloned()
4131                .unwrap_or_else(|| "identity".to_string()),
4132            state: identity.health.clone(),
4133            model_capabilities: ConsoleModelCapabilities::default(),
4134            runtime_mode: Some("console_aggregator".to_string()),
4135            session_id: identity.session_id.clone(),
4136            wired_to,
4137            labels,
4138        });
4139    }
4140    members.sort_by(|left, right| left.agent_identity.cmp(&right.agent_identity));
4141    let agents = members
4142        .iter()
4143        .map(|member| ConsoleAgentLiveSnapshot {
4144            agent_id: member.agent_identity.clone(),
4145            member_id: member.agent_identity.clone(),
4146            label: member
4147                .labels
4148                .get("display_name")
4149                .cloned()
4150                .unwrap_or_else(|| member.agent_identity.clone()),
4151            kind: "meerkat".to_string(),
4152            identity: Some(member.agent_identity.clone()),
4153            role: Some(member.role.clone()),
4154            state: Some(member.state.clone()),
4155            session_id: member.session_id.clone(),
4156            model_capabilities: member.model_capabilities.clone(),
4157            response_phase: None,
4158            watched: None,
4159            alert_level: None,
4160            degraded: None,
4161            degraded_reason: None,
4162        })
4163        .collect::<Vec<_>>();
4164    let loaded_modules = if config_module_ids.is_empty() {
4165        members
4166            .iter()
4167            .map(|member| member.agent_identity.clone())
4168            .collect()
4169    } else {
4170        config_module_ids.to_vec()
4171    };
4172    Ok(ConsoleLiveSnapshot::new(
4173        Some("console-aggregator".to_string()),
4174        true,
4175        loaded_modules,
4176        agents,
4177        members,
4178        true,
4179    ))
4180}
4181
4182pub async fn console_frontend_index_handler() -> impl IntoResponse {
4183    (
4184        [
4185            (header::CONTENT_TYPE, "text/html; charset=utf-8"),
4186            (header::CACHE_CONTROL, "no-store"),
4187        ],
4188        CONSOLE_FRONTEND_INDEX_HTML,
4189    )
4190}
4191
4192pub async fn console_frontend_app_js_handler() -> impl IntoResponse {
4193    (
4194        [
4195            (
4196                header::CONTENT_TYPE,
4197                "application/javascript; charset=utf-8",
4198            ),
4199            (header::CACHE_CONTROL, "no-store"),
4200        ],
4201        CONSOLE_FRONTEND_APP_JS,
4202    )
4203}
4204
4205pub async fn console_frontend_app_css_handler() -> impl IntoResponse {
4206    (
4207        [
4208            (header::CONTENT_TYPE, "text/css; charset=utf-8"),
4209            (header::CACHE_CONTROL, "no-store"),
4210        ],
4211        CONSOLE_FRONTEND_APP_CSS,
4212    )
4213}
4214
4215#[cfg(test)]
4216mod tests {
4217    use super::{
4218        ConsoleSnapshotReadModel, ConsoleSnapshotReadModelState, MAX_MULTIPART_BODY_BYTES,
4219        MAX_MULTIPART_IMAGE_BYTES, MultipartImageUpload, apply_console_visibility_policy,
4220        collect_console_snapshot_read_model, console_send_identity_first, cursor_is_after,
4221        dedupe_console_members_by_identity, externalize_image_upload_placeholders,
4222        externalize_single_image_upload, handle_console_aggregator_rpc,
4223        project_console_members_from_handle, query_timeline_snapshot,
4224    };
4225    use crate::blob_store::{BinaryBlobStore, ObjectStoreBlobStore};
4226    use crate::console_aggregator::{
4227        AllowAllConsoleVisibilityPolicy, HideImplicitDelegateMembersConsoleVisibilityPolicy,
4228    };
4229    use crate::console_aggregator::{
4230        ConsoleCursor, ConsoleFrameSource, ConsoleFrameSourceKind, ConsoleFrameStatus,
4231        ConsoleTimelineQuery, MobKitConsoleAggregator, NewConsoleFrame,
4232    };
4233    use crate::identity_first::{
4234        AgentAddressability, AgentIdentity, AgentRuntimeId, CheckpointVersion,
4235        ContinuityGeneration, ContinuityRecord, DurabilityPolicy, DurableAgentSpec, FencingToken,
4236        IdentityLifecycleState, IdentityRuntime, IdentityRuntimeConfig, LeaseGrant,
4237        LocalContinuityStore, LocalLeaseProvider,
4238    };
4239    use crate::mob_handle_runtime::{MobRuntime, model_capabilities_for_role};
4240    use crate::rpc::{JSONRPC_VERSION, JsonRpcRequest};
4241    use crate::runtime::{ConsoleAgentLiveSnapshot, ConsoleLiveSnapshot, ConsoleMember};
4242    use crate::unified_runtime::ConsoleEventStore;
4243    use crate::{MobBootstrapOptions, MobBootstrapSpec};
4244    use bytes::Bytes;
4245    use meerkat::{AgentFactory, Config, build_ephemeral_service};
4246    use meerkat_client::TestClient;
4247    use meerkat_mob::ProfileName;
4248    use meerkat_mob::{MobDefinition, MobStorage, SpawnMemberSpec};
4249    use serde_json::{Value, json};
4250    use std::collections::BTreeMap;
4251    use std::sync::Arc;
4252    use std::time::Duration;
4253
4254    async fn build_empty_console_test_runtime(
4255        mob_id: &str,
4256    ) -> Result<(tempfile::TempDir, MobRuntime), Box<dyn std::error::Error + Send + Sync>> {
4257        let temp_dir = tempfile::tempdir()?;
4258        let session_path = temp_dir.path().join("sessions");
4259        std::fs::create_dir_all(&session_path)?;
4260        let factory = AgentFactory::new(&session_path).comms(true);
4261        let session_service = Arc::new(build_ephemeral_service(factory, Config::default(), 16));
4262        let definition = MobDefinition::from_toml(&format!(
4263            r#"
4264[mob]
4265id = "{mob_id}"
4266
4267[profiles.worker]
4268model = "gpt-5.5"
4269external_addressable = true
4270
4271[profiles.worker.tools]
4272comms = true
4273"#
4274        ))?;
4275        let runtime = MobRuntime::bootstrap(
4276            MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
4277                .with_options(MobBootstrapOptions {
4278                    allow_ephemeral_sessions: true,
4279                    notify_orchestrator_on_resume: true,
4280                    default_llm_client: Some(Arc::new(TestClient::default())),
4281                }),
4282        )
4283        .await?;
4284        Ok((temp_dir, runtime))
4285    }
4286
4287    fn rpc_request(method: &str) -> JsonRpcRequest {
4288        JsonRpcRequest {
4289            jsonrpc: JSONRPC_VERSION.to_string(),
4290            id: Some(json!(1)),
4291            method: method.to_string(),
4292            params: json!({}),
4293        }
4294    }
4295
4296    #[tokio::test]
4297    async fn identity_first_console_send_reserves_timeline_and_uses_identity_runtime()
4298    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4299        let identity = AgentIdentity::parse("agent:console")?;
4300        let record = ContinuityRecord {
4301            identity: identity.clone(),
4302            agent_runtime_id: AgentRuntimeId::parse("rt:agent:console:0")?,
4303            session_id: meerkat_core::types::SessionId::new(),
4304            generation: ContinuityGeneration::new(0),
4305            checkpoint_version: CheckpointVersion::new(0),
4306        };
4307        let runtime = IdentityRuntime::new(IdentityRuntimeConfig {
4308            continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4309            lease_provider: Arc::new(LocalLeaseProvider::new()),
4310            runtime_instance_id: "console-test".to_string(),
4311            has_runtime_store: true,
4312            durability_policy: DurabilityPolicy::SyncWriteThrough,
4313            bridge: None,
4314            default_timeout: None,
4315        });
4316        runtime
4317            .register(
4318                DurableAgentSpec {
4319                    identity: identity.clone(),
4320                    profile: ProfileName::from("default"),
4321                    addressability: AgentAddressability::Addressable,
4322                    display_name: None,
4323                    labels: BTreeMap::new(),
4324                    context: None,
4325                    additional_instructions: Vec::new(),
4326                    initial_message: None,
4327                    runtime_mode_override: None,
4328                },
4329                IdentityLifecycleState::Active,
4330                Some(record.clone()),
4331                Some(LeaseGrant {
4332                    identity: identity.clone(),
4333                    fencing_token: FencingToken::new(7),
4334                    ttl: Duration::from_mins(1),
4335                }),
4336            )
4337            .await;
4338
4339        let aggregator = MobKitConsoleAggregator::in_memory();
4340        let events = ConsoleEventStore::new();
4341        let accepted = console_send_identity_first(
4342            &aggregator,
4343            &runtime,
4344            Some(&events),
4345            crate::console_aggregator::ConsoleSendRequest {
4346                identity: identity.as_str().to_string(),
4347                content: serde_json::to_value(meerkat_core::ContentInput::Text(
4348                    "hello".to_string(),
4349                ))?,
4350                origin: "test".to_string(),
4351                idempotency_key: "idem-1".to_string(),
4352                handling_mode: None,
4353            },
4354        )
4355        .await?;
4356
4357        assert_eq!(accepted.identity, identity.as_str());
4358        assert_eq!(accepted.status, ConsoleFrameStatus::Accepted);
4359        assert_eq!(accepted.session_id, Some(record.session_id.to_string()));
4360
4361        let page = aggregator
4362            .query_timeline(ConsoleTimelineQuery {
4363                identity: Some(identity.as_str().to_string()),
4364                ..ConsoleTimelineQuery::default()
4365            })
4366            .await?;
4367        assert_eq!(page.frames.len(), 1);
4368        assert_eq!(page.frames[0].runtime_key, "identity-first");
4369        assert_eq!(page.frames[0].status, ConsoleFrameStatus::Accepted);
4370        assert_eq!(
4371            page.frames[0].session_id,
4372            Some(record.session_id.to_string())
4373        );
4374        Ok(())
4375    }
4376
4377    #[test]
4378    fn multipart_body_limit_covers_configured_image_limit() {
4379        const _: () = assert!(MAX_MULTIPART_BODY_BYTES > MAX_MULTIPART_IMAGE_BYTES);
4380        const _: () = assert!(MAX_MULTIPART_BODY_BYTES > 2 * 1024 * 1024);
4381    }
4382
4383    /// Cold-cache contract: a `prime_now` waiter that arrives while
4384    /// another task holds `refresh_lock` must park on the lock and
4385    /// resume after that task releases it. No race-prone signaling
4386    /// involved — the lock acquisition itself IS the signal that the
4387    /// in-flight refresh has finished and `primed` is true.
4388    ///
4389    /// Test shape: hold `refresh_lock` from the test thread (no real
4390    /// refresh task), spawn a `prime_now`-style waiter, then set
4391    /// `primed` + drop the lock. The waiter must observe `primed`
4392    /// after acquiring the lock and return without redoing the
4393    /// refresh (we'd otherwise deadlock since we don't supply a
4394    /// real `MobRuntime`).
4395    #[tokio::test]
4396    async fn cold_cache_waiter_resumes_when_refresh_lock_drops()
4397    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4398        use std::sync::atomic::Ordering;
4399        use tokio::time::Duration;
4400
4401        let model = ConsoleSnapshotReadModel::default();
4402        let guard = model
4403            .refresh_lock
4404            .clone()
4405            .try_lock_owned()
4406            .map_err(|_| "refresh_lock unexpectedly contended at test start")?;
4407
4408        let model_for_waiter = model.clone();
4409        let waiter = tokio::spawn(async move {
4410            // Inlined `prime_now` shape (skips the runtime call,
4411            // since the test will set `primed` before this acquires).
4412            if model_for_waiter
4413                .primed
4414                .load(std::sync::atomic::Ordering::Acquire)
4415            {
4416                return;
4417            }
4418            let _wait_guard = model_for_waiter.refresh_lock.clone().lock_owned().await;
4419            // After acquiring, `primed` must be true (the "refresher"
4420            // — i.e., the test thread — set it before releasing).
4421            assert!(
4422                model_for_waiter
4423                    .primed
4424                    .load(std::sync::atomic::Ordering::Acquire),
4425                "waiter acquired lock but primed is still false"
4426            );
4427        });
4428
4429        // Give the waiter time to reach `lock_owned().await`.
4430        tokio::time::sleep(Duration::from_millis(20)).await;
4431
4432        // Set primed, then release the lock. The waiter parked on
4433        // `lock_owned()` should acquire it immediately.
4434        model.primed.store(true, Ordering::Release);
4435        drop(guard);
4436
4437        let result = tokio::time::timeout(Duration::from_secs(1), waiter).await;
4438        assert!(
4439            result.is_ok(),
4440            "waiter should resume once the refresh lock drops"
4441        );
4442        Ok(())
4443    }
4444
4445    /// Companion: when `primed` is already set, `snapshot()` returns
4446    /// without touching the refresh lock at all. Guards against an
4447    /// over-eager `prime_now` that would deadlock during normal
4448    /// (hot-cache) traffic.
4449    #[tokio::test]
4450    async fn snapshot_skips_refresh_lock_when_already_primed()
4451    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4452        use std::sync::atomic::Ordering;
4453        use tokio::time::Duration;
4454
4455        let model = ConsoleSnapshotReadModel::default();
4456        model.primed.store(true, Ordering::Release);
4457        // Pre-acquire the refresh lock to prove it isn't touched.
4458        let _guard = model
4459            .refresh_lock
4460            .clone()
4461            .try_lock_owned()
4462            .map_err(|_| "refresh_lock unexpectedly contended at test start")?;
4463
4464        // `snapshot()` calls `prime_now` only on cold cache; with
4465        // primed=true the lock-await branch must not be reached.
4466        // We test the contract via direct inspection: if `prime_now`
4467        // accidentally tried `lock_owned().await` here, this would
4468        // hang. The timeout below is the deadlock guard.
4469        let snap_fast_path = async {
4470            assert!(
4471                model.primed.load(Ordering::Acquire),
4472                "primed precondition for hot-cache path"
4473            );
4474        };
4475        let result = tokio::time::timeout(Duration::from_millis(100), snap_fast_path).await;
4476        assert!(result.is_ok(), "hot-cache snapshot path should not block");
4477        Ok(())
4478    }
4479
4480    #[tokio::test]
4481    async fn console_aggregator_reset_all_rpc_force_refreshes_identity_cache()
4482    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4483        let (_temp_dir, runtime) =
4484            build_empty_console_test_runtime("console-reset-fresh-identity-cache").await?;
4485        let aggregator = MobKitConsoleAggregator::in_memory();
4486        aggregator.register_runtime_handles_with_policy(
4487            "runtime-reset",
4488            "reset",
4489            runtime.clone(),
4490            ConsoleEventStore::new(),
4491            Arc::new(AllowAllConsoleVisibilityPolicy),
4492        );
4493        let primed_empty = aggregator.list_identities().await?;
4494        assert!(
4495            primed_empty.is_empty(),
4496            "test precondition: identity cache should be primed empty before late spawn"
4497        );
4498
4499        runtime
4500            .handle()
4501            .spawn_spec(SpawnMemberSpec::from_wire(
4502                "worker".to_string(),
4503                "agent-reset".to_string(),
4504                Some("You are agent-reset.".into()),
4505                None,
4506                None,
4507            ))
4508            .await?;
4509
4510        let response =
4511            handle_console_aggregator_rpc(Some(aggregator), rpc_request("mobkit/reset_all"), true)
4512                .await;
4513
4514        assert_eq!(response["error"], Value::Null);
4515        assert_eq!(response["result"]["retired"], json!(["reset/agent-reset"]));
4516        let _ = runtime.handle().stop().await;
4517        Ok(())
4518    }
4519
4520    #[test]
4521    fn timeline_stream_cursor_filter_uses_numeric_console_sequence() {
4522        assert!(cursor_is_after(
4523            &ConsoleCursor::from("console:10"),
4524            &ConsoleCursor::from("console:9")
4525        ));
4526        assert!(!cursor_is_after(
4527            &ConsoleCursor::from("console:9"),
4528            &ConsoleCursor::from("console:10")
4529        ));
4530    }
4531
4532    #[test]
4533    fn console_live_snapshot_dedupes_repeated_delegate_identities() {
4534        let mut members = vec![
4535            ConsoleMember {
4536                agent_identity: "incident-commander".to_string(),
4537                role: "commander".to_string(),
4538                state: "active".to_string(),
4539                model_capabilities: Default::default(),
4540                runtime_mode: None,
4541                session_id: None,
4542                wired_to: Vec::new(),
4543                labels: BTreeMap::new(),
4544            },
4545            ConsoleMember {
4546                agent_identity: "qa-child".to_string(),
4547                role: "delegate".to_string(),
4548                state: "active".to_string(),
4549                model_capabilities: Default::default(),
4550                runtime_mode: None,
4551                session_id: Some("first".to_string()),
4552                wired_to: vec!["qa-parent".to_string()],
4553                labels: BTreeMap::from([(
4554                    "delegate_host_identity".to_string(),
4555                    "qa-parent".to_string(),
4556                )]),
4557            },
4558            ConsoleMember {
4559                agent_identity: "qa-child".to_string(),
4560                role: "delegate".to_string(),
4561                state: "active".to_string(),
4562                model_capabilities: Default::default(),
4563                runtime_mode: None,
4564                session_id: Some("second".to_string()),
4565                wired_to: vec!["qa-parent".to_string()],
4566                labels: BTreeMap::from([(
4567                    "delegate_host_identity".to_string(),
4568                    "qa-parent".to_string(),
4569                )]),
4570            },
4571        ];
4572
4573        dedupe_console_members_by_identity(&mut members);
4574
4575        assert_eq!(
4576            members
4577                .iter()
4578                .map(|member| member.agent_identity.as_str())
4579                .collect::<Vec<_>>(),
4580            vec!["incident-commander", "qa-child"]
4581        );
4582        assert_eq!(members[1].session_id.as_deref(), Some("first"));
4583    }
4584
4585    #[test]
4586    fn console_visibility_policy_hides_implicit_delegate_members_from_snapshot() {
4587        let mut snapshot = ConsoleLiveSnapshot::new(
4588            Some("runtime".to_string()),
4589            true,
4590            vec!["incident-commander".to_string(), "qa-child".to_string()],
4591            vec![
4592                ConsoleAgentLiveSnapshot {
4593                    agent_id: "incident-commander".to_string(),
4594                    member_id: "incident-commander".to_string(),
4595                    label: "Incident Commander".to_string(),
4596                    kind: "meerkat".to_string(),
4597                    identity: Some("incident-commander".to_string()),
4598                    role: Some("commander".to_string()),
4599                    state: Some("active".to_string()),
4600                    session_id: None,
4601                    model_capabilities: Default::default(),
4602                    response_phase: None,
4603                    watched: None,
4604                    alert_level: None,
4605                    degraded: None,
4606                    degraded_reason: None,
4607                },
4608                ConsoleAgentLiveSnapshot {
4609                    agent_id: "qa-child".to_string(),
4610                    member_id: "qa-child".to_string(),
4611                    label: "QA Child".to_string(),
4612                    kind: "meerkat".to_string(),
4613                    identity: Some("qa-child".to_string()),
4614                    role: Some("delegate".to_string()),
4615                    state: Some("active".to_string()),
4616                    session_id: Some("delegate-session".to_string()),
4617                    model_capabilities: Default::default(),
4618                    response_phase: None,
4619                    watched: None,
4620                    alert_level: None,
4621                    degraded: None,
4622                    degraded_reason: None,
4623                },
4624            ],
4625            vec![
4626                ConsoleMember {
4627                    agent_identity: "incident-commander".to_string(),
4628                    role: "commander".to_string(),
4629                    state: "active".to_string(),
4630                    model_capabilities: Default::default(),
4631                    runtime_mode: None,
4632                    session_id: None,
4633                    wired_to: Vec::new(),
4634                    labels: BTreeMap::new(),
4635                },
4636                ConsoleMember {
4637                    agent_identity: "qa-child".to_string(),
4638                    role: "delegate".to_string(),
4639                    state: "active".to_string(),
4640                    model_capabilities: Default::default(),
4641                    runtime_mode: None,
4642                    session_id: Some("delegate-session".to_string()),
4643                    wired_to: vec!["qa-parent".to_string()],
4644                    labels: BTreeMap::from([(
4645                        "source_mob_id".to_string(),
4646                        "implicit-qa-mob".to_string(),
4647                    )]),
4648                },
4649            ],
4650            true,
4651        );
4652
4653        apply_console_visibility_policy(
4654            &mut snapshot,
4655            &HideImplicitDelegateMembersConsoleVisibilityPolicy,
4656        );
4657
4658        assert_eq!(
4659            snapshot
4660                .members
4661                .iter()
4662                .map(|member| member.agent_identity.as_str())
4663                .collect::<Vec<_>>(),
4664            vec!["incident-commander"]
4665        );
4666        assert_eq!(
4667            snapshot
4668                .agents
4669                .iter()
4670                .map(|agent| agent.agent_id.as_str())
4671                .collect::<Vec<_>>(),
4672            vec!["incident-commander"]
4673        );
4674        assert_eq!(snapshot.loaded_modules, vec!["incident-commander"]);
4675    }
4676
4677    #[tokio::test]
4678    async fn live_snapshot_member_projection_uses_roster_profile_capabilities()
4679    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4680        let temp_dir = tempfile::tempdir()?;
4681        let session_path = temp_dir.path().join("sessions");
4682        std::fs::create_dir_all(&session_path)?;
4683        let factory = AgentFactory::new(&session_path).comms(true);
4684        let session_service = Arc::new(build_ephemeral_service(factory, Config::default(), 16));
4685        let definition = MobDefinition::from_toml(
4686            r#"
4687[mob]
4688id = "console-snapshot-test"
4689
4690[profiles.worker]
4691model = "gpt-5.5"
4692
4693[profiles.worker.tools]
4694comms = true
4695"#,
4696        )?;
4697        let expected = model_capabilities_for_role(&definition, "worker");
4698        let runtime = MobRuntime::bootstrap(
4699            MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
4700                .with_options(MobBootstrapOptions {
4701                    allow_ephemeral_sessions: true,
4702                    notify_orchestrator_on_resume: true,
4703                    default_llm_client: Some(Arc::new(TestClient::default())),
4704                }),
4705        )
4706        .await?;
4707        runtime
4708            .handle()
4709            .spawn_spec(SpawnMemberSpec::from_wire(
4710                "worker".to_string(),
4711                "worker:one".to_string(),
4712                Some("You are worker one.".into()),
4713                None,
4714                None,
4715            ))
4716            .await?;
4717
4718        let empty_read_model = ConsoleSnapshotReadModelState::default();
4719        let (members, session_owner_by_id) =
4720            project_console_members_from_handle(&runtime.handle(), None, None, &empty_read_model)
4721                .await;
4722
4723        assert_eq!(members.len(), 1);
4724        assert_eq!(members[0].model_capabilities, expected);
4725        assert_eq!(members[0].session_id, None);
4726        assert!(session_owner_by_id.is_empty());
4727
4728        let refreshed_read_model = collect_console_snapshot_read_model(&runtime).await;
4729        let (members, session_owner_by_id) = project_console_members_from_handle(
4730            &runtime.handle(),
4731            None,
4732            None,
4733            &refreshed_read_model,
4734        )
4735        .await;
4736        assert_eq!(
4737            members[0].session_id.as_ref(),
4738            session_owner_by_id.keys().next()
4739        );
4740
4741        // Materialized cache: the refresh should have populated
4742        // `primary_members` with exactly the same shape that the
4743        // synchronous projection produces. `build_live_snapshot` reads
4744        // straight from this slot — never calls `handle.list_all_members`
4745        // — so this assertion is the cache's contract.
4746        assert_eq!(
4747            refreshed_read_model.primary_members.len(),
4748            members.len(),
4749            "primary_members cache should hold the same members as live projection"
4750        );
4751        assert_eq!(
4752            refreshed_read_model.primary_members[0].agent_identity,
4753            members[0].agent_identity
4754        );
4755        assert_eq!(
4756            refreshed_read_model.primary_members[0].session_id,
4757            members[0].session_id
4758        );
4759        Ok(())
4760    }
4761
4762    #[tokio::test]
4763    async fn fresh_timeline_snapshot_reads_tail_without_full_log_replay()
4764    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4765        let aggregator = MobKitConsoleAggregator::in_memory();
4766        for idx in 0..25_000 {
4767            aggregator
4768                .store()
4769                .append_if_absent(NewConsoleFrame {
4770                    id: None,
4771                    dedupe_key: format!("event-{idx}"),
4772                    timestamp_ms: idx,
4773                    runtime_key: "runtime-a".to_string(),
4774                    identity: "agent-a".to_string(),
4775                    conversation_id: Some("agent-a".to_string()),
4776                    session_id: None,
4777                    kind: "text_delta".to_string(),
4778                    status: ConsoleFrameStatus::Completed,
4779                    payload: json!({ "delta": idx }),
4780                    source: ConsoleFrameSource {
4781                        kind: ConsoleFrameSourceKind::ConsoleEvent,
4782                        source_cursor: None,
4783                    },
4784                    source_event_id: Some(format!("event-{idx}")),
4785                    interaction_id: None,
4786                    turn_id: None,
4787                    run_id: None,
4788                    parent_frame_id: None,
4789                    caused_by_frame_id: None,
4790                })
4791                .await?;
4792        }
4793
4794        let (frames, cursor) = query_timeline_snapshot(
4795            &aggregator,
4796            ConsoleTimelineQuery {
4797                identity: Some("agent-a".to_string()),
4798                after: None,
4799                limit: 200,
4800                ..ConsoleTimelineQuery::default()
4801            },
4802        )
4803        .await?;
4804
4805        assert!(!frames.is_empty());
4806        assert_eq!(cursor.as_ref().and_then(ConsoleCursor::seq), Some(25_000));
4807        assert_eq!(
4808            frames.last().and_then(|frame| frame.cursor.seq()),
4809            Some(25_000)
4810        );
4811        Ok(())
4812    }
4813
4814    #[tokio::test]
4815    async fn fresh_timeline_snapshot_keeps_sparse_identity_frames()
4816    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4817        let aggregator = MobKitConsoleAggregator::in_memory();
4818        aggregator
4819            .store()
4820            .append_if_absent(NewConsoleFrame {
4821                id: None,
4822                dedupe_key: "sparse-event".to_string(),
4823                timestamp_ms: 1,
4824                runtime_key: "runtime-a".to_string(),
4825                identity: "sparse-agent".to_string(),
4826                conversation_id: Some("sparse-agent".to_string()),
4827                session_id: None,
4828                kind: "text_complete".to_string(),
4829                status: ConsoleFrameStatus::Completed,
4830                payload: json!({ "text": "still visible" }),
4831                source: ConsoleFrameSource {
4832                    kind: ConsoleFrameSourceKind::ConsoleEvent,
4833                    source_cursor: None,
4834                },
4835                source_event_id: Some("sparse-event".to_string()),
4836                interaction_id: None,
4837                turn_id: None,
4838                run_id: None,
4839                parent_frame_id: None,
4840                caused_by_frame_id: None,
4841            })
4842            .await?;
4843        for idx in 0..25_000 {
4844            aggregator
4845                .store()
4846                .append_if_absent(NewConsoleFrame {
4847                    id: None,
4848                    dedupe_key: format!("other-event-{idx}"),
4849                    timestamp_ms: idx + 2,
4850                    runtime_key: "runtime-a".to_string(),
4851                    identity: "busy-agent".to_string(),
4852                    conversation_id: Some("busy-agent".to_string()),
4853                    session_id: None,
4854                    kind: "text_delta".to_string(),
4855                    status: ConsoleFrameStatus::Completed,
4856                    payload: json!({ "delta": idx }),
4857                    source: ConsoleFrameSource {
4858                        kind: ConsoleFrameSourceKind::ConsoleEvent,
4859                        source_cursor: None,
4860                    },
4861                    source_event_id: Some(format!("other-event-{idx}")),
4862                    interaction_id: None,
4863                    turn_id: None,
4864                    run_id: None,
4865                    parent_frame_id: None,
4866                    caused_by_frame_id: None,
4867                })
4868                .await?;
4869        }
4870
4871        let (frames, cursor) = query_timeline_snapshot(
4872            &aggregator,
4873            ConsoleTimelineQuery {
4874                identity: Some("sparse-agent".to_string()),
4875                after: None,
4876                limit: 200,
4877                ..ConsoleTimelineQuery::default()
4878            },
4879        )
4880        .await?;
4881
4882        assert_eq!(frames.len(), 1);
4883        assert_eq!(frames[0].identity, "sparse-agent");
4884        assert_eq!(frames[0].payload["text"], json!("still visible"));
4885        assert_eq!(cursor.as_ref().and_then(ConsoleCursor::seq), Some(1));
4886        Ok(())
4887    }
4888
4889    #[tokio::test]
4890    async fn timeline_snapshot_clamps_requested_limit_to_store_page_size()
4891    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4892        let aggregator = MobKitConsoleAggregator::in_memory();
4893        for idx in 0..2_500 {
4894            aggregator
4895                .store()
4896                .append_if_absent(NewConsoleFrame {
4897                    id: None,
4898                    dedupe_key: format!("clamp-event-{idx}"),
4899                    timestamp_ms: idx,
4900                    runtime_key: "runtime-a".to_string(),
4901                    identity: "agent-a".to_string(),
4902                    conversation_id: Some("agent-a".to_string()),
4903                    session_id: None,
4904                    kind: "text_delta".to_string(),
4905                    status: ConsoleFrameStatus::Completed,
4906                    payload: json!({ "delta": idx }),
4907                    source: ConsoleFrameSource {
4908                        kind: ConsoleFrameSourceKind::ConsoleEvent,
4909                        source_cursor: None,
4910                    },
4911                    source_event_id: Some(format!("clamp-event-{idx}")),
4912                    interaction_id: None,
4913                    turn_id: None,
4914                    run_id: None,
4915                    parent_frame_id: None,
4916                    caused_by_frame_id: None,
4917                })
4918                .await?;
4919        }
4920
4921        let (frames, cursor) = query_timeline_snapshot(
4922            &aggregator,
4923            ConsoleTimelineQuery {
4924                identity: Some("agent-a".to_string()),
4925                after: Some(ConsoleCursor::from("console:100")),
4926                limit: 5_000,
4927                ..ConsoleTimelineQuery::default()
4928            },
4929        )
4930        .await?;
4931
4932        assert_eq!(frames.len(), 2_400);
4933        assert_eq!(
4934            frames.first().and_then(|frame| frame.cursor.seq()),
4935            Some(101)
4936        );
4937        assert_eq!(
4938            frames.last().and_then(|frame| frame.cursor.seq()),
4939            Some(2_500)
4940        );
4941        assert_eq!(cursor.as_ref().and_then(ConsoleCursor::seq), Some(2_500));
4942        Ok(())
4943    }
4944
4945    #[tokio::test]
4946    async fn multipart_blob_upload_stores_one_file() -> Result<(), Box<dyn std::error::Error>> {
4947        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
4948        let mut files = BTreeMap::new();
4949        files.insert(
4950            "upload-1".to_string(),
4951            MultipartImageUpload {
4952                media_type: "image/png".to_string(),
4953                bytes: Bytes::from_static(b"png-data"),
4954            },
4955        );
4956        let result = externalize_single_image_upload(
4957            &json!({
4958                "upload": {
4959                    "type": "image_upload",
4960                    "upload_id": "upload-1",
4961                    "media_type": "image/png"
4962                }
4963            }),
4964            files,
4965            store.clone(),
4966        )
4967        .await
4968        .map_err(std::io::Error::other)?;
4969
4970        assert_eq!(result["media_type"], json!("image/png"));
4971        assert_eq!(result["size"], json!(8));
4972        let Some(blob_id) = result["blob_id"].as_str() else {
4973            return Err(std::io::Error::other("blob id").into());
4974        };
4975        let payload = store
4976            .get_bytes(&meerkat_core::BlobId::from(blob_id))
4977            .await?;
4978        assert_eq!(payload.data.as_ref(), b"png-data");
4979        Ok(())
4980    }
4981
4982    #[tokio::test]
4983    async fn multipart_blob_upload_accepts_part_name_alias()
4984    -> Result<(), Box<dyn std::error::Error>> {
4985        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
4986        let mut files = BTreeMap::new();
4987        files.insert(
4988            "image-field".to_string(),
4989            MultipartImageUpload {
4990                media_type: "image/png".to_string(),
4991                bytes: Bytes::from_static(b"png-data"),
4992            },
4993        );
4994        let result = externalize_single_image_upload(
4995            &json!({
4996                "upload": {
4997                    "type": "image_upload",
4998                    "part_name": "image-field",
4999                    "media_type": "image/png"
5000                }
5001            }),
5002            files,
5003            store,
5004        )
5005        .await
5006        .map_err(std::io::Error::other)?;
5007
5008        assert_eq!(result["media_type"], json!("image/png"));
5009        assert!(
5010            result["blob_id"]
5011                .as_str()
5012                .is_some_and(|value| value.starts_with("sha256:"))
5013        );
5014        Ok(())
5015    }
5016
5017    #[tokio::test]
5018    async fn multipart_blob_upload_rejects_media_mismatch() -> Result<(), Box<dyn std::error::Error>>
5019    {
5020        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5021        let mut files = BTreeMap::new();
5022        files.insert(
5023            "upload-1".to_string(),
5024            MultipartImageUpload {
5025                media_type: "image/jpeg".to_string(),
5026                bytes: Bytes::from_static(b"jpeg-data"),
5027            },
5028        );
5029        let err = match externalize_single_image_upload(
5030            &json!({
5031                "upload": {
5032                    "type": "image_upload",
5033                    "upload_id": "upload-1",
5034                    "media_type": "image/png"
5035                }
5036            }),
5037            files,
5038            store,
5039        )
5040        .await
5041        {
5042            Ok(_) => return Err(std::io::Error::other("media mismatch").into()),
5043            Err(err) => err,
5044        };
5045        assert!(
5046            err.contains("media type mismatch"),
5047            "unexpected error: {err}"
5048        );
5049        Ok(())
5050    }
5051
5052    #[tokio::test]
5053    async fn multipart_blob_upload_rejects_extra_file() -> Result<(), Box<dyn std::error::Error>> {
5054        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5055        let mut files = BTreeMap::new();
5056        for id in ["upload-1", "upload-2"] {
5057            files.insert(
5058                id.to_string(),
5059                MultipartImageUpload {
5060                    media_type: "image/png".to_string(),
5061                    bytes: Bytes::from_static(b"png"),
5062                },
5063            );
5064        }
5065        let err = match externalize_single_image_upload(
5066            &json!({
5067                "upload": {
5068                    "type": "image_upload",
5069                    "upload_id": "upload-1",
5070                    "media_type": "image/png"
5071                }
5072            }),
5073            files,
5074            store,
5075        )
5076        .await
5077        {
5078            Ok(_) => return Err(std::io::Error::other("one file only").into()),
5079            Err(err) => err,
5080        };
5081        assert!(
5082            err.contains("exactly one file part"),
5083            "unexpected error: {err}"
5084        );
5085        Ok(())
5086    }
5087
5088    #[tokio::test]
5089    async fn multipart_send_replaces_placeholders_and_removes_shadow_message()
5090    -> Result<(), Box<dyn std::error::Error>> {
5091        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5092        let mut files = BTreeMap::new();
5093        files.insert(
5094            "upload-1".to_string(),
5095            MultipartImageUpload {
5096                media_type: "image/webp".to_string(),
5097                bytes: Bytes::from_static(b"webp-data"),
5098            },
5099        );
5100        let mut params = json!({
5101            "member_id": "artist",
5102            "message": "stale shadow text",
5103            "content": [
5104                { "type": "text", "text": "describe" },
5105                {
5106                    "type": "image_upload",
5107                    "upload_id": "upload-1",
5108                    "media_type": "image/webp"
5109                }
5110            ]
5111        });
5112        externalize_image_upload_placeholders(&mut params, files, store)
5113            .await
5114            .map_err(std::io::Error::other)?;
5115
5116        assert!(params.get("message").is_none());
5117        assert_eq!(params["content"][1]["type"], json!("image"));
5118        assert_eq!(params["content"][1]["source"], json!("blob"));
5119        assert_eq!(params["content"][1]["media_type"], json!("image/webp"));
5120        assert!(
5121            params["content"][1]["blob_id"]
5122                .as_str()
5123                .is_some_and(|value| value.starts_with("sha256:"))
5124        );
5125        Ok(())
5126    }
5127
5128    #[tokio::test]
5129    async fn multipart_send_accepts_part_name_placeholder() -> Result<(), Box<dyn std::error::Error>>
5130    {
5131        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5132        let mut files = BTreeMap::new();
5133        files.insert(
5134            "image-field".to_string(),
5135            MultipartImageUpload {
5136                media_type: "image/png".to_string(),
5137                bytes: Bytes::from_static(b"png-data"),
5138            },
5139        );
5140        let mut params = json!({
5141            "member_id": "analyst",
5142            "content": [
5143                { "type": "text", "text": "describe" },
5144                {
5145                    "type": "image_upload",
5146                    "part_name": "image-field",
5147                    "media_type": "image/png"
5148                }
5149            ]
5150        });
5151
5152        externalize_image_upload_placeholders(&mut params, files, store)
5153            .await
5154            .map_err(std::io::Error::other)?;
5155
5156        assert_eq!(params["content"][1]["type"], json!("image"));
5157        assert_eq!(params["content"][1]["source"], json!("blob"));
5158        assert_eq!(params["content"][1]["media_type"], json!("image/png"));
5159        Ok(())
5160    }
5161
5162    #[tokio::test]
5163    async fn multipart_send_rejects_placeholder_without_file()
5164    -> Result<(), Box<dyn std::error::Error>> {
5165        let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5166        let mut params = json!({
5167            "content": [{
5168                "type": "image_upload",
5169                "upload_id": "missing",
5170                "media_type": "image/png"
5171            }]
5172        });
5173        let err = match externalize_image_upload_placeholders(&mut params, BTreeMap::new(), store)
5174            .await
5175        {
5176            Ok(()) => return Err(std::io::Error::other("missing file").into()),
5177            Err(err) => err,
5178        };
5179        assert!(err.contains("missing file part"), "unexpected error: {err}");
5180        Ok(())
5181    }
5182}