1use async_stream::stream;
4use axum::extract::{DefaultBodyLimit, Multipart, Path as AxumPath, Query, State};
5use axum::http::{HeaderMap, HeaderValue, StatusCode, Uri, header};
6use axum::response::sse::{Event, KeepAlive, Sse};
7use axum::response::{IntoResponse, Redirect};
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use base64::Engine;
11use futures::future::join_all;
12use meerkat_core::ContentInput;
13use meerkat_core::comms::TrustedPeerDescriptor;
14use meerkat_mob::MobState;
15use meerkat_mob::ids::{MeerkatId, MobId};
16use meerkat_mob::launch::MemberLaunchMode;
17use meerkat_mob::runtime::reconcile::MemberFilter;
18use meerkat_mob::{MobHandle, PeerTarget, ProfileName, SpawnMemberSpec};
19
20use crate::mob_handle_runtime::{
21 member_entry_to_json, model_capabilities_for_member_entry, model_capabilities_for_role,
22};
23use serde_json::{Value, json};
24use std::collections::{BTreeMap, BTreeSet};
25use std::convert::Infallible;
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use crate::blob_store::{BinaryBlobPayload, BinaryBlobStore, is_valid_blob_id_value};
30use crate::console_aggregator::{
31 AllowAllConsoleVisibilityPolicy, ConsoleCursor, ConsoleFrame, ConsoleLogResult,
32 ConsoleLogStore, ConsoleReplayUnavailable, ConsoleSendError, ConsoleSendRequest,
33 ConsoleTimelineEvent, ConsoleTimelineQuery, ConsoleVisibilityPolicy,
34 HideImplicitDelegateMembersConsoleVisibilityPolicy, MobKitConsoleAggregator,
35};
36use crate::contact_directory::ContactDirectory;
37use crate::http_sse::{DEFAULT_KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TEXT};
38use crate::mob_handle_runtime::{MEMBER_STATE_ACTIVE, MEMBER_STATE_RETIRING, MobRuntime};
39use crate::rpc::{JSONRPC_VERSION, JsonRpcError, JsonRpcRequest, JsonRpcResponse};
40use crate::runtime::MobkitRuntimeHandle;
41use crate::runtime::{
42 ConsoleAgentLiveSnapshot, ConsoleLiveSnapshot, ConsoleMember, ConsoleModelCapabilities,
43 ConsoleRestJsonRequest, DeliveryHistoryRequest, GatingDecideRequest, GatingDecision,
44 RuntimeDecisionState, extract_bearer_token_from_header,
45 handle_console_rest_json_route_with_snapshot, validate_console_token,
46};
47use crate::runtime::{MetadataScope, RuntimeMetadataTable, labels_to_json_value};
48use crate::unified_runtime::console_events::ConsoleEventStore;
49use crate::unified_runtime::mob_events::MobEventsStore;
50use crate::unified_runtime::{EventLogStore, EventQuery};
51
52#[derive(Clone)]
53pub struct ConsoleJsonState {
54 pub decisions: RuntimeDecisionState,
55 pub runtime: Option<MobRuntime>,
56 pub module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
57 pub contact_directory: Option<ContactDirectory>,
58 pub event_log: Option<std::sync::Arc<dyn EventLogStore>>,
59 pub gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
63 pub(crate) identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
64 pub(crate) console_events: Option<ConsoleEventStore>,
65 pub(crate) console_aggregator: Option<MobKitConsoleAggregator>,
66 pub(crate) mob_events: Option<MobEventsStore>,
67 pub(crate) metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
68 pub(crate) visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
69 pub(crate) snapshot_read_model: ConsoleSnapshotReadModel,
70}
71
/// Cached read model used when building console live snapshots.
///
/// Every field is an `Arc`, so clones share the same underlying cache.
#[derive(Clone, Default)]
pub(crate) struct ConsoleSnapshotReadModel {
    /// Most recently collected state.
    inner: Arc<tokio::sync::RwLock<ConsoleSnapshotReadModelState>>,
    /// Held while a collection is running so only one refresh runs at a time.
    refresh_lock: Arc<tokio::sync::Mutex<()>>,
    /// Set to `true` once `inner` has been filled by at least one collection.
    primed: Arc<std::sync::atomic::AtomicBool>,
}
86
/// Data held by [`ConsoleSnapshotReadModel`]; cloned out on every read.
#[derive(Clone, Default)]
struct ConsoleSnapshotReadModelState {
    /// Presumably whether the mob is running, `None` when unknown — TODO confirm
    /// against `collect_console_snapshot_read_model`.
    running: Option<bool>,
    /// Presumably maps an identity to its session id — TODO confirm.
    session_id_by_identity: BTreeMap<String, String>,
    /// Presumably maps a session id to the identity owning it — TODO confirm.
    session_owner_by_id: BTreeMap<String, String>,
    /// Primary console members.
    primary_members: Vec<ConsoleMember>,
    /// Delegate members, kept as separate groups.
    delegate_member_groups: Vec<Vec<ConsoleMember>>,
}
103
104impl ConsoleSnapshotReadModel {
105 async fn snapshot(&self, runtime: &MobRuntime) -> ConsoleSnapshotReadModelState {
112 if !self.primed.load(std::sync::atomic::Ordering::Acquire) {
113 self.prime_now(runtime).await;
114 }
115 self.inner.read().await.clone()
116 }
117
118 async fn prime_now(&self, runtime: &MobRuntime) {
138 if self.primed.load(std::sync::atomic::Ordering::Acquire) {
139 return;
140 }
141 let _guard = self.refresh_lock.clone().lock_owned().await;
142 if self.primed.load(std::sync::atomic::Ordering::Acquire) {
143 return;
144 }
145 let refreshed = collect_console_snapshot_read_model(runtime).await;
146 *self.inner.write().await = refreshed;
147 self.primed
148 .store(true, std::sync::atomic::Ordering::Release);
149 }
153
154 fn refresh_soon(&self, runtime: MobRuntime) {
161 let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else {
162 return;
163 };
164 let Ok(guard) = self.refresh_lock.clone().try_lock_owned() else {
165 return;
166 };
167 let inner = Arc::clone(&self.inner);
168 let primed = Arc::clone(&self.primed);
169 runtime_handle.spawn(async move {
170 let _guard = guard;
171 let refreshed = collect_console_snapshot_read_model(&runtime).await;
172 *inner.write().await = refreshed;
173 primed.store(true, std::sync::atomic::Ordering::Release);
174 });
178 }
179}
180
// Console frontend assets, embedded into the binary at compile time.
const CONSOLE_FRONTEND_INDEX_HTML: &str = include_str!("../console-dist/index.html");
const CONSOLE_FRONTEND_APP_JS: &str = include_str!("../console-dist/console-app.js");
const CONSOLE_FRONTEND_APP_CSS: &str = include_str!("../console-dist/console-app.css");
// Per-image size cap for multipart uploads (25 MiB).
const MAX_MULTIPART_IMAGE_BYTES: usize = 25 * 1024 * 1024;
// Maximum number of images per multipart request.
const MAX_MULTIPART_IMAGES: usize = 4;
// Body limit for the multipart route: all images at their maximum size plus
// 1 MiB of headroom (presumably for the non-image form parts — TODO confirm).
const MAX_MULTIPART_BODY_BYTES: usize =
    (MAX_MULTIPART_IMAGE_BYTES * MAX_MULTIPART_IMAGES) + 1024 * 1024;
188
189pub fn console_json_router(decisions: RuntimeDecisionState) -> Router {
190 console_json_router_with_state(ConsoleJsonState {
191 decisions,
192 runtime: None,
193 module_runtime: None,
194 contact_directory: None,
195 event_log: None,
196 gateway_peer_keys: None,
197 identity_runtime: None,
198 console_events: None,
199 console_aggregator: None,
200 mob_events: None,
201 metadata_table: None,
202 visibility_policy: Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
203 snapshot_read_model: ConsoleSnapshotReadModel::default(),
204 })
205}
206
207pub fn console_json_router_with_aggregator(
208 decisions: RuntimeDecisionState,
209 console_aggregator: MobKitConsoleAggregator,
210) -> Router {
211 console_json_router_with_state(ConsoleJsonState {
212 decisions,
213 runtime: None,
214 module_runtime: None,
215 contact_directory: None,
216 event_log: None,
217 gateway_peer_keys: None,
218 identity_runtime: None,
219 console_events: None,
220 console_aggregator: Some(console_aggregator),
221 mob_events: None,
222 metadata_table: None,
223 visibility_policy: Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
224 snapshot_read_model: ConsoleSnapshotReadModel::default(),
225 })
226}
227
228pub fn console_json_router_with_runtime(
229 decisions: RuntimeDecisionState,
230 runtime: MobRuntime,
231 contact_directory: Option<ContactDirectory>,
232 event_log: Option<std::sync::Arc<dyn EventLogStore>>,
233) -> Router {
234 console_json_router_with_runtime_and_events(
235 decisions,
236 runtime,
237 None,
238 contact_directory,
239 event_log,
240 None,
241 None,
242 None,
243 None,
244 None,
245 None,
246 )
247}
248
249#[allow(clippy::too_many_arguments)]
250pub(crate) fn console_json_router_with_runtime_and_events(
251 decisions: RuntimeDecisionState,
252 runtime: MobRuntime,
253 module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
254 contact_directory: Option<ContactDirectory>,
255 event_log: Option<std::sync::Arc<dyn EventLogStore>>,
256 gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
257 console_events: Option<ConsoleEventStore>,
258 console_log_store: Option<std::sync::Arc<dyn ConsoleLogStore>>,
259 mob_events: Option<MobEventsStore>,
260 metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
261 identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
262) -> Router {
263 console_json_router_with_runtime_events_and_policy(
264 decisions,
265 runtime,
266 module_runtime,
267 contact_directory,
268 event_log,
269 gateway_peer_keys,
270 console_events,
271 console_log_store,
272 mob_events,
273 metadata_table,
274 identity_runtime,
275 Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
276 )
277}
278
279#[allow(clippy::too_many_arguments)]
280pub(crate) fn console_json_router_with_runtime_events_and_policy(
281 decisions: RuntimeDecisionState,
282 runtime: MobRuntime,
283 module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
284 contact_directory: Option<ContactDirectory>,
285 event_log: Option<std::sync::Arc<dyn EventLogStore>>,
286 gateway_peer_keys: Option<crate::auth::peer_keys::GatewayPeerKeys>,
287 console_events: Option<ConsoleEventStore>,
288 console_log_store: Option<std::sync::Arc<dyn ConsoleLogStore>>,
289 mob_events: Option<MobEventsStore>,
290 metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
291 identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
292 visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
293) -> Router {
294 let console_aggregator = console_events.clone().map(|events| {
295 if let Some(store) = console_log_store {
296 let aggregator = MobKitConsoleAggregator::new(store);
297 aggregator.register_runtime_handles_with_policy(
298 "default",
299 "",
300 runtime.clone(),
301 events,
302 visibility_policy.clone(),
303 );
304 aggregator
305 } else {
306 let aggregator = MobKitConsoleAggregator::in_memory();
307 aggregator.register_runtime_handles_with_policy(
308 "default",
309 "",
310 runtime.clone(),
311 events,
312 visibility_policy.clone(),
313 );
314 aggregator
315 }
316 });
317 let snapshot_read_model = ConsoleSnapshotReadModel::default();
318 snapshot_read_model.refresh_soon(runtime.clone());
319 console_json_router_with_state(ConsoleJsonState {
320 decisions,
321 runtime: Some(runtime),
322 module_runtime,
323 contact_directory,
324 event_log,
325 gateway_peer_keys,
326 identity_runtime,
327 console_events,
328 console_aggregator,
329 mob_events,
330 metadata_table,
331 visibility_policy,
332 snapshot_read_model,
333 })
334}
335
336pub fn console_frontend_router() -> Router {
337 Router::new()
338 .route("/", get(|| async { Redirect::temporary("/console") }))
339 .route("/favicon.ico", get(|| async { StatusCode::NO_CONTENT }))
340 .route("/console", get(console_frontend_index_handler))
341 .route("/console/", get(console_frontend_index_handler))
342 .route(
343 "/console/assets/console-app.js",
344 get(console_frontend_app_js_handler),
345 )
346 .route(
347 "/console/assets/console-app.css",
348 get(console_frontend_app_css_handler),
349 )
350}
351
352fn console_json_router_with_state(state: ConsoleJsonState) -> Router {
353 let router = Router::new()
354 .route("/console/experience", get(console_json_handler))
355 .route("/console/modules", get(console_json_handler))
356 .route("/console/identities", get(console_identities_handler))
357 .route("/console/timeline", get(console_timeline_handler))
358 .route(
359 "/console/timeline/stream",
360 get(console_timeline_stream_handler),
361 )
362 .route(
363 "/console/identity/{identity}/stream",
364 get(console_identity_timeline_stream_handler),
365 )
366 .route("/console/send", post(console_send_handler))
367 .route("/console/rpc", post(console_rpc_handler))
368 .route(
369 "/console/rpc/multipart",
370 post(console_rpc_multipart_handler)
371 .layer(DefaultBodyLimit::max(MAX_MULTIPART_BODY_BYTES)),
372 )
373 .route("/blobs/{blob_id}", get(blob_get_handler));
374 router.with_state(state)
375}
376
377pub async fn console_json_handler(
378 State(state): State<ConsoleJsonState>,
379 headers: HeaderMap,
380 uri: Uri,
381) -> impl IntoResponse {
382 let mut path = uri
383 .path_and_query()
384 .map(|path_and_query| path_and_query.as_str().to_string())
385 .unwrap_or_else(|| uri.path().to_string());
386
387 let already_has_token = path
399 .split_once('?')
400 .map(|(_, q)| form_urlencoded::parse(q.as_bytes()).any(|(key, _)| key == "auth_token"))
401 .unwrap_or(false);
402 if !already_has_token
403 && let Some(bearer) = headers
404 .get(header::AUTHORIZATION)
405 .and_then(|v| v.to_str().ok())
406 .and_then(extract_bearer_token_from_header)
407 {
408 let encoded: String = form_urlencoded::byte_serialize(bearer.as_bytes()).collect();
409 let sep = if path.contains('?') { '&' } else { '?' };
410 path = format!("{path}{sep}auth_token={encoded}");
411 }
412
413 let config_module_ids: Vec<String> = state
414 .decisions
415 .modules
416 .iter()
417 .map(|m| m.id.clone())
418 .collect();
419 let live_snapshot = match &state.runtime {
420 Some(runtime) => {
421 state.snapshot_read_model.refresh_soon(runtime.clone());
422 Some(
423 build_live_snapshot(
424 runtime,
425 &config_module_ids,
426 state.console_events.as_ref(),
427 state.visibility_policy.as_ref(),
428 &state.snapshot_read_model,
429 )
430 .await,
431 )
432 }
433 None => match &state.console_aggregator {
434 Some(aggregator) => build_aggregator_live_snapshot(aggregator, &config_module_ids)
435 .await
436 .ok(),
437 None => None,
438 },
439 }
440 .map(|mut snapshot| {
441 apply_console_visibility_policy(&mut snapshot, state.visibility_policy.as_ref());
442 snapshot
443 });
444
445 let response = handle_console_rest_json_route_with_snapshot(
446 &state.decisions,
447 &ConsoleRestJsonRequest {
448 method: "GET".to_string(),
449 path,
450 auth: None,
451 },
452 live_snapshot.as_ref(),
453 );
454 let status = StatusCode::from_u16(response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
455 (status, Json::<Value>(response.body))
456}
457
458pub async fn console_rpc_handler(
459 State(state): State<ConsoleJsonState>,
460 headers: HeaderMap,
461 uri: Uri,
462 Json(request): Json<Value>,
463) -> impl IntoResponse {
464 let parsed_request = match serde_json::from_value::<JsonRpcRequest>(request) {
466 Ok(req) => req,
467 Err(_) => {
468 return (
469 StatusCode::OK,
470 Json::<Value>(serde_json::json!({
471 "jsonrpc": JSONRPC_VERSION,
472 "id": Value::Null,
473 "error": { "code": -32600, "message": "Invalid Request" }
474 })),
475 );
476 }
477 };
478
479 if !console_request_authorized(&state, &headers, &uri) {
484 return (
485 StatusCode::UNAUTHORIZED,
486 Json::<Value>(serde_json::json!({
487 "jsonrpc": JSONRPC_VERSION,
488 "id": parsed_request.id.unwrap_or(Value::Null),
489 "error": {
490 "code": -32600,
491 "message": "unauthorized: console rpc requires a valid auth token",
492 }
493 })),
494 );
495 }
496 let is_authenticated = true;
505 let Some(runtime) = &state.runtime else {
506 let response_value = handle_console_aggregator_rpc(
507 state.console_aggregator.clone(),
508 parsed_request,
509 is_authenticated,
510 )
511 .await;
512 return (StatusCode::OK, Json::<Value>(response_value));
513 };
514
515 let response_value = Box::pin(handle_console_runtime_rpc(
516 runtime,
517 state.module_runtime.clone(),
518 state.contact_directory.as_ref(),
519 state.gateway_peer_keys.as_ref(),
520 state.console_events.clone(),
521 state.console_aggregator.clone(),
522 state.identity_runtime.clone(),
523 state.metadata_table.clone(),
524 state.mob_events.clone(),
525 parsed_request,
526 is_authenticated,
527 ))
528 .await;
529 (StatusCode::OK, Json::<Value>(response_value))
530}
531
/// Query-string parameters accepted by the timeline and timeline-stream routes.
///
/// Every parameter is optional and deserializes to `None` when absent.
#[derive(Debug, serde::Deserialize)]
struct ConsoleTimelineHttpQuery {
    /// Restrict results to one identity.
    #[serde(default)]
    identity: Option<String>,
    /// Restrict results to one conversation.
    #[serde(default)]
    conversation_id: Option<String>,
    /// Cursor to resume after.
    #[serde(default)]
    after: Option<String>,
    /// Maximum number of frames requested.
    #[serde(default)]
    limit: Option<usize>,
}
543
544async fn console_identities_handler(
545 State(state): State<ConsoleJsonState>,
546 headers: HeaderMap,
547 uri: Uri,
548) -> impl IntoResponse {
549 if !console_request_authorized(&state, &headers, &uri) {
550 return console_json_error(
551 StatusCode::UNAUTHORIZED,
552 "unauthorized",
553 "console identities require a valid auth token",
554 );
555 }
556 let Some(aggregator) = &state.console_aggregator else {
557 return console_json_error(
558 StatusCode::NOT_FOUND,
559 "unavailable",
560 "console aggregator unavailable",
561 );
562 };
563 let aggregator = aggregator.clone();
564 match aggregator.list_identities().await {
565 Ok(identities) => (
566 StatusCode::OK,
567 Json::<Value>(json!({ "identities": identities })),
568 )
569 .into_response(),
570 Err(err) => console_json_error(
571 StatusCode::INTERNAL_SERVER_ERROR,
572 "internal_error",
573 &err.to_string(),
574 ),
575 }
576}
577
578async fn console_timeline_handler(
579 State(state): State<ConsoleJsonState>,
580 headers: HeaderMap,
581 uri: Uri,
582 Query(query): Query<ConsoleTimelineHttpQuery>,
583) -> impl IntoResponse {
584 if !console_request_authorized(&state, &headers, &uri) {
585 return console_json_error(
586 StatusCode::UNAUTHORIZED,
587 "unauthorized",
588 "console timeline requires a valid auth token",
589 );
590 }
591 let Some(aggregator) = &state.console_aggregator else {
592 return console_json_error(
593 StatusCode::NOT_FOUND,
594 "unavailable",
595 "console aggregator unavailable",
596 );
597 };
598 let timeline_query = timeline_query_from_http(query, None);
599 match aggregator.query_timeline(timeline_query).await {
600 Ok(page) => (
601 StatusCode::OK,
602 Json::<Value>(serde_json::to_value(page).unwrap_or_else(|_| json!({ "frames": [] }))),
603 )
604 .into_response(),
605 Err(err) => {
606 console_json_error(StatusCode::CONFLICT, "replay_unavailable", &err.to_string())
607 }
608 }
609}
610
611async fn console_send_handler(
612 State(state): State<ConsoleJsonState>,
613 headers: HeaderMap,
614 uri: Uri,
615 Json(request): Json<ConsoleSendRequest>,
616) -> impl IntoResponse {
617 if !console_request_authorized(&state, &headers, &uri) {
618 return console_json_error(
619 StatusCode::UNAUTHORIZED,
620 "unauthorized",
621 "console send requires a valid auth token",
622 );
623 }
624 let Some(aggregator) = &state.console_aggregator else {
625 return console_json_error(
626 StatusCode::NOT_FOUND,
627 "unavailable",
628 "console aggregator unavailable",
629 );
630 };
631 if let Some(identity_runtime) = &state.identity_runtime {
632 return match console_send_identity_first(
633 aggregator,
634 identity_runtime,
635 state.console_events.as_ref(),
636 request,
637 )
638 .await
639 {
640 Ok(accepted) => (
641 StatusCode::OK,
642 Json::<Value>(
643 serde_json::to_value(accepted).unwrap_or_else(|_| json!({ "accepted": true })),
644 ),
645 )
646 .into_response(),
647 Err(err) => console_send_error_response(err),
648 };
649 }
650 match aggregator.send(request).await {
651 Ok(accepted) => (
652 StatusCode::OK,
653 Json::<Value>(
654 serde_json::to_value(accepted).unwrap_or_else(|_| json!({ "accepted": true })),
655 ),
656 )
657 .into_response(),
658 Err(err) => console_send_error_response(err),
659 }
660}
661
662async fn console_send_identity_first(
663 aggregator: &MobKitConsoleAggregator,
664 identity_runtime: &crate::identity_first::IdentityRuntime,
665 console_events: Option<&ConsoleEventStore>,
666 mut request: ConsoleSendRequest,
667) -> Result<crate::console_aggregator::ConsoleInteractionAccepted, ConsoleSendError> {
668 let requested_identity = request.identity.clone();
669 let parsed_identity = crate::identity_first::AgentIdentity::parse(request.identity.as_str())
670 .map_err(|err| ConsoleSendError::InvalidRequest(format!("invalid identity: {err}")))?;
671 let content: ContentInput = serde_json::from_value(request.content.clone())
672 .map_err(|err| ConsoleSendError::InvalidContent(err.to_string()))?;
673 if let ContentInput::Text(text) = &content
674 && text.trim().is_empty()
675 {
676 return Err(ConsoleSendError::InvalidContent(
677 "content must be non-empty".to_string(),
678 ));
679 }
680 if let ContentInput::Blocks(blocks) = &content
681 && blocks.is_empty()
682 {
683 return Err(ConsoleSendError::InvalidContent(
684 "content blocks must be non-empty".to_string(),
685 ));
686 }
687
688 let (identity, status) = match identity_runtime.status(&parsed_identity).await {
689 Ok(status) => (parsed_identity, status),
690 Err(original_err) => {
691 let Some(canonical_identity) =
692 resolve_console_send_identity_alias(aggregator, &requested_identity).await
693 else {
694 return Err(identity_runtime_error_to_console_send_error(
695 requested_identity.as_str(),
696 original_err,
697 ));
698 };
699 let identity = crate::identity_first::AgentIdentity::parse(canonical_identity.as_str())
700 .map_err(|err| {
701 ConsoleSendError::InvalidRequest(format!("invalid aliased identity: {err}"))
702 })?;
703 let status = identity_runtime.status(&identity).await.map_err(|_| {
704 identity_runtime_error_to_console_send_error(
705 requested_identity.as_str(),
706 original_err,
707 )
708 })?;
709 request.identity = canonical_identity;
710 (identity, status)
711 }
712 };
713 let session_id = status
714 .session_id
715 .as_ref()
716 .map(std::string::ToString::to_string);
717 let runtime_member_id = status
718 .agent_runtime_id
719 .as_ref()
720 .map(|id| id.as_str().to_string());
721 let accepted = aggregator
722 .reserve_identity_first_interaction(request.clone(), session_id.as_deref())
723 .await?;
724
725 if let Some(events) = console_events {
726 events
727 .reserve_interaction_value(
728 identity.as_str(),
729 runtime_member_id.as_deref(),
730 &accepted.interaction_id,
731 &request.origin,
732 request.content.clone(),
733 )
734 .await
735 .map_err(ConsoleSendError::State)?;
736 }
737
738 match identity_runtime.send(&identity, &content).await {
739 Ok(_) => Ok(accepted),
740 Err(err) => {
741 let _ = aggregator
742 .mark_interaction_delivery_failed(&accepted.input_frame_id)
743 .await;
744 if let Some(events) = console_events {
745 events
746 .record_lifecycle(
747 identity.as_str(),
748 "interaction_failed",
749 json!({
750 "interaction_id": accepted.interaction_id,
751 "origin": request.origin,
752 "error": err.to_string(),
753 }),
754 )
755 .await;
756 }
757 Err(identity_runtime_error_to_console_send_error(
758 identity.as_str(),
759 err,
760 ))
761 }
762 }
763}
764
765async fn resolve_console_send_identity_alias(
766 aggregator: &MobKitConsoleAggregator,
767 requested_identity: &str,
768) -> Option<String> {
769 let identities = aggregator.list_identities().await.ok()?;
770 identities
771 .into_iter()
772 .find(|record| record.runtime_member_id == requested_identity)
773 .map(|record| record.identity)
774}
775
776fn identity_runtime_error_to_console_send_error(
777 identity: &str,
778 err: crate::identity_first::IdentityRuntimeError,
779) -> ConsoleSendError {
780 match err {
781 crate::identity_first::IdentityRuntimeError::UnknownIdentity(_) => {
782 ConsoleSendError::UnknownIdentity(identity.to_string())
783 }
784 crate::identity_first::IdentityRuntimeError::NotAddressable(_) => {
785 ConsoleSendError::NotAddressable(identity.to_string())
786 }
787 crate::identity_first::IdentityRuntimeError::InvalidState { .. } => {
788 ConsoleSendError::Retired(identity.to_string())
789 }
790 other => ConsoleSendError::Dispatch(other.to_string()),
791 }
792}
793
/// Handles `GET /console/timeline/stream` as a server-sent event stream.
///
/// Responds `401` without a valid token, `404` when no aggregator is attached,
/// and `409` with a `ConsoleReplayUnavailable` body when the initial snapshot
/// cannot be queried. Otherwise the stream emits, in order: a snapshot-started
/// event, one event per snapshot frame, a snapshot-complete event, and then
/// live events from the aggregator until its broadcast channel closes.
async fn console_timeline_stream_handler(
    State(state): State<ConsoleJsonState>,
    headers: HeaderMap,
    uri: Uri,
    Query(query): Query<ConsoleTimelineHttpQuery>,
) -> impl IntoResponse {
    if !console_request_authorized(&state, &headers, &uri) {
        return console_json_error(
            StatusCode::UNAUTHORIZED,
            "unauthorized",
            "console timeline stream requires a valid auth token",
        );
    }
    let Some(aggregator) = &state.console_aggregator else {
        return console_json_error(
            StatusCode::NOT_FOUND,
            "unavailable",
            "console aggregator unavailable",
        );
    };
    let aggregator = aggregator.clone();
    // The `Last-Event-ID` header is only a fallback: an `after` query
    // parameter takes precedence (see `timeline_query_from_http`).
    let last_event_id = headers
        .get("last-event-id")
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(ToString::to_string);
    let timeline_query = timeline_query_from_http(query, last_event_id);
    // Subscribe before querying the snapshot so events published while the
    // snapshot is being read are still received; duplicates are dropped by the
    // cursor comparison in the live loop below.
    let mut rx = aggregator.subscribe();
    let (snapshot_frames, snapshot_cursor) =
        match query_timeline_snapshot(&aggregator, timeline_query.clone()).await {
            Ok(snapshot) => snapshot,
            Err(_) => {
                let latest_cursor = aggregator.latest_cursor().await.ok().flatten();
                let requested_cursor = timeline_query
                    .after
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default();
                return (
                    StatusCode::CONFLICT,
                    Json::<Value>(
                        serde_json::to_value(ConsoleReplayUnavailable {
                            error: "replay_unavailable".to_string(),
                            requested_cursor,
                            latest_cursor,
                        })
                        .unwrap_or_else(|_| json!({ "error": "replay_unavailable" })),
                    ),
                )
                    .into_response();
            }
        };
    let identity = timeline_query.identity.clone();
    let conversation_id = timeline_query.conversation_id.clone();
    let snapshot_after = timeline_query.after.clone();
    let stream = stream! {
        if let Some(event) = sse_event_from_timeline_event(&ConsoleTimelineEvent::SnapshotStarted { after: snapshot_after }) {
            yield Ok::<Event, Infallible>(event);
        }
        // Starts at the snapshot's cursor and is then overwritten by the
        // cursor of each emitted frame.
        let mut latest_cursor = snapshot_cursor;
        for frame in snapshot_frames {
            latest_cursor = Some(frame.cursor.clone());
            if let Some(event) = sse_event_from_timeline_event(&ConsoleTimelineEvent::ConsoleFrame { frame }) {
                yield Ok::<Event, Infallible>(event);
            }
        }
        if let Some(event) = sse_event_from_timeline_event(&ConsoleTimelineEvent::SnapshotComplete { cursor: latest_cursor.clone() }) {
            yield Ok::<Event, Infallible>(event);
        }
        loop {
            match rx.recv().await {
                Ok(event) if timeline_event_matches(&event, identity.as_deref(), conversation_id.as_deref()) => {
                    if !aggregator.timeline_event_visible(&event).await {
                        continue;
                    }
                    // Skip events at or before the last cursor already sent.
                    if let Some(event_cursor) = timeline_event_cursor(&event)
                        && let Some(current_cursor) = latest_cursor.as_ref()
                        && !cursor_is_after(event_cursor, current_cursor)
                    {
                        continue;
                    }
                    if let Some(sse) = sse_event_from_timeline_event(&event) {
                        if let Some(event_cursor) = timeline_event_cursor(&event) {
                            latest_cursor = Some(event_cursor.clone());
                        }
                        yield Ok::<Event, Infallible>(sse);
                    }
                }
                // Events for other identities or conversations are ignored.
                Ok(_) => {}
                // The receiver fell behind and events were dropped: tell the
                // client, then keep streaming.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                    let event = ConsoleTimelineEvent::ReplayUnavailable {
                        requested_cursor: format!("lagged:{skipped}"),
                        latest_cursor: None,
                    };
                    if let Some(sse) = sse_event_from_timeline_event(&event) {
                        yield Ok::<Event, Infallible>(sse);
                    }
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    };
    Sse::new(stream)
        .keep_alive(
            KeepAlive::new()
                .interval(DEFAULT_KEEP_ALIVE_INTERVAL)
                .text(KEEP_ALIVE_TEXT),
        )
        .into_response()
}
905
906async fn console_identity_timeline_stream_handler(
907 State(state): State<ConsoleJsonState>,
908 headers: HeaderMap,
909 uri: Uri,
910 AxumPath(identity): AxumPath<String>,
911 Query(mut query): Query<ConsoleTimelineHttpQuery>,
912) -> impl IntoResponse {
913 query.identity = Some(identity);
914 Box::pin(console_timeline_stream_handler(
915 State(state),
916 headers,
917 uri,
918 Query(query),
919 ))
920 .await
921 .into_response()
922}
923
924fn timeline_query_from_http(
925 query: ConsoleTimelineHttpQuery,
926 fallback_after: Option<String>,
927) -> ConsoleTimelineQuery {
928 let after = query.after.or(fallback_after).map(ConsoleCursor::from);
929 ConsoleTimelineQuery {
930 identity: query
931 .identity
932 .map(|value| value.trim().to_string())
933 .filter(|value| !value.is_empty()),
934 conversation_id: query
935 .conversation_id
936 .map(|value| value.trim().to_string())
937 .filter(|value| !value.is_empty()),
938 after,
939 limit: query.limit.unwrap_or(200),
940 }
941}
942
943async fn query_timeline_snapshot(
944 aggregator: &MobKitConsoleAggregator,
945 mut query: ConsoleTimelineQuery,
946) -> ConsoleLogResult<(Vec<ConsoleFrame>, Option<ConsoleCursor>)> {
947 const MAX_SNAPSHOT_PAGES: usize = 100;
948 const STORE_PAGE_LIMIT: usize = 1_000;
949 const DEFAULT_SNAPSHOT_LIMIT: usize = 200;
950 let mut frames = Vec::new();
951 let mut latest_cursor = query.after.clone();
952 if query.after.is_none() {
953 query.limit = if query.limit == 0 {
954 DEFAULT_SNAPSHOT_LIMIT
955 } else {
956 query.limit
957 }
958 .clamp(1, STORE_PAGE_LIMIT);
959 return query_fresh_timeline_snapshot(aggregator, query, STORE_PAGE_LIMIT).await;
960 }
961 query.limit = STORE_PAGE_LIMIT;
962 let query_identity = query.identity.clone();
963 for page_idx in 0..MAX_SNAPSHOT_PAGES {
964 let page = aggregator.store().query_frames(query.clone()).await?;
965 if page.frames.is_empty() {
966 break;
967 }
968 latest_cursor = page.next_cursor.clone();
969 let page_len = page.frames.len();
970 frames.extend(
971 visible_snapshot_frames(aggregator, page.frames, query_identity.as_deref()).await?,
972 );
973 query.after = latest_cursor.clone();
974 if page_len < STORE_PAGE_LIMIT {
975 break;
976 }
977 if page_idx + 1 == MAX_SNAPSHOT_PAGES {
978 return Err(Box::new(std::io::Error::other(
979 "timeline replay exceeded maximum snapshot pages",
980 )));
981 }
982 }
983 Ok((frames, latest_cursor))
984}
985
986async fn query_fresh_timeline_snapshot(
987 aggregator: &MobKitConsoleAggregator,
988 mut query: ConsoleTimelineQuery,
989 store_page_limit: usize,
990) -> ConsoleLogResult<(Vec<ConsoleFrame>, Option<ConsoleCursor>)> {
991 let requested_limit = query.limit;
992 query.limit = store_page_limit;
993 let query_identity = query.identity.clone();
994 let mut latest_cursor = None;
995 let mut tail = std::collections::VecDeque::with_capacity(requested_limit);
996 loop {
997 let page = aggregator.store().query_frames(query.clone()).await?;
998 if page.frames.is_empty() {
999 break;
1000 }
1001 latest_cursor = page.next_cursor.clone();
1002 let page_len = page.frames.len();
1003 for frame in
1004 visible_snapshot_frames(aggregator, page.frames, query_identity.as_deref()).await?
1005 {
1006 if tail.len() >= requested_limit {
1007 tail.pop_front();
1008 }
1009 tail.push_back(frame);
1010 }
1011 query.after = latest_cursor.clone();
1012 if page_len < query.limit {
1013 break;
1014 }
1015 }
1016 Ok((tail.into_iter().collect(), latest_cursor))
1017}
1018
1019async fn visible_snapshot_frames(
1020 aggregator: &MobKitConsoleAggregator,
1021 frames: Vec<ConsoleFrame>,
1022 identity: Option<&str>,
1023) -> ConsoleLogResult<Vec<ConsoleFrame>> {
1024 let mut visible = Vec::with_capacity(frames.len());
1025 for frame in frames {
1026 if aggregator
1027 .timeline_frame_visible_for_query(&frame, identity)
1028 .await
1029 {
1030 visible.push(frame);
1031 }
1032 }
1033 Ok(visible)
1034}
1035
1036fn console_json_error(status: StatusCode, error: &str, message: &str) -> axum::response::Response {
1037 (
1038 status,
1039 Json::<Value>(json!({
1040 "error": error,
1041 "message": message,
1042 })),
1043 )
1044 .into_response()
1045}
1046
1047fn console_send_error_response(err: ConsoleSendError) -> axum::response::Response {
1048 let (status, code) = match &err {
1049 ConsoleSendError::UnknownIdentity(_) => (StatusCode::NOT_FOUND, "unknown_identity"),
1050 ConsoleSendError::NotAddressable(_) => (StatusCode::CONFLICT, "not_addressable"),
1051 ConsoleSendError::Retired(_) => (StatusCode::CONFLICT, "retired"),
1052 ConsoleSendError::InvalidContent(_)
1053 | ConsoleSendError::InvalidHandlingMode(_)
1054 | ConsoleSendError::InvalidRequest(_) => (StatusCode::BAD_REQUEST, "invalid_request"),
1055 ConsoleSendError::IdempotencyConflict(_) => (StatusCode::CONFLICT, "idempotency_conflict"),
1056 ConsoleSendError::State(_) | ConsoleSendError::Dispatch(_) | ConsoleSendError::Log(_) => {
1057 (StatusCode::INTERNAL_SERVER_ERROR, "internal_error")
1058 }
1059 };
1060 console_json_error(status, code, &err.to_string())
1061}
1062
1063fn console_send_rpc_code(err: &ConsoleSendError) -> i64 {
1064 match err {
1065 ConsoleSendError::UnknownIdentity(_) => -32001,
1066 ConsoleSendError::NotAddressable(_) => -32002,
1067 ConsoleSendError::InvalidContent(_)
1068 | ConsoleSendError::InvalidHandlingMode(_)
1069 | ConsoleSendError::InvalidRequest(_) => -32602,
1070 ConsoleSendError::IdempotencyConflict(_) => -32009,
1071 ConsoleSendError::Retired(_) => -32004,
1072 ConsoleSendError::State(_) | ConsoleSendError::Dispatch(_) | ConsoleSendError::Log(_) => {
1073 -32000
1074 }
1075 }
1076}
1077
1078fn console_send_rpc_error(response_id: Value, err: ConsoleSendError) -> Value {
1079 response_value(
1080 response_id,
1081 None,
1082 Some(JsonRpcError {
1083 code: console_send_rpc_code(&err),
1084 message: err.to_string(),
1085 data: None,
1086 }),
1087 )
1088}
1089
1090fn timeline_event_matches(
1091 event: &ConsoleTimelineEvent,
1092 identity: Option<&str>,
1093 conversation_id: Option<&str>,
1094) -> bool {
1095 let frame = match event {
1096 ConsoleTimelineEvent::ConsoleFrame { frame }
1097 | ConsoleTimelineEvent::FrameUpdated { frame } => frame,
1098 ConsoleTimelineEvent::SnapshotStarted { .. }
1099 | ConsoleTimelineEvent::SnapshotComplete { .. }
1100 | ConsoleTimelineEvent::ReplayUnavailable { .. } => return true,
1101 };
1102 if identity.is_some_and(|value| frame.identity != value) {
1103 return false;
1104 }
1105 if conversation_id.is_some_and(|value| frame.conversation_id.as_deref() != Some(value)) {
1106 return false;
1107 }
1108 true
1109}
1110
1111fn timeline_event_cursor(event: &ConsoleTimelineEvent) -> Option<&ConsoleCursor> {
1112 match event {
1113 ConsoleTimelineEvent::ConsoleFrame { frame }
1114 | ConsoleTimelineEvent::FrameUpdated { frame } => Some(&frame.cursor),
1115 ConsoleTimelineEvent::SnapshotStarted { .. }
1116 | ConsoleTimelineEvent::SnapshotComplete { .. }
1117 | ConsoleTimelineEvent::ReplayUnavailable { .. } => None,
1118 }
1119}
1120
1121fn cursor_is_after(candidate: &ConsoleCursor, current: &ConsoleCursor) -> bool {
1122 match (candidate.seq(), current.seq()) {
1123 (Some(candidate), Some(current)) => candidate > current,
1124 _ => candidate > current,
1125 }
1126}
1127
1128fn sse_event_from_timeline_event(event: &ConsoleTimelineEvent) -> Option<Event> {
1129 let (event_name, id) = match event {
1130 ConsoleTimelineEvent::SnapshotStarted { .. } => ("snapshot_started", None),
1131 ConsoleTimelineEvent::ConsoleFrame { frame } => (
1132 if frame.kind == "frame_updated" {
1133 "frame_updated"
1134 } else {
1135 "console_frame"
1136 },
1137 Some(frame.cursor.to_string()),
1138 ),
1139 ConsoleTimelineEvent::FrameUpdated { frame } => {
1140 ("frame_updated", Some(frame.cursor.to_string()))
1141 }
1142 ConsoleTimelineEvent::SnapshotComplete { cursor } => (
1143 "snapshot_complete",
1144 cursor.as_ref().map(ToString::to_string),
1145 ),
1146 ConsoleTimelineEvent::ReplayUnavailable { .. } => ("replay_unavailable", None),
1147 };
1148 let data = match serde_json::to_string(event) {
1149 Ok(value) => value,
1150 Err(_) => return None,
1151 };
1152 let mut sse = Event::default().event(event_name).data(data);
1153 if let Some(id) = id {
1154 sse = sse.id(id);
1155 }
1156 Some(sse)
1157}
1158
1159pub async fn console_rpc_multipart_handler(
1160 State(state): State<ConsoleJsonState>,
1161 headers: HeaderMap,
1162 uri: Uri,
1163 mut multipart: Multipart,
1164) -> impl IntoResponse {
1165 if !console_request_authorized(&state, &headers, &uri) {
1166 return (
1167 StatusCode::UNAUTHORIZED,
1168 Json::<Value>(serde_json::json!({
1169 "jsonrpc": JSONRPC_VERSION,
1170 "id": Value::Null,
1171 "error": {
1172 "code": -32600,
1173 "message": "unauthorized: console rpc requires a valid auth token",
1174 }
1175 })),
1176 );
1177 }
1178
1179 let mut payload: Option<String> = None;
1180 let mut files: std::collections::BTreeMap<String, MultipartImageUpload> =
1181 std::collections::BTreeMap::new();
1182
1183 while let Some(mut field) = match multipart.next_field().await {
1184 Ok(field) => field,
1185 Err(err) => {
1186 return (
1187 StatusCode::BAD_REQUEST,
1188 Json::<Value>(json_rpc_error_value(
1189 Value::Null,
1190 -32602,
1191 format!("invalid multipart body: {err}"),
1192 )),
1193 );
1194 }
1195 } {
1196 let name = field.name().unwrap_or("").to_string();
1197 if name == "payload" {
1198 if payload.is_some() {
1199 return (
1200 StatusCode::BAD_REQUEST,
1201 Json::<Value>(json_rpc_error_value(
1202 Value::Null,
1203 -32602,
1204 "duplicate payload part",
1205 )),
1206 );
1207 }
1208 payload = match field.text().await {
1209 Ok(text) => Some(text),
1210 Err(err) => {
1211 return (
1212 StatusCode::BAD_REQUEST,
1213 Json::<Value>(json_rpc_error_value(
1214 Value::Null,
1215 -32602,
1216 format!("invalid payload part: {err}"),
1217 )),
1218 );
1219 }
1220 };
1221 continue;
1222 }
1223
1224 let Some(upload_id) = name.strip_prefix("file:").filter(|id| !id.is_empty()) else {
1225 return (
1226 StatusCode::BAD_REQUEST,
1227 Json::<Value>(json_rpc_error_value(
1228 Value::Null,
1229 -32602,
1230 format!("unexpected multipart field: {name}"),
1231 )),
1232 );
1233 };
1234 if files.len() >= MAX_MULTIPART_IMAGES {
1235 return (
1236 StatusCode::BAD_REQUEST,
1237 Json::<Value>(json_rpc_error_value(
1238 Value::Null,
1239 -32602,
1240 format!("too many image attachments; max {MAX_MULTIPART_IMAGES}"),
1241 )),
1242 );
1243 }
1244 if files.contains_key(upload_id) {
1245 return (
1246 StatusCode::BAD_REQUEST,
1247 Json::<Value>(json_rpc_error_value(
1248 Value::Null,
1249 -32602,
1250 format!("duplicate file part for upload_id {upload_id}"),
1251 )),
1252 );
1253 }
1254 let media_type = field
1255 .content_type()
1256 .map(str::to_string)
1257 .unwrap_or_else(|| "application/octet-stream".to_string());
1258 if !is_allowed_image_media_type(&media_type) {
1259 return (
1260 StatusCode::BAD_REQUEST,
1261 Json::<Value>(json_rpc_error_value(
1262 Value::Null,
1263 -32602,
1264 format!("unsupported image media type: {media_type}"),
1265 )),
1266 );
1267 }
1268 let mut bytes = bytes::BytesMut::new();
1269 loop {
1270 let chunk = match field.chunk().await {
1271 Ok(chunk) => chunk,
1272 Err(err) => {
1273 return (
1274 StatusCode::BAD_REQUEST,
1275 Json::<Value>(json_rpc_error_value(
1276 Value::Null,
1277 -32602,
1278 format!("invalid file part {upload_id}: {err}"),
1279 )),
1280 );
1281 }
1282 };
1283 let Some(chunk) = chunk else {
1284 break;
1285 };
1286 if bytes.len() + chunk.len() > MAX_MULTIPART_IMAGE_BYTES {
1287 return (
1288 StatusCode::BAD_REQUEST,
1289 Json::<Value>(json_rpc_error_value(
1290 Value::Null,
1291 -32602,
1292 format!("image attachment {upload_id} exceeds 25 MiB"),
1293 )),
1294 );
1295 }
1296 bytes.extend_from_slice(&chunk);
1297 }
1298 files.insert(
1299 upload_id.to_string(),
1300 MultipartImageUpload {
1301 media_type,
1302 bytes: bytes.freeze(),
1303 },
1304 );
1305 }
1306
1307 let payload = match payload {
1308 Some(payload) => payload,
1309 None => {
1310 return (
1311 StatusCode::BAD_REQUEST,
1312 Json::<Value>(json_rpc_error_value(
1313 Value::Null,
1314 -32602,
1315 "payload part required",
1316 )),
1317 );
1318 }
1319 };
1320 let mut parsed_request = match serde_json::from_str::<JsonRpcRequest>(&payload) {
1321 Ok(req) => req,
1322 Err(err) => {
1323 return (
1324 StatusCode::OK,
1325 Json::<Value>(json_rpc_error_value(
1326 Value::Null,
1327 -32600,
1328 format!("Invalid Request: {err}"),
1329 )),
1330 );
1331 }
1332 };
1333 let response_id = parsed_request.id.clone().unwrap_or(Value::Null);
1334 match parsed_request.method.as_str() {
1335 "mobkit/console/send" => {
1336 let Some(aggregator) = &state.console_aggregator else {
1337 return (
1338 StatusCode::OK,
1339 Json::<Value>(invalid_params(
1340 response_id,
1341 "mobkit/console/send multipart requires a console aggregator",
1342 )),
1343 );
1344 };
1345 let Some(identity) = parsed_request
1346 .params
1347 .get("identity")
1348 .and_then(Value::as_str)
1349 else {
1350 return (
1351 StatusCode::OK,
1352 Json::<Value>(invalid_params(response_id, "identity required")),
1353 );
1354 };
1355 let binary_blob_store = match aggregator.binary_blob_store_for_identity(identity).await
1356 {
1357 Ok(Some(store)) => store,
1358 Ok(None) => {
1359 return (
1360 StatusCode::OK,
1361 Json::<Value>(invalid_params(
1362 response_id,
1363 "binary blob store unavailable for identity",
1364 )),
1365 );
1366 }
1367 Err(err) => {
1368 return (
1369 StatusCode::OK,
1370 Json::<Value>(console_send_rpc_error(response_id, err)),
1371 );
1372 }
1373 };
1374 if let Err(message) = externalize_image_upload_placeholders(
1375 &mut parsed_request.params,
1376 files,
1377 binary_blob_store,
1378 )
1379 .await
1380 {
1381 return (
1382 StatusCode::OK,
1383 Json::<Value>(invalid_params(response_id, message)),
1384 );
1385 }
1386 }
1387 "mobkit/blob/upload" => {
1388 let Some(runtime) = &state.runtime else {
1389 return (
1390 StatusCode::NOT_FOUND,
1391 Json::<Value>(json_rpc_error_value(
1392 response_id,
1393 -32600,
1394 "mobkit/blob/upload multipart requires a unified runtime",
1395 )),
1396 );
1397 };
1398 let Some(binary_blob_store) = runtime.binary_blob_store() else {
1399 return (
1400 StatusCode::INTERNAL_SERVER_ERROR,
1401 Json::<Value>(json_rpc_error_value(
1402 response_id,
1403 -32000,
1404 "binary blob store unavailable",
1405 )),
1406 );
1407 };
1408 let result = match externalize_single_image_upload(
1409 &parsed_request.params,
1410 files,
1411 binary_blob_store,
1412 )
1413 .await
1414 {
1415 Ok(result) => result,
1416 Err(message) => {
1417 return (
1418 StatusCode::OK,
1419 Json::<Value>(invalid_params(response_id, message)),
1420 );
1421 }
1422 };
1423 return (
1424 StatusCode::OK,
1425 Json::<Value>(response_value(response_id, Some(result), None)),
1426 );
1427 }
1428 _ => {
1429 return (
1430 StatusCode::OK,
1431 Json::<Value>(invalid_params(
1432 response_id,
1433 "multipart RPC supports mobkit/console/send and mobkit/blob/upload only",
1434 )),
1435 );
1436 }
1437 }
1438 let response_value = if parsed_request.method == "mobkit/console/send"
1439 && state.runtime.is_none()
1440 {
1441 handle_console_aggregator_rpc(state.console_aggregator.clone(), parsed_request, true).await
1442 } else {
1443 let Some(runtime) = &state.runtime else {
1444 return (
1445 StatusCode::NOT_FOUND,
1446 Json::<Value>(json_rpc_error_value(
1447 response_id,
1448 -32600,
1449 "console rpc multipart requires a unified runtime",
1450 )),
1451 );
1452 };
1453 Box::pin(handle_console_runtime_rpc(
1454 runtime,
1455 state.module_runtime.clone(),
1456 state.contact_directory.as_ref(),
1457 state.gateway_peer_keys.as_ref(),
1458 state.console_events.clone(),
1459 state.console_aggregator.clone(),
1460 state.identity_runtime.clone(),
1461 state.metadata_table.clone(),
1462 state.mob_events.clone(),
1463 parsed_request,
1464 true,
1465 ))
1466 .await
1467 };
1468 (StatusCode::OK, Json::<Value>(response_value))
1469}
1470
1471pub async fn blob_get_handler(
1472 State(state): State<ConsoleJsonState>,
1473 headers: HeaderMap,
1474 uri: Uri,
1475 AxumPath(blob_id): AxumPath<String>,
1476) -> impl IntoResponse {
1477 if !console_request_authorized(&state, &headers, &uri) {
1478 return (
1479 StatusCode::UNAUTHORIZED,
1480 Json::<Value>(serde_json::json!({ "error": "unauthorized" })),
1481 )
1482 .into_response();
1483 }
1484 if !is_valid_blob_id_value(&blob_id) {
1485 return (
1486 StatusCode::BAD_REQUEST,
1487 Json::<Value>(serde_json::json!({ "error": "invalid_blob_id" })),
1488 )
1489 .into_response();
1490 }
1491 let blob_id = meerkat_core::BlobId::from(blob_id.as_str());
1492 let mut stores: Vec<std::sync::Arc<dyn BinaryBlobStore>> = Vec::new();
1493 if let Some(runtime) = &state.runtime
1494 && let Some(store) = runtime.binary_blob_store()
1495 {
1496 stores.push(store);
1497 }
1498 if let Some(aggregator) = &state.console_aggregator {
1499 stores.extend(aggregator.binary_blob_stores());
1500 }
1501 if stores.is_empty() {
1502 return (
1503 StatusCode::NOT_FOUND,
1504 Json::<Value>(serde_json::json!({ "error": "blob_store_unavailable" })),
1505 )
1506 .into_response();
1507 }
1508 for store in stores {
1509 match store.get_bytes(&blob_id).await {
1510 Ok(payload) => return blob_payload_response(payload),
1511 Err(meerkat_core::BlobStoreError::NotFound(_)) => continue,
1512 Err(err) => {
1513 return (
1514 StatusCode::INTERNAL_SERVER_ERROR,
1515 Json::<Value>(serde_json::json!({ "error": err.to_string() })),
1516 )
1517 .into_response();
1518 }
1519 }
1520 }
1521 (
1522 StatusCode::NOT_FOUND,
1523 Json::<Value>(serde_json::json!({ "error": "blob_not_found" })),
1524 )
1525 .into_response()
1526}
1527
1528fn blob_payload_response(payload: BinaryBlobPayload) -> axum::response::Response {
1529 let mut response_headers = HeaderMap::new();
1530 let content_type = HeaderValue::from_str(&payload.media_type)
1531 .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream"));
1532 response_headers.insert(header::CONTENT_TYPE, content_type);
1533 if let Ok(content_length) = HeaderValue::from_str(&payload.size.to_string()) {
1534 response_headers.insert(header::CONTENT_LENGTH, content_length);
1535 }
1536 response_headers.insert(
1537 header::CACHE_CONTROL,
1538 HeaderValue::from_static("private, max-age=31536000, immutable"),
1539 );
1540 (StatusCode::OK, response_headers, payload.data).into_response()
1541}
1542
1543fn console_request_authorized(state: &ConsoleJsonState, headers: &HeaderMap, uri: &Uri) -> bool {
1544 if !state.decisions.console.require_app_auth {
1545 return true;
1546 }
1547 console_request_token(headers, uri)
1548 .is_some_and(|token| validate_console_token(&state.decisions, &token))
1549}
1550
1551fn console_request_token(headers: &HeaderMap, uri: &Uri) -> Option<String> {
1552 let bearer_token = headers
1553 .get(header::AUTHORIZATION)
1554 .and_then(|v| v.to_str().ok())
1555 .and_then(extract_bearer_token_from_header)
1556 .map(String::from);
1557 let query_token = uri.query().and_then(|q| {
1560 form_urlencoded::parse(q.as_bytes())
1561 .find(|(key, _)| key == "auth_token")
1562 .map(|(_, value)| value.into_owned())
1563 });
1564 bearer_token.or(query_token)
1565}
1566
/// One image part collected from a multipart request, held in memory until
/// it is written to a blob store.
#[derive(Debug)]
struct MultipartImageUpload {
    /// Content type reported by the multipart part.
    media_type: String,
    /// Raw bytes of the part.
    bytes: bytes::Bytes,
}
1572
1573fn json_rpc_error_value(id: Value, code: i64, message: impl Into<String>) -> Value {
1574 serde_json::json!({
1575 "jsonrpc": JSONRPC_VERSION,
1576 "id": id,
1577 "error": {
1578 "code": code,
1579 "message": message.into(),
1580 }
1581 })
1582}
1583
/// Reports whether `media_type` is one of the accepted image types.
///
/// The comparison is exact: case differences, surrounding whitespace and
/// media-type parameters all cause a rejection.
fn is_allowed_image_media_type(media_type: &str) -> bool {
    const ALLOWED: [&str; 4] = ["image/png", "image/jpeg", "image/webp", "image/gif"];
    ALLOWED.contains(&media_type)
}
1590
1591fn image_upload_part_name<'a>(
1592 object: &'a serde_json::Map<String, Value>,
1593 context: &str,
1594) -> Result<&'a str, String> {
1595 object
1596 .get("upload_id")
1597 .or_else(|| object.get("part_name"))
1598 .and_then(Value::as_str)
1599 .map(str::trim)
1600 .filter(|value| !value.is_empty())
1601 .ok_or_else(|| format!("{context}.upload_id or {context}.part_name is required"))
1602}
1603
1604async fn externalize_image_upload_placeholders(
1605 params: &mut Value,
1606 files: std::collections::BTreeMap<String, MultipartImageUpload>,
1607 blob_store: std::sync::Arc<dyn crate::blob_store::BinaryBlobStore>,
1608) -> Result<(), String> {
1609 let Some(content) = params.get_mut("content") else {
1610 return Err("multipart payload params.content is required".to_string());
1611 };
1612 let mut placeholders = std::collections::BTreeMap::<String, String>::new();
1613 collect_image_upload_placeholders(content, &mut placeholders)?;
1614 if placeholders.is_empty() {
1615 return Err(
1616 "multipart payload must contain at least one image_upload placeholder".to_string(),
1617 );
1618 }
1619 if placeholders.len() > MAX_MULTIPART_IMAGES {
1620 return Err(format!(
1621 "too many image_upload placeholders; max {MAX_MULTIPART_IMAGES}"
1622 ));
1623 }
1624 for upload_id in files.keys() {
1625 if !placeholders.contains_key(upload_id) {
1626 return Err(format!(
1627 "file part has no matching image_upload placeholder: {upload_id}"
1628 ));
1629 }
1630 }
1631 for upload_id in placeholders.keys() {
1632 if !files.contains_key(upload_id) {
1633 return Err(format!(
1634 "image_upload placeholder missing file part: {upload_id}"
1635 ));
1636 }
1637 }
1638
1639 let mut refs = std::collections::BTreeMap::<String, Value>::new();
1640 for (upload_id, file) in files {
1641 let declared_media_type = placeholders
1642 .get(&upload_id)
1643 .cloned()
1644 .unwrap_or_else(|| file.media_type.clone());
1645 if !is_allowed_image_media_type(&declared_media_type) {
1646 return Err(format!(
1647 "unsupported image media type in placeholder {upload_id}: {declared_media_type}"
1648 ));
1649 }
1650 if declared_media_type != file.media_type {
1651 return Err(format!(
1652 "media type mismatch for {upload_id}: placeholder {declared_media_type}, file {}",
1653 file.media_type
1654 ));
1655 }
1656 let blob_ref = blob_store
1657 .put_bytes(&file.media_type, file.bytes)
1658 .await
1659 .map_err(|err| format!("failed to store image {upload_id}: {err}"))?;
1660 refs.insert(
1661 upload_id,
1662 serde_json::json!({
1663 "type": "image",
1664 "media_type": blob_ref.media_type,
1665 "source": "blob",
1666 "blob_id": blob_ref.blob_id,
1667 }),
1668 );
1669 }
1670 replace_image_upload_placeholders(content, &refs)?;
1671 if let Some(object) = params.as_object_mut() {
1672 object.remove("message");
1673 }
1674 Ok(())
1675}
1676
1677async fn externalize_single_image_upload(
1678 params: &Value,
1679 files: std::collections::BTreeMap<String, MultipartImageUpload>,
1680 blob_store: std::sync::Arc<dyn crate::blob_store::BinaryBlobStore>,
1681) -> Result<Value, String> {
1682 let upload = params.get("upload").unwrap_or(params);
1683 if upload
1684 .get("type")
1685 .and_then(Value::as_str)
1686 .is_some_and(|kind| kind != "image_upload")
1687 {
1688 return Err("upload.type must be image_upload".to_string());
1689 }
1690 let upload_object = upload
1691 .as_object()
1692 .ok_or_else(|| "upload must be an object".to_string())?;
1693 let upload_id = image_upload_part_name(upload_object, "upload")?;
1694 let Some(file) = files.get(upload_id) else {
1695 return Err(format!(
1696 "image_upload placeholder missing file part: {upload_id}"
1697 ));
1698 };
1699 if files.len() != 1 {
1700 return Err("mobkit/blob/upload accepts exactly one file part".to_string());
1701 }
1702 let declared_media_type = upload
1703 .get("media_type")
1704 .and_then(Value::as_str)
1705 .unwrap_or(file.media_type.as_str());
1706 if !is_allowed_image_media_type(declared_media_type) {
1707 return Err(format!(
1708 "unsupported image media type in upload {upload_id}: {declared_media_type}"
1709 ));
1710 }
1711 if declared_media_type != file.media_type {
1712 return Err(format!(
1713 "media type mismatch for {upload_id}: placeholder {declared_media_type}, file {}",
1714 file.media_type
1715 ));
1716 }
1717 let size = file.bytes.len() as u64;
1718 let blob_ref = blob_store
1719 .put_bytes(&file.media_type, file.bytes.clone())
1720 .await
1721 .map_err(|err| format!("failed to store image {upload_id}: {err}"))?;
1722 Ok(json!({
1723 "blob_id": blob_ref.blob_id,
1724 "media_type": blob_ref.media_type,
1725 "size": size,
1726 }))
1727}
1728
1729fn collect_image_upload_placeholders(
1730 value: &Value,
1731 placeholders: &mut std::collections::BTreeMap<String, String>,
1732) -> Result<(), String> {
1733 match value {
1734 Value::Array(items) => {
1735 for item in items {
1736 collect_image_upload_placeholders(item, placeholders)?;
1737 }
1738 }
1739 Value::Object(object) => {
1740 if object.get("type").and_then(Value::as_str) == Some("image_upload") {
1741 let upload_id = image_upload_part_name(object, "image_upload")?;
1742 let media_type = object
1743 .get("media_type")
1744 .and_then(Value::as_str)
1745 .map(str::trim)
1746 .filter(|value| !value.is_empty())
1747 .ok_or_else(|| format!("image_upload {upload_id} requires media_type"))?;
1748 if placeholders
1749 .insert(upload_id.to_string(), media_type.to_string())
1750 .is_some()
1751 {
1752 return Err(format!("duplicate image_upload placeholder: {upload_id}"));
1753 }
1754 } else {
1755 for child in object.values() {
1756 collect_image_upload_placeholders(child, placeholders)?;
1757 }
1758 }
1759 }
1760 _ => {}
1761 }
1762 Ok(())
1763}
1764
1765fn replace_image_upload_placeholders(
1766 value: &mut Value,
1767 refs: &std::collections::BTreeMap<String, Value>,
1768) -> Result<(), String> {
1769 match value {
1770 Value::Array(items) => {
1771 for item in items {
1772 replace_image_upload_placeholders(item, refs)?;
1773 }
1774 }
1775 Value::Object(object) => {
1776 if object.get("type").and_then(Value::as_str) == Some("image_upload") {
1777 let upload_id = image_upload_part_name(object, "image_upload")?;
1778 let replacement = refs
1779 .get(upload_id)
1780 .ok_or_else(|| format!("missing blob replacement for {upload_id}"))?;
1781 *value = replacement.clone();
1782 } else {
1783 for child in object.values_mut() {
1784 replace_image_upload_placeholders(child, refs)?;
1785 }
1786 }
1787 }
1788 _ => {}
1789 }
1790 Ok(())
1791}
1792
1793fn response_value(id: Value, result: Option<Value>, error: Option<JsonRpcError>) -> Value {
1794 serde_json::to_value(JsonRpcResponse {
1795 jsonrpc: JSONRPC_VERSION.to_string(),
1796 id,
1797 result,
1798 error,
1799 })
1800 .unwrap_or_else(|_| {
1801 serde_json::json!({
1802 "jsonrpc": JSONRPC_VERSION,
1803 "id": Value::Null,
1804 "error": {
1805 "code": -32603,
1806 "message": "serialization failed",
1807 }
1808 })
1809 })
1810}
1811
1812fn invalid_params(id: Value, message: impl Into<String>) -> Value {
1813 response_value(
1814 id,
1815 None,
1816 Some(JsonRpcError {
1817 code: -32602,
1818 message: message.into(),
1819 data: None,
1820 }),
1821 )
1822}
1823
1824async fn member_entry_to_console_json(
1825 runtime: &MobRuntime,
1826 entry: &meerkat_mob::runtime::MobMemberListEntry,
1827) -> Value {
1828 let mut value = member_entry_to_json(entry);
1829 if let Some(object) = value.as_object_mut() {
1830 object.insert(
1831 "model_capabilities".to_string(),
1832 serde_json::to_value(model_capabilities_for_member_entry(
1833 runtime.handle().definition(),
1834 entry,
1835 ))
1836 .unwrap_or(Value::Null),
1837 );
1838 }
1839 value
1840}
1841
1842fn internal_error(id: Value, message: impl Into<String>) -> Value {
1843 response_value(
1844 id,
1845 None,
1846 Some(JsonRpcError {
1847 code: -32000,
1848 message: message.into(),
1849 data: None,
1850 }),
1851 )
1852}
1853
1854fn stale_event_cursor_response(id: Value, after_cursor: u64, latest_cursor: u64) -> Value {
1859 response_value(
1860 id,
1861 None,
1862 Some(JsonRpcError {
1863 code: crate::rpc::MOB_EVENTS_STALE_CURSOR_CODE,
1864 message: format!(
1865 "stale mob event cursor: requested {after_cursor}, latest {latest_cursor}"
1866 ),
1867 data: Some(serde_json::json!({
1868 "error": "event_query_stale",
1869 "after_cursor": after_cursor,
1870 "latest_cursor": latest_cursor,
1871 })),
1872 }),
1873 )
1874}
1875
1876fn parse_console_helper_options(
1877 options_val: Option<&Value>,
1878) -> Result<meerkat_mob::HelperOptions, String> {
1879 crate::rpc::mob_methods::parse_helper_options(options_val)
1880}
1881
1882fn member_is_addressable(member: &meerkat_mob::runtime::MobMemberListEntry) -> bool {
1883 member
1884 .labels
1885 .get("addressable")
1886 .map(|value: &String| !value.eq_ignore_ascii_case("false"))
1887 .unwrap_or(true)
1888}
1889
1890fn member_addressability(member: &meerkat_mob::runtime::MobMemberListEntry) -> &'static str {
1891 if member_is_addressable(member) {
1892 "addressable"
1893 } else {
1894 "internal_only"
1895 }
1896}
1897
1898fn console_identity_status_json(
1899 member: &meerkat_mob::runtime::MobMemberListEntry,
1900 session_id: Option<String>,
1901 response_phase: Option<String>,
1902) -> Value {
1903 json!({
1904 "identity": member.agent_identity.to_string(),
1905 "state": member.state,
1906 "role": member.role.to_string(),
1907 "addressability": member_addressability(member),
1908 "display_name": member.labels.get("display_name"),
1909 "labels": member.labels,
1910 "agent_runtime_id": member.binding_atoms().0.to_string(),
1911 "session_id": session_id,
1912 "generation": Value::Null,
1913 "checkpoint_version": Value::Null,
1914 "lease_healthy": Value::Null,
1915 "lease": Value::Null,
1916 "response_phase": response_phase,
1917 })
1918}
1919
1920fn console_identity_inspect_json(
1921 member: &meerkat_mob::runtime::MobMemberListEntry,
1922 session_id: Option<String>,
1923 response_phase: Option<String>,
1924) -> Value {
1925 let peers: Vec<String> = member.wired_to.iter().map(ToString::to_string).collect();
1926 json!({
1927 "identity": member.agent_identity.to_string(),
1928 "state": member.state,
1929 "role": member.role.to_string(),
1930 "addressability": member_addressability(member),
1931 "display_name": member.labels.get("display_name"),
1932 "labels": member.labels,
1933 "lease_healthy": Value::Null,
1934 "lease": Value::Null,
1935 "continuity": {
1936 "generation": Value::Null,
1937 "checkpoint_version": Value::Null,
1938 "session_id": session_id,
1939 "agent_runtime_id": member.binding_atoms().0.to_string(),
1940 },
1941 "topology_peers": peers,
1942 "output_preview": Value::Null,
1943 "response_phase": response_phase,
1944 })
1945}
1946
1947async fn lookup_member_with_session(
1951 handle: &MobHandle,
1952 identity: &MeerkatId,
1953) -> Option<(meerkat_mob::runtime::MobMemberListEntry, Option<String>)> {
1954 let entries = handle.list_members_including_retiring().await;
1955 let entry = entries
1956 .into_iter()
1957 .find(|e| &e.agent_identity == identity)?;
1958 let session_id = handle
1959 .resolve_bridge_session_id(identity)
1960 .await
1961 .map(|s| s.to_string());
1962 Some((entry, session_id))
1963}
1964
1965#[allow(clippy::too_many_arguments)]
1966async fn handle_console_aggregator_rpc(
1967 console_aggregator: Option<MobKitConsoleAggregator>,
1968 request: JsonRpcRequest,
1969 is_authenticated: bool,
1970) -> Value {
1971 let response_id = request.id.clone().unwrap_or(Value::Null);
1972 match request.method.as_str() {
1973 "mobkit/capabilities" => response_value(
1974 response_id,
1975 Some(json!({
1976 "methods": [
1977 "mobkit/capabilities",
1978 "mobkit/console/list_identities",
1979 "mobkit/console/inspect_identity",
1980 "mobkit/console/query_timeline",
1981 "mobkit/retire",
1982 "mobkit/reset_all",
1983 "mobkit/console/send",
1984 ],
1985 "authenticated": is_authenticated,
1986 "features": {
1987 "console_aggregator": console_aggregator.is_some(),
1988 "multi_runtime_console": console_aggregator.is_some(),
1989 }
1990 })),
1991 None,
1992 ),
1993 "mobkit/console/list_identities" => {
1994 let Some(aggregator) = &console_aggregator else {
1995 return console_aggregator_unavailable(response_id);
1996 };
1997 match aggregator.list_identities().await {
1998 Ok(identities) => {
1999 response_value(response_id, Some(json!({ "identities": identities })), None)
2000 }
2001 Err(err) => internal_error(response_id, format!("list_identities failed: {err}")),
2002 }
2003 }
2004 "mobkit/console/inspect_identity" => {
2005 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2006 return invalid_params(response_id, "identity required");
2007 };
2008 let Some(aggregator) = &console_aggregator else {
2009 return console_aggregator_unavailable(response_id);
2010 };
2011 match aggregator.inspect_identity(identity).await {
2012 Ok(Some(inspection)) => response_value(
2013 response_id,
2014 Some(serde_json::to_value(inspection).unwrap_or(Value::Null)),
2015 None,
2016 ),
2017 Ok(None) => response_value(
2018 response_id,
2019 None,
2020 Some(JsonRpcError {
2021 code: -32001,
2022 message: format!("unknown identity: {identity}"),
2023 data: None,
2024 }),
2025 ),
2026 Err(err) => internal_error(response_id, format!("inspect_identity failed: {err}")),
2027 }
2028 }
2029 "mobkit/console/query_timeline" => {
2030 let query: ConsoleTimelineQuery = match serde_json::from_value(request.params.clone()) {
2031 Ok(query) => query,
2032 Err(err) => {
2033 return invalid_params(response_id, format!("invalid query params: {err}"));
2034 }
2035 };
2036 let Some(aggregator) = &console_aggregator else {
2037 return console_aggregator_unavailable(response_id);
2038 };
2039 match aggregator.query_timeline(query).await {
2040 Ok(page) => response_value(
2041 response_id,
2042 Some(serde_json::to_value(page).unwrap_or(Value::Null)),
2043 None,
2044 ),
2045 Err(err) => response_value(
2046 response_id,
2047 None,
2048 Some(JsonRpcError {
2049 code: -32010,
2050 message: format!("query_timeline failed: {err}"),
2051 data: Some(json!({ "kind": "replay_unavailable" })),
2052 }),
2053 ),
2054 }
2055 }
2056 "mobkit/console/send" => {
2057 let send_request: ConsoleSendRequest =
2058 match serde_json::from_value(request.params.clone()) {
2059 Ok(request) => request,
2060 Err(err) => {
2061 return invalid_params(response_id, format!("invalid send params: {err}"));
2062 }
2063 };
2064 let Some(aggregator) = &console_aggregator else {
2065 return console_aggregator_unavailable(response_id);
2066 };
2067 match aggregator.send(send_request).await {
2068 Ok(accepted) => response_value(
2069 response_id,
2070 Some(serde_json::to_value(accepted).unwrap_or(Value::Null)),
2071 None,
2072 ),
2073 Err(err) => response_value(
2074 response_id,
2075 None,
2076 Some(JsonRpcError {
2077 code: console_send_rpc_code(&err),
2078 message: err.to_string(),
2079 data: None,
2080 }),
2081 ),
2082 }
2083 }
2084 "mobkit/retire" => {
2085 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2086 return invalid_params(response_id, "identity required");
2087 };
2088 let Some(aggregator) = &console_aggregator else {
2089 return console_aggregator_unavailable(response_id);
2090 };
2091 match aggregator.retire_identity(identity).await {
2092 Ok(true) => {
2093 response_value(response_id, Some(json!({ "identity": identity })), None)
2094 }
2095 Ok(false) => response_value(
2096 response_id,
2097 None,
2098 Some(JsonRpcError {
2099 code: -32001,
2100 message: format!("unknown identity: {identity}"),
2101 data: None,
2102 }),
2103 ),
2104 Err(err) => internal_error(response_id, format!("retire failed: {err}")),
2105 }
2106 }
2107 "mobkit/reset_all" => {
2108 let Some(aggregator) = &console_aggregator else {
2109 return console_aggregator_unavailable(response_id);
2110 };
2111 match aggregator.list_identities_fresh().await {
2112 Ok(identities) => {
2113 let mut retired = Vec::new();
2114 let mut failed = Vec::new();
2115 for identity in identities {
2116 match aggregator.retire_identity(&identity.identity).await {
2117 Ok(true) => retired.push(identity.identity),
2118 Ok(false) => failed.push(json!({
2119 "identity": identity.identity,
2120 "error": "unknown identity",
2121 })),
2122 Err(err) => failed.push(json!({
2123 "identity": identity.identity,
2124 "error": err.to_string(),
2125 })),
2126 }
2127 }
2128 if let Err(err) = aggregator.clear_timeline_frames().await {
2129 failed.push(json!({
2130 "identity": "_console_timeline",
2131 "error": err.to_string(),
2132 }));
2133 }
2134 response_value(
2135 response_id,
2136 Some(json!({
2137 "retired": retired,
2138 "failed": failed,
2139 })),
2140 None,
2141 )
2142 }
2143 Err(err) => internal_error(response_id, format!("reset_all failed: {err}")),
2144 }
2145 }
2146 _ => response_value(
2147 response_id,
2148 None,
2149 Some(JsonRpcError {
2150 code: -32601,
2151 message: "Method not found".to_string(),
2152 data: None,
2153 }),
2154 ),
2155 }
2156}
2157
2158fn console_aggregator_unavailable(response_id: Value) -> Value {
2159 response_value(
2160 response_id,
2161 None,
2162 Some(JsonRpcError {
2163 code: -32004,
2164 message: "console aggregator unavailable".to_string(),
2165 data: None,
2166 }),
2167 )
2168}
2169
2170#[allow(clippy::too_many_arguments)]
2171async fn handle_console_runtime_rpc(
2172 runtime: &MobRuntime,
2173 module_runtime: Option<std::sync::Arc<tokio::sync::Mutex<MobkitRuntimeHandle>>>,
2174 contact_directory: Option<&ContactDirectory>,
2175 gateway_peer_keys: Option<&crate::auth::peer_keys::GatewayPeerKeys>,
2176 console_events: Option<ConsoleEventStore>,
2177 console_aggregator: Option<MobKitConsoleAggregator>,
2178 identity_runtime: Option<Arc<crate::identity_first::IdentityRuntime>>,
2179 metadata_table: Option<std::sync::Arc<RuntimeMetadataTable>>,
2180 mob_events: Option<MobEventsStore>,
2181 request: JsonRpcRequest,
2182 is_authenticated: bool,
2183) -> Value {
2184 let response_id = request.id.clone().unwrap_or(Value::Null);
2185
2186 match request.method.as_str() {
2187 "mobkit/capabilities" => {
2188 let mut methods = vec![
2189 "mobkit/status",
2190 "mobkit/capabilities",
2191 "mobkit/list_members",
2192 "mobkit/get_member",
2193 "mobkit/find_members",
2194 "mobkit/member_status",
2195 "mobkit/collect_completed",
2196 "mobkit/blob/get",
2197 "mobkit/wait_ready",
2198 "mobkit/flow_status",
2199 "mobkit/list_flows",
2200 "mobkit/list_runs",
2201 "mobkit/console/list_identities",
2202 "mobkit/console/inspect_identity",
2203 "mobkit/console/query_timeline",
2204 "mobkit/mob_events/query",
2205 "mobkit/mob_events/subscribe",
2206 "mobkit/cross_mob/peer_info",
2207 "mobkit/cross_mob/directory",
2208 "mobkit/peer_pubkey",
2209 ];
2210 if module_runtime.is_some() {
2211 methods.extend_from_slice(&[
2212 "mobkit/status_identity",
2213 "mobkit/inspect_identity",
2214 "mobkit/respawn",
2215 "mobkit/reset",
2216 "mobkit/routing/routes/list",
2217 "mobkit/delivery/history",
2218 "mobkit/gating/pending",
2219 "mobkit/gating/audit",
2220 "mobkit/gating/decide",
2221 ]);
2222 }
2223 if is_authenticated {
2224 methods.extend_from_slice(&[
2225 "mobkit/retire",
2226 "mobkit/reset_all",
2227 "mobkit/console/send",
2228 "mobkit/blob/upload",
2229 "mobkit/ensure_member",
2230 "mobkit/retire_member",
2231 "mobkit/respawn_member",
2232 "mobkit/force_cancel_member",
2233 "mobkit/cancel_flow",
2234 "mobkit/run_flow",
2235 "mobkit/spawn_helper",
2236 "mobkit/fork_helper",
2237 "mobkit/attach_existing_session",
2238 "mobkit/reconcile_edges",
2239 "mobkit/cross_mob/wire_local",
2240 "mobkit/cross_mob/unwire_local",
2241 ]);
2242 }
2243 if metadata_table.is_some() {
2244 methods.extend_from_slice(&["mobkit/mob_labels/get", "mobkit/run_labels/get"]);
2245 if is_authenticated {
2246 methods.extend_from_slice(&[
2247 "mobkit/mob_labels/set",
2248 "mobkit/mob_labels/delete",
2249 "mobkit/run_labels/set",
2250 "mobkit/run_labels/delete",
2251 ]);
2252 }
2253 }
2254 response_value(
2255 response_id,
2256 Some(serde_json::json!({
2257 "contract_version": crate::rpc::MOBKIT_CONTRACT_VERSION,
2258 "methods": methods,
2259 "loaded_modules": serde_json::json!([]),
2262 "runtime_capabilities": {
2263 "can_send_messages": is_authenticated,
2264 "can_retire_members": is_authenticated,
2265 "can_spawn_members": is_authenticated,
2266 }
2267 })),
2268 None,
2269 )
2270 }
2271 "mobkit/status" => {
2272 let mob_state = runtime.handle().status().await.ok();
2273 response_value(
2274 response_id,
2275 Some(serde_json::json!({
2276 "contract_version": crate::rpc::MOBKIT_CONTRACT_VERSION,
2277 "running": matches!(mob_state, Some(MobState::Creating | MobState::Running)),
2278 "loaded_modules": serde_json::json!([]),
2281 })),
2282 None,
2283 )
2284 }
2285 "mobkit/console/list_identities" => {
2286 let Some(aggregator) = &console_aggregator else {
2287 return response_value(
2288 response_id,
2289 None,
2290 Some(JsonRpcError {
2291 code: -32004,
2292 message: "console aggregator unavailable".to_string(),
2293 data: None,
2294 }),
2295 );
2296 };
2297 match aggregator.list_identities().await {
2298 Ok(identities) => {
2299 response_value(response_id, Some(json!({ "identities": identities })), None)
2300 }
2301 Err(err) => internal_error(response_id, format!("list_identities failed: {err}")),
2302 }
2303 }
2304 "mobkit/console/inspect_identity" => {
2305 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2306 return invalid_params(response_id, "identity required");
2307 };
2308 let Some(aggregator) = &console_aggregator else {
2309 return response_value(
2310 response_id,
2311 None,
2312 Some(JsonRpcError {
2313 code: -32004,
2314 message: "console aggregator unavailable".to_string(),
2315 data: None,
2316 }),
2317 );
2318 };
2319 match aggregator.inspect_identity(identity).await {
2320 Ok(Some(inspection)) => response_value(
2321 response_id,
2322 Some(serde_json::to_value(inspection).unwrap_or(Value::Null)),
2323 None,
2324 ),
2325 Ok(None) => response_value(
2326 response_id,
2327 None,
2328 Some(JsonRpcError {
2329 code: -32001,
2330 message: format!("unknown identity: {identity}"),
2331 data: None,
2332 }),
2333 ),
2334 Err(err) => internal_error(response_id, format!("inspect_identity failed: {err}")),
2335 }
2336 }
2337 "mobkit/console/query_timeline" => {
2338 let query: ConsoleTimelineQuery = match serde_json::from_value(request.params.clone()) {
2339 Ok(query) => query,
2340 Err(err) => {
2341 return invalid_params(response_id, format!("invalid query params: {err}"));
2342 }
2343 };
2344 let Some(aggregator) = &console_aggregator else {
2345 return response_value(
2346 response_id,
2347 None,
2348 Some(JsonRpcError {
2349 code: -32004,
2350 message: "console aggregator unavailable".to_string(),
2351 data: None,
2352 }),
2353 );
2354 };
2355 match aggregator.query_timeline(query).await {
2356 Ok(page) => response_value(
2357 response_id,
2358 Some(serde_json::to_value(page).unwrap_or(Value::Null)),
2359 None,
2360 ),
2361 Err(err) => response_value(
2362 response_id,
2363 None,
2364 Some(JsonRpcError {
2365 code: -32010,
2366 message: format!("query_timeline failed: {err}"),
2367 data: Some(json!({ "kind": "replay_unavailable" })),
2368 }),
2369 ),
2370 }
2371 }
2372 "mobkit/console/send" => {
2373 let send_request: ConsoleSendRequest =
2374 match serde_json::from_value(request.params.clone()) {
2375 Ok(request) => request,
2376 Err(err) => {
2377 return invalid_params(response_id, format!("invalid send params: {err}"));
2378 }
2379 };
2380 let Some(aggregator) = &console_aggregator else {
2381 return response_value(
2382 response_id,
2383 None,
2384 Some(JsonRpcError {
2385 code: -32004,
2386 message: "console aggregator unavailable".to_string(),
2387 data: None,
2388 }),
2389 );
2390 };
2391 if let Some(identity_runtime) = &identity_runtime {
2392 return match console_send_identity_first(
2393 aggregator,
2394 identity_runtime,
2395 console_events.as_ref(),
2396 send_request,
2397 )
2398 .await
2399 {
2400 Ok(accepted) => response_value(
2401 response_id,
2402 Some(serde_json::to_value(accepted).unwrap_or(Value::Null)),
2403 None,
2404 ),
2405 Err(err) => response_value(
2406 response_id,
2407 None,
2408 Some(JsonRpcError {
2409 code: console_send_rpc_code(&err),
2410 message: err.to_string(),
2411 data: None,
2412 }),
2413 ),
2414 };
2415 }
2416 match aggregator.send(send_request).await {
2417 Ok(accepted) => response_value(
2418 response_id,
2419 Some(serde_json::to_value(accepted).unwrap_or(Value::Null)),
2420 None,
2421 ),
2422 Err(err) => response_value(
2423 response_id,
2424 None,
2425 Some(JsonRpcError {
2426 code: console_send_rpc_code(&err),
2427 message: err.to_string(),
2428 data: None,
2429 }),
2430 ),
2431 }
2432 }
2433 "mobkit/blob/get" => {
2434 let Some(blob_id) = request
2435 .params
2436 .get("blob_id")
2437 .or_else(|| request.params.get("id"))
2438 .and_then(Value::as_str)
2439 else {
2440 return invalid_params(response_id, "blob_id required");
2441 };
2442 if !is_valid_blob_id_value(blob_id) {
2443 return invalid_params(response_id, "invalid blob_id");
2444 }
2445 let Some(store) = runtime.binary_blob_store() else {
2446 return internal_error(response_id, "binary blob store unavailable");
2447 };
2448 match store.get_bytes(&meerkat_core::BlobId::from(blob_id)).await {
2449 Ok(payload) => response_value(
2450 response_id,
2451 Some(serde_json::json!({
2452 "blob_id": payload.blob_id,
2453 "media_type": payload.media_type,
2454 "size": payload.size,
2455 "data": base64::engine::general_purpose::STANDARD.encode(payload.data.as_ref()),
2456 })),
2457 None,
2458 ),
2459 Err(meerkat_core::BlobStoreError::NotFound(_)) => response_value(
2460 response_id,
2461 None,
2462 Some(JsonRpcError {
2463 code: -32001,
2464 message: format!("blob not found: {blob_id}"),
2465 data: Some(json!({ "kind": "not_found", "blob_id": blob_id })),
2466 }),
2467 ),
2468 Err(err) => internal_error(response_id, format!("blob get failed: {err}")),
2469 }
2470 }
2471 "mobkit/list_members" => {
2472 let handle = runtime.handle();
2473 let entries = handle.list_members_including_retiring().await;
2474 let mut members = Vec::with_capacity(entries.len());
2475 for entry in &entries {
2476 members.push(member_entry_to_console_json(runtime, entry).await);
2477 }
2478 response_value(response_id, Some(Value::Array(members)), None)
2479 }
2480 "mobkit/get_member" => {
2481 let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
2482 return invalid_params(response_id, "member_id required");
2483 };
2484 let handle = runtime.handle();
2485 let identity = MeerkatId::from(member_id);
2486 let entries = handle.list_members_including_retiring().await;
2487 match entries.into_iter().find(|e| e.agent_identity == identity) {
2488 Some(entry) => response_value(
2489 response_id,
2490 Some(member_entry_to_console_json(runtime, &entry).await),
2491 None,
2492 ),
2493 None => invalid_params(response_id, format!("member not found: {member_id}")),
2494 }
2495 }
2496 "mobkit/find_members" => {
2497 let Some(label_key) = request.params.get("label_key").and_then(Value::as_str) else {
2498 return invalid_params(response_id, "label_key required");
2499 };
2500 let Some(label_value) = request.params.get("label_value").and_then(Value::as_str)
2501 else {
2502 return invalid_params(response_id, "label_value required");
2503 };
2504 let handle = runtime.handle();
2505 let filter = MemberFilter {
2506 labels: std::collections::BTreeMap::from([(
2507 label_key.to_string(),
2508 label_value.to_string(),
2509 )]),
2510 role: None,
2511 state: None,
2512 };
2513 let entries = handle.list_members_matching(filter).await;
2514 let mut matches = Vec::with_capacity(entries.len());
2515 for entry in &entries {
2516 matches.push(member_entry_to_console_json(runtime, entry).await);
2517 }
2518 response_value(response_id, Some(Value::Array(matches)), None)
2519 }
2520 "mobkit/status_identity" => {
2521 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2522 return invalid_params(response_id, "identity required");
2523 };
2524 let handle = runtime.handle();
2525 let mid = MeerkatId::from(identity);
2526 let Some((member, session_id)) = lookup_member_with_session(&handle, &mid).await else {
2527 return invalid_params(response_id, format!("identity not found: {identity}"));
2528 };
2529 let phase = if let Some(store) = &console_events {
2530 store.response_phase_for_identity(identity).await
2531 } else {
2532 None
2533 };
2534 response_value(
2535 response_id,
2536 Some(console_identity_status_json(&member, session_id, phase)),
2537 None,
2538 )
2539 }
2540 "mobkit/inspect_identity" => {
2541 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2542 return invalid_params(response_id, "identity required");
2543 };
2544 let handle = runtime.handle();
2545 let mid = MeerkatId::from(identity);
2546 let Some((member, session_id)) = lookup_member_with_session(&handle, &mid).await else {
2547 return invalid_params(response_id, format!("identity not found: {identity}"));
2548 };
2549 let phase = if let Some(store) = &console_events {
2550 store.response_phase_for_identity(identity).await
2551 } else {
2552 None
2553 };
2554 response_value(
2555 response_id,
2556 Some(console_identity_inspect_json(&member, session_id, phase)),
2557 None,
2558 )
2559 }
2560 "mobkit/retire" => {
2561 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2562 return invalid_params(response_id, "identity required");
2563 };
2564 if let Some(aggregator) = &console_aggregator {
2565 return match aggregator.retire_identity(identity).await {
2566 Ok(true) => {
2567 if let Some(store) = &console_events {
2568 store
2569 .record_lifecycle(identity, "identity_retired", json!({}))
2570 .await;
2571 }
2572 response_value(response_id, Some(json!({ "identity": identity })), None)
2573 }
2574 Ok(false) => response_value(
2575 response_id,
2576 None,
2577 Some(JsonRpcError {
2578 code: -32001,
2579 message: format!("unknown identity: {identity}"),
2580 data: None,
2581 }),
2582 ),
2583 Err(err) => internal_error(response_id, format!("retire failed: {err}")),
2584 };
2585 }
2586 match runtime.handle().retire(MeerkatId::from(identity)).await {
2587 Ok(()) => {
2588 if let Some(store) = &console_events {
2589 store
2590 .record_lifecycle(identity, "identity_retired", json!({}))
2591 .await;
2592 }
2593 response_value(response_id, Some(json!({ "identity": identity })), None)
2594 }
2595 Err(err) => internal_error(response_id, format!("retire failed: {err}")),
2596 }
2597 }
2598 "mobkit/respawn" => {
2599 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2600 return invalid_params(response_id, "identity required");
2601 };
2602 let handle = runtime.handle();
2603 let mid = MeerkatId::from(identity);
2604 match handle.respawn(mid.clone(), None).await {
2605 Ok(_receipt) => {
2606 if let Some(store) = &console_events {
2607 store
2608 .record_lifecycle(identity, "identity_respawned", json!({}))
2609 .await;
2610 }
2611 let body = match lookup_member_with_session(&handle, &mid).await {
2612 Some((entry, session_id)) => {
2613 console_identity_status_json(&entry, session_id, None)
2614 }
2615 None => json!({ "identity": identity }),
2616 };
2617 response_value(response_id, Some(body), None)
2618 }
2619 Err(err) => internal_error(response_id, format!("respawn failed: {err}")),
2620 }
2621 }
2622 "mobkit/reset" => {
2623 let Some(identity) = request.params.get("identity").and_then(Value::as_str) else {
2624 return invalid_params(response_id, "identity required");
2625 };
2626 let handle = runtime.handle();
2627 let mid = MeerkatId::from(identity);
2628 match handle.respawn(mid.clone(), None).await {
2629 Ok(_receipt) => {
2630 if let Some(store) = &console_events {
2631 store
2632 .record_lifecycle(identity, "identity_reset", json!({}))
2633 .await;
2634 }
2635 let body = match lookup_member_with_session(&handle, &mid).await {
2636 Some((entry, session_id)) => {
2637 console_identity_status_json(&entry, session_id, None)
2638 }
2639 None => json!({ "identity": identity }),
2640 };
2641 response_value(response_id, Some(body), None)
2642 }
2643 Err(err) => internal_error(response_id, format!("reset failed: {err}")),
2644 }
2645 }
2646 "mobkit/reset_all" => {
2647 match Box::pin(reset_all_live_console_agents(
2648 runtime,
2649 console_events.as_ref(),
2650 console_aggregator.as_ref(),
2651 ))
2652 .await
2653 {
2654 Ok(body) => response_value(response_id, Some(body), None),
2655 Err(err) => internal_error(response_id, format!("reset_all failed: {err}")),
2656 }
2657 }
2658 "mobkit/routing/routes/list" => {
2659 let Some(module_runtime) = &module_runtime else {
2660 return response_value(
2661 response_id,
2662 None,
2663 Some(JsonRpcError {
2664 code: -32601,
2665 message: "Method not found".to_string(),
2666 data: None,
2667 }),
2668 );
2669 };
2670 let routes = module_runtime.lock().await.list_runtime_routes();
2671 response_value(response_id, Some(json!({ "routes": routes })), None)
2672 }
2673 "mobkit/delivery/history" => {
2674 let Some(module_runtime) = &module_runtime else {
2675 return response_value(
2676 response_id,
2677 None,
2678 Some(JsonRpcError {
2679 code: -32601,
2680 message: "Method not found".to_string(),
2681 data: None,
2682 }),
2683 );
2684 };
2685 let limit = request
2686 .params
2687 .get("limit")
2688 .and_then(Value::as_u64)
2689 .unwrap_or(50) as usize;
2690 let history = module_runtime
2691 .lock()
2692 .await
2693 .delivery_history(DeliveryHistoryRequest {
2694 recipient: None,
2695 sink: None,
2696 limit,
2697 });
2698 response_value(
2699 response_id,
2700 Some(serde_json::to_value(history).unwrap_or(Value::Null)),
2701 None,
2702 )
2703 }
2704 "mobkit/gating/pending" => {
2705 let Some(module_runtime) = &module_runtime else {
2706 return response_value(
2707 response_id,
2708 None,
2709 Some(JsonRpcError {
2710 code: -32601,
2711 message: "Method not found".to_string(),
2712 data: None,
2713 }),
2714 );
2715 };
2716 let pending = module_runtime.lock().await.list_gating_pending();
2717 response_value(response_id, Some(json!({ "pending": pending })), None)
2718 }
2719 "mobkit/gating/audit" => {
2720 let Some(module_runtime) = &module_runtime else {
2721 return response_value(
2722 response_id,
2723 None,
2724 Some(JsonRpcError {
2725 code: -32601,
2726 message: "Method not found".to_string(),
2727 data: None,
2728 }),
2729 );
2730 };
2731 let limit = request
2732 .params
2733 .get("limit")
2734 .and_then(Value::as_u64)
2735 .unwrap_or(50) as usize;
2736 let entries = module_runtime.lock().await.gating_audit_entries(limit);
2737 response_value(response_id, Some(json!({ "entries": entries })), None)
2738 }
2739 "mobkit/gating/decide" => {
2740 let Some(module_runtime) = &module_runtime else {
2741 return response_value(
2742 response_id,
2743 None,
2744 Some(JsonRpcError {
2745 code: -32601,
2746 message: "Method not found".to_string(),
2747 data: None,
2748 }),
2749 );
2750 };
2751 let Some(pending_id) = request.params.get("pending_id").and_then(Value::as_str) else {
2752 return invalid_params(response_id, "pending_id required");
2753 };
2754 let Some(approver_id) = request.params.get("approver_id").and_then(Value::as_str)
2755 else {
2756 return invalid_params(response_id, "approver_id required");
2757 };
2758 let Some(raw_decision) = request.params.get("decision").and_then(Value::as_str) else {
2759 return invalid_params(response_id, "decision required");
2760 };
2761 let decision = match raw_decision {
2762 "approve" => GatingDecision::Approve,
2763 "reject" | "deny" => GatingDecision::Reject,
2764 "escalate" => GatingDecision::Escalate,
2765 _ => {
2766 return invalid_params(
2767 response_id,
2768 format!("unsupported decision: {raw_decision}"),
2769 );
2770 }
2771 };
2772 let reason = request
2773 .params
2774 .get("reason")
2775 .and_then(Value::as_str)
2776 .map(ToString::to_string);
2777 match module_runtime
2778 .lock()
2779 .await
2780 .decide_gating_action(GatingDecideRequest {
2781 pending_id: pending_id.to_string(),
2782 approver_id: approver_id.to_string(),
2783 decision,
2784 reason,
2785 }) {
2786 Ok(result) => response_value(
2787 response_id,
2788 Some(serde_json::to_value(result).unwrap_or(Value::Null)),
2789 None,
2790 ),
2791 Err(err) => invalid_params(response_id, format!("gating decision failed: {err}")),
2792 }
2793 }
2794 "mobkit/ensure_member" => {
2795 let Some(role) = request.params.get("role").and_then(Value::as_str) else {
2796 return invalid_params(response_id, "role required");
2797 };
2798 let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
2799 else {
2800 return invalid_params(response_id, "agent_identity required");
2801 };
2802 let labels = match request.params.get("labels") {
2803 None | Some(Value::Null) => std::collections::BTreeMap::new(),
2804 Some(value) => match serde_json::from_value(value.clone()) {
2805 Ok(map) => map,
2806 Err(err) => {
2807 return invalid_params(response_id, format!("invalid labels: {err}"));
2808 }
2809 },
2810 };
2811 let context = request.params.get("context").cloned();
2812 let resume_session_id = match request.params.get("resume_session_id") {
2813 None => None,
2814 Some(Value::Null) => None,
2815 Some(v) => match v.as_str() {
2816 Some(s) => match meerkat_core::types::SessionId::parse(s) {
2817 Ok(sid) => Some(sid),
2818 Err(_) => {
2819 return invalid_params(
2820 response_id,
2821 format!("invalid resume_session_id: {s:?}"),
2822 );
2823 }
2824 },
2825 None => {
2826 return invalid_params(
2827 response_id,
2828 "resume_session_id must be a string".to_string(),
2829 );
2830 }
2831 },
2832 };
2833 let additional_instructions = match request.params.get("additional_instructions") {
2834 None | Some(Value::Null) => None,
2835 Some(Value::Array(arr)) => {
2836 let mut strs = Vec::with_capacity(arr.len());
2837 for (i, entry) in arr.iter().enumerate() {
2838 match entry.as_str() {
2839 Some(s) => strs.push(s.to_string()),
2840 None => {
2841 return invalid_params(
2842 response_id,
2843 format!("additional_instructions[{i}] must be a string"),
2844 );
2845 }
2846 }
2847 }
2848 if strs.is_empty() { None } else { Some(strs) }
2849 }
2850 Some(_) => {
2851 return invalid_params(
2852 response_id,
2853 "additional_instructions must be an array of strings",
2854 );
2855 }
2856 };
2857 let mut spec =
2858 SpawnMemberSpec::new(ProfileName::from(role), MeerkatId::from(agent_identity));
2859 if !labels.is_empty() {
2860 spec = spec.with_labels(labels);
2861 }
2862 if let Some(ctx) = context {
2863 spec = spec.with_context(ctx);
2864 }
2865 if let Some(sid) = resume_session_id {
2866 spec = spec.with_resume_bridge_session_id(sid);
2867 }
2868 if let Some(instructions) = additional_instructions {
2869 spec = spec.with_additional_instructions(instructions);
2870 }
2871 let handle = runtime.handle();
2872 let mid = spec.identity.clone();
2873 match handle.ensure_member(spec).await {
2874 Ok(_outcome) => {
2875 let body = match lookup_member_with_session(&handle, &mid).await {
2876 Some((entry, _sid)) => member_entry_to_json(&entry),
2877 None => Value::Null,
2878 };
2879 response_value(response_id, Some(body), None)
2880 }
2881 Err(err) => internal_error(response_id, format!("ensure_member failed: {err}")),
2882 }
2883 }
2884 "mobkit/retire_member" => {
2885 let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
2886 return invalid_params(response_id, "member_id required");
2887 };
2888 if let Some(aggregator) = &console_aggregator {
2889 return match aggregator.retire_identity(member_id).await {
2890 Ok(true) => response_value(
2891 response_id,
2892 Some(serde_json::json!({ "accepted": true })),
2893 None,
2894 ),
2895 Ok(false) => response_value(
2896 response_id,
2897 None,
2898 Some(JsonRpcError {
2899 code: -32001,
2900 message: format!("unknown identity: {member_id}"),
2901 data: None,
2902 }),
2903 ),
2904 Err(err) => internal_error(response_id, format!("retire_member failed: {err}")),
2905 };
2906 }
2907 match runtime.handle().retire(MeerkatId::from(member_id)).await {
2908 Ok(()) => response_value(
2909 response_id,
2910 Some(serde_json::json!({ "accepted": true })),
2911 None,
2912 ),
2913 Err(err) => internal_error(response_id, format!("retire_member failed: {err}")),
2914 }
2915 }
2916 "mobkit/respawn_member" => {
2917 let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
2918 return invalid_params(response_id, "member_id required");
2919 };
2920 match runtime
2921 .handle()
2922 .respawn(MeerkatId::from(member_id), None)
2923 .await
2924 {
2925 Ok(_receipt) => response_value(
2926 response_id,
2927 Some(serde_json::json!({ "accepted": true })),
2928 None,
2929 ),
2930 Err(err) => internal_error(response_id, format!("respawn_member failed: {err}")),
2931 }
2932 }
2933 "mobkit/reconcile_edges" => response_value(
2934 response_id,
2935 Some(serde_json::json!({
2936 "status": "noop",
2937 "reason": "console runtime routes directly to MobRuntime",
2938 })),
2939 None,
2940 ),
2941 "mobkit/mob_events/query" | "mobkit/mob_events/subscribe" => {
2942 let query: EventQuery = if request.params.is_null() {
2943 EventQuery::default()
2944 } else {
2945 match serde_json::from_value(request.params.clone()) {
2946 Ok(q) => q,
2947 Err(err) => {
2948 return invalid_params(response_id, format!("invalid query params: {err}"));
2949 }
2950 }
2951 };
2952 let Some(store) = mob_events.as_ref() else {
2953 return response_value(
2954 response_id,
2955 Some(serde_json::json!({
2956 "events": [],
2957 "next_after_seq": Value::Null,
2958 })),
2959 None,
2960 );
2961 };
2962 let events_view = runtime.handle().events();
2963 let latest_at_handshake = events_view.latest_cursor().await.unwrap_or(0);
2967 let result = crate::unified_runtime::mob_events::query_ledger_with_filter(
2968 &events_view,
2969 store,
2970 &query,
2971 )
2972 .await;
2973 match result {
2974 Ok(events) => {
2975 let last_cursor = events.last().map(|event| event.cursor);
2976 let body = if request.method == "mobkit/mob_events/subscribe" {
2977 let subscribe_url = crate::unified_runtime::mob_events::build_subscribe_url(
2978 &query,
2979 last_cursor,
2980 latest_at_handshake,
2981 );
2982 serde_json::json!({
2983 "stream": "mob_events",
2984 "events": events,
2985 "next_after_seq": last_cursor,
2986 "subscribe_url": subscribe_url,
2987 "keep_alive": {
2988 "interval_ms": 15_000_u64,
2989 "event": "keep_alive",
2990 },
2991 })
2992 } else {
2993 serde_json::json!({
2994 "events": events,
2995 "next_after_seq": last_cursor,
2996 })
2997 };
2998 response_value(response_id, Some(body), None)
2999 }
3000 Err(crate::unified_runtime::mob_events::MobEventsQueryError::Stale {
3001 after_cursor,
3002 latest_cursor,
3003 }) => stale_event_cursor_response(response_id, after_cursor, latest_cursor),
3004 Err(err) => internal_error(response_id, format!("mob_events query failed: {err}")),
3005 }
3006 }
3007 "mobkit/member_status" => {
3009 let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
3010 return invalid_params(response_id, "member_id required");
3011 };
3012 match runtime
3013 .handle()
3014 .member_status(&MeerkatId::from(member_id))
3015 .await
3016 {
3017 Ok(snapshot) => response_value(
3018 response_id,
3019 Some(serde_json::to_value(&snapshot).unwrap_or(Value::Null)),
3020 None,
3021 ),
3022 Err(err) => internal_error(response_id, format!("member_status failed: {err}")),
3023 }
3024 }
3025 "mobkit/force_cancel_member" => {
3026 let Some(member_id) = request.params.get("member_id").and_then(Value::as_str) else {
3027 return invalid_params(response_id, "member_id required");
3028 };
3029 match runtime
3030 .handle()
3031 .force_cancel_member(MeerkatId::from(member_id))
3032 .await
3033 {
3034 Ok(()) => response_value(
3035 response_id,
3036 Some(serde_json::json!({ "accepted": true })),
3037 None,
3038 ),
3039 Err(err) => {
3040 internal_error(response_id, format!("force_cancel_member failed: {err}"))
3041 }
3042 }
3043 }
3044 "mobkit/wait_ready" => {
3045 let timeout = request
3046 .params
3047 .get("timeout_ms")
3048 .and_then(Value::as_u64)
3049 .map(std::time::Duration::from_millis);
3050 match runtime.handle().wait_for_ready(timeout).await {
3051 Ok(ready) => {
3052 let entries: Vec<Value> = ready
3053 .into_iter()
3054 .map(|(identity, snapshot)| {
3055 serde_json::json!({
3056 "agent_identity": identity.to_string(),
3057 "snapshot": serde_json::to_value(&snapshot)
3058 .unwrap_or(Value::Null),
3059 })
3060 })
3061 .collect();
3062 response_value(
3063 response_id,
3064 Some(serde_json::json!({
3065 "ready": entries,
3066 "timeout": false,
3067 })),
3068 None,
3069 )
3070 }
3071 Err(err) => {
3072 let message = err.to_string();
3073 if message.to_lowercase().contains("timeout") {
3074 response_value(
3075 response_id,
3076 Some(serde_json::json!({
3077 "ready": Vec::<Value>::new(),
3078 "timeout": true,
3079 })),
3080 None,
3081 )
3082 } else {
3083 internal_error(response_id, format!("wait_for_ready failed: {message}"))
3084 }
3085 }
3086 }
3087 }
3088 "mobkit/collect_completed" => {
3089 let completed = runtime.handle().collect_completed().await;
3090 let entries: Vec<Value> = completed
3091 .into_iter()
3092 .map(|(mid, snapshot)| {
3093 serde_json::json!({
3094 "member_id": mid.to_string(),
3095 "snapshot": serde_json::to_value(&snapshot).unwrap_or(Value::Null),
3096 })
3097 })
3098 .collect();
3099 response_value(
3100 response_id,
3101 Some(serde_json::json!({ "completed": entries })),
3102 None,
3103 )
3104 }
3105 "mobkit/cancel_flow" => {
3106 let Some(run_id) = request.params.get("run_id").and_then(Value::as_str) else {
3107 return invalid_params(response_id, "run_id required");
3108 };
3109 let run_id: meerkat_mob::RunId = match run_id.parse() {
3110 Ok(id) => id,
3111 Err(_) => return invalid_params(response_id, "invalid run_id format"),
3112 };
3113 match runtime.handle().cancel_flow(run_id).await {
3114 Ok(()) => response_value(
3115 response_id,
3116 Some(serde_json::json!({ "accepted": true })),
3117 None,
3118 ),
3119 Err(err) => internal_error(response_id, format!("cancel_flow failed: {err}")),
3120 }
3121 }
3122 "mobkit/flow_status" => {
3123 let Some(run_id) = request.params.get("run_id").and_then(Value::as_str) else {
3124 return invalid_params(response_id, "run_id required");
3125 };
3126 let run_id: meerkat_mob::RunId = match run_id.parse() {
3127 Ok(id) => id,
3128 Err(_) => return invalid_params(response_id, "invalid run_id format"),
3129 };
3130 match runtime.handle().flow_status(run_id).await {
3131 Ok(Some(mob_run)) => response_value(
3132 response_id,
3133 Some(serde_json::to_value(&mob_run).unwrap_or(Value::Null)),
3134 None,
3135 ),
3136 Ok(None) => response_value(response_id, Some(Value::Null), None),
3137 Err(err) => internal_error(response_id, format!("flow_status failed: {err}")),
3138 }
3139 }
3140 "mobkit/list_flows" => {
3141 let flows: Vec<String> = runtime
3142 .handle()
3143 .list_flows()
3144 .into_iter()
3145 .map(|id| id.to_string())
3146 .collect();
3147 response_value(
3148 response_id,
3149 Some(serde_json::json!({ "flows": flows })),
3150 None,
3151 )
3152 }
3153 "mobkit/list_runs" => {
3154 let flow_id = request
3155 .params
3156 .get("flow_id")
3157 .and_then(Value::as_str)
3158 .filter(|value| !value.is_empty())
3159 .map(meerkat_mob::FlowId::from);
3160 match runtime.handle().list_runs(flow_id.as_ref()).await {
3161 Ok(runs) => response_value(
3162 response_id,
3163 Some(serde_json::json!({
3164 "runs": serde_json::to_value(&runs).unwrap_or(Value::Null),
3165 })),
3166 None,
3167 ),
3168 Err(err) => internal_error(response_id, format!("list_runs failed: {err}")),
3169 }
3170 }
3171 "mobkit/run_flow" => {
3172 let Some(flow_id_str) = request.params.get("flow_id").and_then(Value::as_str) else {
3173 return invalid_params(response_id, "flow_id required");
3174 };
3175 if flow_id_str.is_empty() {
3176 return invalid_params(response_id, "flow_id required");
3177 }
3178 let flow_id = meerkat_mob::FlowId::from(flow_id_str);
3179 let flow_params = request.params.get("params").cloned().unwrap_or(Value::Null);
3180 match runtime.handle().run_flow(flow_id, flow_params).await {
3181 Ok(run_id) => response_value(
3182 response_id,
3183 Some(serde_json::json!({ "run_id": run_id.to_string() })),
3184 None,
3185 ),
3186 Err(err) => invalid_params(response_id, format!("run_flow failed: {err}")),
3187 }
3188 }
3189 "mobkit/spawn_helper" => {
3190 let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
3191 else {
3192 return invalid_params(response_id, "agent_identity required");
3193 };
3194 let Some(task) = request.params.get("task").and_then(Value::as_str) else {
3195 return invalid_params(response_id, "task required");
3196 };
3197 let options = match parse_console_helper_options(request.params.get("options")) {
3198 Ok(opts) => opts,
3199 Err(msg) => return invalid_params(response_id, msg),
3200 };
3201 let handle = runtime.handle();
3202 match handle
3203 .spawn_helper(MeerkatId::from(agent_identity), task, options)
3204 .await
3205 {
3206 Ok(result) => {
3207 response_value(
3212 response_id,
3213 Some(serde_json::json!({
3214 "output": result.output,
3215 "tokens_used": result.tokens_used,
3216 })),
3217 None,
3218 )
3219 }
3220 Err(err) => internal_error(response_id, format!("spawn_helper failed: {err}")),
3221 }
3222 }
3223 "mobkit/fork_helper" => {
3224 let Some(source) = request
3225 .params
3226 .get("source_member_id")
3227 .and_then(Value::as_str)
3228 else {
3229 return invalid_params(response_id, "source_member_id required");
3230 };
3231 let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
3232 else {
3233 return invalid_params(response_id, "agent_identity required");
3234 };
3235 let Some(task) = request.params.get("task").and_then(Value::as_str) else {
3236 return invalid_params(response_id, "task required");
3237 };
3238 let fork_context = match request.params.get("fork_context") {
3239 Some(v) if !v.is_null() => {
3240 match serde_json::from_value::<meerkat_mob::launch::ForkContext>(v.clone()) {
3241 Ok(ctx) => ctx,
3242 Err(err) => {
3243 return invalid_params(
3244 response_id,
3245 format!("invalid fork_context: {err}"),
3246 );
3247 }
3248 }
3249 }
3250 _ => meerkat_mob::launch::ForkContext::default(),
3251 };
3252 let options = match parse_console_helper_options(request.params.get("options")) {
3253 Ok(opts) => opts,
3254 Err(msg) => return invalid_params(response_id, msg),
3255 };
3256 let handle = runtime.handle();
3257 match handle
3258 .fork_helper(
3259 &MeerkatId::from(source),
3260 MeerkatId::from(agent_identity),
3261 task,
3262 fork_context,
3263 options,
3264 )
3265 .await
3266 {
3267 Ok(result) => {
3268 response_value(
3272 response_id,
3273 Some(serde_json::json!({
3274 "output": result.output,
3275 "tokens_used": result.tokens_used,
3276 })),
3277 None,
3278 )
3279 }
3280 Err(err) => internal_error(response_id, format!("fork_helper failed: {err}")),
3281 }
3282 }
3283 "mobkit/attach_existing_session" => {
3284 let Some(role) = request.params.get("role").and_then(Value::as_str) else {
3285 return invalid_params(response_id, "role required");
3286 };
3287 let Some(agent_identity) = request.params.get("agent_identity").and_then(Value::as_str)
3288 else {
3289 return invalid_params(response_id, "agent_identity required");
3290 };
3291 let Some(session_id_str) = request.params.get("session_id").and_then(Value::as_str)
3292 else {
3293 return invalid_params(response_id, "session_id required");
3294 };
3295 let bridge_session_id = match meerkat_core::types::SessionId::parse(session_id_str) {
3296 Ok(s) => s,
3297 Err(_) => return invalid_params(response_id, "invalid session_id format"),
3298 };
3299 let mid = MeerkatId::from(agent_identity);
3300 let spec = SpawnMemberSpec::new(ProfileName::from(role), mid.clone())
3301 .with_launch_mode(MemberLaunchMode::Resume { bridge_session_id });
3302 let handle = runtime.handle();
3303 match handle.spawn_spec(spec).await {
3304 Ok(_) => match handle.member_status(&mid).await {
3305 Ok(snapshot) => response_value(
3306 response_id,
3307 Some(serde_json::to_value(&snapshot).unwrap_or(Value::Null)),
3308 None,
3309 ),
3310 Err(err) => internal_error(
3311 response_id,
3312 format!("attach_existing_session status lookup failed: {err}"),
3313 ),
3314 },
3315 Err(err) => internal_error(
3316 response_id,
3317 format!("attach_existing_session failed: {err}"),
3318 ),
3319 }
3320 }
3321 "mobkit/cross_mob/wire_local" => {
3322 handle_console_wire_local(runtime, &request.params, response_id, true).await
3323 }
3324 "mobkit/cross_mob/unwire_local" => {
3325 handle_console_wire_local(runtime, &request.params, response_id, false).await
3326 }
3327 "mobkit/peer_pubkey" => match gateway_peer_keys {
3328 Some(keys) => response_value(
3329 response_id,
3330 Some(serde_json::json!({ "pubkey_b64": keys.pubkey_b64() })),
3331 None,
3332 ),
3333 None => response_value(
3334 response_id,
3335 None,
3336 Some(JsonRpcError {
3337 code: -32004,
3338 message: "gateway has no signing keypair configured".to_string(),
3339 data: None,
3340 }),
3341 ),
3342 },
3343 "mobkit/cross_mob/peer_info" => {
3344 let member_id = request.params.get("member_id").and_then(Value::as_str);
3345 match member_id {
3346 Some(mid) if !mid.is_empty() => {
3347 let handle = runtime.handle();
3348 let mob_id = handle.mob_id().to_string();
3349 let meerkat_id = MeerkatId::from(mid);
3350 match handle.get_member(&meerkat_id).await {
3351 Some(entry) => match entry.peer_id() {
3352 Some(peer_id) => {
3353 let comms_name = format!("{}/{}/{}", mob_id, entry.role, mid);
3354 let address = format!("inproc://{comms_name}");
3355 response_value(
3356 response_id,
3357 Some(serde_json::json!({
3358 "member_id": mid,
3359 "mob_id": mob_id,
3360 "comms_name": comms_name,
3361 "peer_id": peer_id,
3362 "address": address,
3363 })),
3364 None,
3365 )
3366 }
3367 None => response_value(
3368 response_id,
3369 None,
3370 Some(JsonRpcError {
3371 code: -32000,
3372 message: format!("member {mid:?} has no comms runtime"),
3373 data: None,
3374 }),
3375 ),
3376 },
3377 None => response_value(
3378 response_id,
3379 None,
3380 Some(JsonRpcError {
3381 code: -32000,
3382 message: format!("member {mid:?} not found"),
3383 data: None,
3384 }),
3385 ),
3386 }
3387 }
3388 _ => invalid_params(response_id, "member_id required".to_string()),
3389 }
3390 }
3391 "mobkit/cross_mob/directory" => {
3392 let entries: Vec<Value> = contact_directory
3393 .map(|dir| {
3394 dir.list()
3395 .into_iter()
3396 .filter_map(|e| serde_json::to_value(e).ok())
3397 .collect()
3398 })
3399 .unwrap_or_default();
3400 response_value(
3401 response_id,
3402 Some(serde_json::json!({ "mobs": entries })),
3403 None,
3404 )
3405 }
3406 method
3407 if matches!(
3408 method,
3409 "mobkit/mob_labels/set"
3410 | "mobkit/mob_labels/get"
3411 | "mobkit/mob_labels/delete"
3412 | "mobkit/run_labels/set"
3413 | "mobkit/run_labels/get"
3414 | "mobkit/run_labels/delete",
3415 ) =>
3416 {
3417 dispatch_label_method(
3418 method,
3419 metadata_table.as_deref(),
3420 runtime.handle().mob_id().as_str(),
3421 response_id,
3422 &request.params,
3423 )
3424 .await
3425 }
3426 _ => response_value(
3427 response_id,
3428 None,
3429 Some(JsonRpcError {
3430 code: -32601,
3431 message: "Method not found".to_string(),
3432 data: None,
3433 }),
3434 ),
3435 }
3436}
3437
3438async fn dispatch_label_method(
3445 method: &str,
3446 metadata_table: Option<&RuntimeMetadataTable>,
3447 mob_id: &str,
3448 response_id: Value,
3449 params: &Value,
3450) -> Value {
3451 let Some(table) = metadata_table else {
3452 return invalid_params(
3453 response_id,
3454 "metadata table not configured for this runtime",
3455 );
3456 };
3457
3458 let scope = match method {
3459 "mobkit/mob_labels/set" | "mobkit/mob_labels/get" | "mobkit/mob_labels/delete" => {
3460 MetadataScope::Mob(mob_id.to_string())
3461 }
3462 _ => match crate::runtime::parse_run_id_param(params) {
3463 Ok(run_id) => MetadataScope::Run(mob_id.to_string(), run_id.to_string()),
3464 Err(message) => return invalid_params(response_id, message),
3465 },
3466 };
3467
3468 let outcome = match method {
3469 "mobkit/mob_labels/set" | "mobkit/run_labels/set" => {
3470 crate::runtime::dispatch_labels_set(table, scope, params).await
3471 }
3472 "mobkit/mob_labels/get" | "mobkit/run_labels/get" => {
3473 crate::runtime::dispatch_labels_get(table, scope).await
3474 }
3475 "mobkit/mob_labels/delete" | "mobkit/run_labels/delete" => {
3476 crate::runtime::dispatch_labels_delete(table, scope).await
3477 }
3478 _ => unreachable!("dispatch_label_method called with non-label method: {method}"),
3479 };
3480
3481 match outcome {
3482 crate::runtime::LabelRpcResult::Accepted => response_value(
3483 response_id,
3484 Some(serde_json::json!({"accepted": true})),
3485 None,
3486 ),
3487 crate::runtime::LabelRpcResult::Labels(labels) => response_value(
3488 response_id,
3489 Some(serde_json::json!({"labels": labels_to_json_value(&labels)})),
3490 None,
3491 ),
3492 crate::runtime::LabelRpcResult::InvalidParams(message) => {
3493 invalid_params(response_id, message)
3494 }
3495 }
3496}
3497
3498async fn handle_console_wire_local(
3507 runtime: &MobRuntime,
3508 params: &Value,
3509 response_id: Value,
3510 wire: bool,
3511) -> Value {
3512 let local = params.get("local_member_id").and_then(Value::as_str);
3513 let comms_name = params.get("remote_comms_name").and_then(Value::as_str);
3514 let peer_id = params.get("remote_peer_id").and_then(Value::as_str);
3515 let addr = params.get("remote_address").and_then(Value::as_str);
3516
3517 let remote_pubkey = match params.get("remote_pubkey_b64") {
3518 None => None,
3519 Some(v) if v.is_null() => None,
3520 Some(v) => match v.as_str() {
3521 Some(s) if !s.is_empty() => match crate::auth::peer_keys::decode_pubkey_b64(s) {
3522 Ok(bytes) => Some(bytes),
3523 Err(err) => {
3524 return invalid_params(response_id, format!("remote_pubkey_b64: {err}"));
3525 }
3526 },
3527 _ => None,
3528 },
3529 };
3530
3531 let (local_id, cname, pid, address) = match (local, comms_name, peer_id, addr) {
3532 (Some(l), Some(c), Some(p), Some(a))
3533 if !l.is_empty() && !c.is_empty() && !p.is_empty() && !a.is_empty() =>
3534 {
3535 (l, c, p, a)
3536 }
3537 _ => {
3538 return invalid_params(
3539 response_id,
3540 "local_member_id, remote_comms_name, remote_peer_id, and remote_address required",
3541 );
3542 }
3543 };
3544
3545 let is_inproc = address.starts_with("inproc://");
3546 let spec_result = match (is_inproc, remote_pubkey) {
3547 (true, None) => TrustedPeerDescriptor::test_only_unsigned(cname, pid, address),
3548 (true, Some(bytes)) => {
3549 TrustedPeerDescriptor::unsigned_with_pubkey(cname, pid, bytes, address)
3550 }
3551 (false, None) => {
3552 return invalid_params(
3553 response_id,
3554 "remote_pubkey_b64 is required for non-inproc transports",
3555 );
3556 }
3557 (false, Some(bytes)) => {
3558 if bytes == [0u8; 32] {
3559 return invalid_params(
3560 response_id,
3561 "remote_pubkey_b64 must be non-zero for non-inproc transports",
3562 );
3563 }
3564 TrustedPeerDescriptor::unsigned_with_pubkey(cname, pid, bytes, address)
3565 }
3566 };
3567
3568 let spec = match spec_result {
3569 Ok(spec) => spec,
3570 Err(err) => {
3571 return invalid_params(response_id, format!("invalid peer spec: {err}"));
3572 }
3573 };
3574
3575 let result = if wire {
3576 runtime
3577 .handle()
3578 .wire(MeerkatId::from(local_id), PeerTarget::External(spec))
3579 .await
3580 } else {
3581 runtime
3582 .handle()
3583 .unwire(MeerkatId::from(local_id), PeerTarget::External(spec))
3584 .await
3585 };
3586
3587 let action = if wire { "wire_local" } else { "unwire_local" };
3588 match result {
3589 Ok(()) => response_value(
3590 response_id,
3591 Some(serde_json::json!({
3592 "accepted": true,
3593 "local_member_id": local_id,
3594 "remote_comms_name": cname,
3595 })),
3596 None,
3597 ),
3598 Err(err) => internal_error(response_id, format!("cross_mob/{action} failed: {err}")),
3599 }
3600}
3601
3602async fn build_live_snapshot(
3603 runtime: &MobRuntime,
3604 config_module_ids: &[String],
3605 console_events: Option<&ConsoleEventStore>,
3606 visibility_policy: &dyn ConsoleVisibilityPolicy,
3607 read_model: &ConsoleSnapshotReadModel,
3608) -> ConsoleLiveSnapshot {
3609 let read_model_state = read_model.snapshot(runtime).await;
3610 let running = read_model_state.running.unwrap_or(true);
3611 let mut members = read_model_state.primary_members.clone();
3618 if visibility_policy.include_implicit_delegate_members() {
3619 for group in &read_model_state.delegate_member_groups {
3620 members.extend(group.iter().cloned());
3621 }
3622 }
3623 dedupe_console_members_by_identity(&mut members);
3624
3625 let loaded_modules = if config_module_ids.is_empty() {
3629 let mut mods: Vec<String> = members
3630 .iter()
3631 .filter(|member| member.state != MEMBER_STATE_RETIRING)
3632 .map(|member| member.agent_identity.clone())
3633 .collect();
3634 mods.sort();
3635 mods
3636 } else {
3637 let mut mods = config_module_ids.to_vec();
3638 mods.sort();
3639 mods
3640 };
3641
3642 let agents = members
3643 .iter()
3644 .map(|member| async move {
3645 let label = member
3646 .labels
3647 .get("display_name")
3648 .cloned()
3649 .unwrap_or_else(|| member.agent_identity.clone());
3650 let watched = member
3651 .labels
3652 .get("console_watched")
3653 .map(|value: &String| value == "true");
3654 let alert_level = member
3655 .labels
3656 .get("console_alert_level")
3657 .filter(|value: &&String| matches!(value.as_str(), "elevated" | "critical"))
3658 .cloned();
3659 let degraded = member
3660 .labels
3661 .get("console_degraded")
3662 .map(|value: &String| value == "true");
3663 let degraded_reason = member.labels.get("console_degraded_reason").cloned();
3664 let response_phase = match console_events {
3665 Some(store) => {
3666 store
3667 .response_phase_for_identity(&member.agent_identity)
3668 .await
3669 }
3670 None => None,
3671 };
3672 ConsoleAgentLiveSnapshot {
3673 agent_id: member.agent_identity.clone(),
3674 member_id: member.agent_identity.clone(),
3675 label,
3676 kind: "meerkat".to_string(),
3677 identity: Some(member.agent_identity.clone()),
3678 role: Some(member.role.clone()),
3679 state: Some(member.state.clone()),
3680 session_id: member.session_id.clone(),
3681 model_capabilities: member.model_capabilities.clone(),
3682 response_phase,
3683 watched,
3684 alert_level,
3685 degraded,
3686 degraded_reason,
3687 }
3688 })
3689 .collect::<Vec<_>>();
3690 let mut agents = join_all(agents).await;
3691 agents.sort_by(|left, right| left.label.cmp(&right.label));
3692 ConsoleLiveSnapshot::new(
3693 Some(runtime.handle().mob_id().to_string()),
3694 running,
3695 loaded_modules,
3696 agents,
3697 members,
3698 true,
3699 )
3700}
3701
3702async fn collect_console_snapshot_read_model(
3703 runtime: &MobRuntime,
3704) -> ConsoleSnapshotReadModelState {
3705 let handle = runtime.handle();
3706 let mut state = ConsoleSnapshotReadModelState {
3707 running: Some(matches!(
3708 handle.status().await.ok(),
3709 Some(MobState::Creating | MobState::Running)
3710 )),
3711 ..ConsoleSnapshotReadModelState::default()
3712 };
3713 collect_console_session_index_for_handle(&handle, &mut state).await;
3714
3715 let (primary_members, _primary_owner_index) =
3721 project_console_members_from_handle(&handle, None, None, &state).await;
3722 state.primary_members = primary_members;
3723
3724 let Some(mcp_state) = runtime.agent_mob_mcp_state() else {
3725 return state;
3726 };
3727 let primary_mob_id = handle.mob_id().to_string();
3728 let mut processed = BTreeSet::from([primary_mob_id]);
3729 let mut delegate_groups: Vec<Vec<ConsoleMember>> = Vec::new();
3730 loop {
3731 let mut progressed = false;
3732 for (mob_id, _mob_state) in mcp_state.mob_list().await {
3733 if processed.contains(mob_id.as_str()) {
3734 continue;
3735 }
3736 let Ok(delegate_handle) = mcp_state.handle_for(&mob_id).await else {
3737 continue;
3738 };
3739 let Some(owner_session_id) = delegate_handle.definition().owner_bridge_session_index()
3740 else {
3741 processed.insert(mob_id.to_string());
3742 continue;
3743 };
3744 let Some(host_identity) = state.session_owner_by_id.get(owner_session_id).cloned()
3745 else {
3746 continue;
3747 };
3748 collect_console_session_index_for_handle(&delegate_handle, &mut state).await;
3749 let (delegate_members, _delegate_owner_index) = project_console_members_from_handle(
3750 &delegate_handle,
3751 Some(&host_identity),
3752 Some(mob_id.as_str()),
3753 &state,
3754 )
3755 .await;
3756 delegate_groups.push(delegate_members);
3757 processed.insert(mob_id.to_string());
3758 progressed = true;
3759 }
3760 if !progressed {
3761 break;
3762 }
3763 }
3764 state.delegate_member_groups = delegate_groups;
3765 state
3766}
3767
3768async fn collect_console_session_index_for_handle(
3769 handle: &MobHandle,
3770 state: &mut ConsoleSnapshotReadModelState,
3771) {
3772 for entry in handle.list_members_including_retiring().await {
3773 let identity = entry.agent_identity.to_string();
3774 let Some(session_id) = handle
3775 .resolve_bridge_session_id(&entry.agent_identity)
3776 .await
3777 .map(|session_id| session_id.to_string())
3778 else {
3779 state.session_id_by_identity.remove(&identity);
3780 continue;
3781 };
3782 state
3783 .session_owner_by_id
3784 .insert(session_id.clone(), identity.clone());
3785 state.session_id_by_identity.insert(identity, session_id);
3786 }
3787}
3788
3789fn apply_console_visibility_policy(
3790 snapshot: &mut ConsoleLiveSnapshot,
3791 visibility_policy: &dyn ConsoleVisibilityPolicy,
3792) {
3793 let mut hidden = BTreeSet::new();
3794 snapshot.members.retain(|member| {
3795 let visible = visibility_policy.member_visible(member);
3796 if !visible {
3797 hidden.insert(member.agent_identity.clone());
3798 }
3799 visible
3800 });
3801 snapshot
3802 .agents
3803 .retain(|agent| !hidden.contains(&agent.agent_id));
3804 snapshot
3805 .loaded_modules
3806 .retain(|module_id| !hidden.contains(module_id));
3807}
3808
3809async fn reset_all_live_console_agents(
3810 runtime: &MobRuntime,
3811 console_events: Option<&ConsoleEventStore>,
3812 console_aggregator: Option<&MobKitConsoleAggregator>,
3813) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
3814 let read_model = ConsoleSnapshotReadModel::default();
3815 *read_model.inner.write().await = collect_console_snapshot_read_model(runtime).await;
3816 read_model
3819 .primed
3820 .store(true, std::sync::atomic::Ordering::Release);
3821 let snapshot = build_live_snapshot(
3822 runtime,
3823 &[],
3824 console_events,
3825 &AllowAllConsoleVisibilityPolicy,
3826 &read_model,
3827 )
3828 .await;
3829 let mut main_identities = BTreeSet::new();
3830 let mut delegate_members = BTreeSet::new();
3831 for member in snapshot.members {
3832 if member.state == MEMBER_STATE_RETIRING {
3833 continue;
3834 }
3835 if let Some(source_mob_id) = member.labels.get("source_mob_id").cloned() {
3836 delegate_members.insert((source_mob_id, member.agent_identity));
3837 } else {
3838 main_identities.insert(member.agent_identity);
3839 }
3840 }
3841 let current_main_identities = main_identities.clone();
3842 let baseline_specs = runtime.baseline_member_specs().await;
3843 let baseline_identities = baseline_specs
3844 .iter()
3845 .map(|spec| spec.identity.to_string())
3846 .collect::<BTreeSet<_>>();
3847 main_identities.extend(baseline_identities.iter().cloned());
3848
3849 let mut retired_delegates = Vec::new();
3850 let mut reset_main = Vec::new();
3851 let mut failures = Vec::new();
3852
3853 if let Some(state) = runtime.agent_mob_mcp_state() {
3854 for (mob_id, identity) in delegate_members {
3855 match state.handle_for(&MobId::from(mob_id.as_str())).await {
3856 Ok(handle) => match handle.retire(MeerkatId::from(identity.as_str())).await {
3857 Ok(()) => retired_delegates.push(json!({
3858 "identity": identity,
3859 "mob_id": mob_id,
3860 })),
3861 Err(err) => failures.push(json!({
3862 "identity": identity,
3863 "mob_id": mob_id,
3864 "error": err.to_string(),
3865 })),
3866 },
3867 Err(err) => failures.push(json!({
3868 "identity": identity,
3869 "mob_id": mob_id,
3870 "error": err.to_string(),
3871 })),
3872 }
3873 }
3874 } else if let Some(aggregator) = console_aggregator {
3875 let identities = delegate_members
3876 .into_iter()
3877 .map(|(_, identity)| identity)
3878 .collect::<BTreeSet<_>>();
3879 for identity in identities {
3880 match aggregator.retire_identity(&identity).await {
3881 Ok(true) => retired_delegates.push(json!({ "identity": identity })),
3882 Ok(false) => failures.push(json!({
3883 "identity": identity,
3884 "error": "unknown identity",
3885 })),
3886 Err(err) => failures.push(json!({
3887 "identity": identity,
3888 "error": err.to_string(),
3889 })),
3890 }
3891 }
3892 }
3893
3894 let handle = runtime.handle();
3895 for spec in baseline_specs {
3896 let identity = spec.identity.to_string();
3897 if current_main_identities.contains(&identity) {
3898 continue;
3899 }
3900 match handle.ensure_member(spec).await {
3901 Ok(_outcome) => {
3902 if let Some(store) = console_events {
3903 store
3904 .record_lifecycle(
3905 &identity,
3906 "identity_reset",
3907 json!({ "scope": "reset_all", "restored": true }),
3908 )
3909 .await;
3910 }
3911 reset_main.push(identity);
3912 }
3913 Err(err) => failures.push(json!({
3914 "identity": identity,
3915 "error": err.to_string(),
3916 })),
3917 }
3918 }
3919 for identity in main_identities {
3920 if baseline_identities.contains(&identity) && !current_main_identities.contains(&identity) {
3921 continue;
3922 }
3923 if baseline_identities.contains(&identity) {
3924 match handle
3925 .respawn(MeerkatId::from(identity.as_str()), None)
3926 .await
3927 {
3928 Ok(_receipt) => {
3929 if let Some(store) = console_events {
3930 store
3931 .record_lifecycle(
3932 &identity,
3933 "identity_reset",
3934 json!({ "scope": "reset_all" }),
3935 )
3936 .await;
3937 }
3938 reset_main.push(identity);
3939 }
3940 Err(err) => failures.push(json!({
3941 "identity": identity,
3942 "error": err.to_string(),
3943 })),
3944 }
3945 } else {
3946 match handle.retire(MeerkatId::from(identity.as_str())).await {
3947 Ok(()) => {
3948 if let Some(store) = console_events {
3949 store
3950 .record_lifecycle(
3951 &identity,
3952 "identity_retired",
3953 json!({ "scope": "reset_all", "dynamic": true }),
3954 )
3955 .await;
3956 }
3957 retired_delegates.push(json!({ "identity": identity }));
3958 }
3959 Err(err) => failures.push(json!({
3960 "identity": identity,
3961 "error": err.to_string(),
3962 })),
3963 }
3964 }
3965 }
3966
3967 let startup_history = if let Some(aggregator) = console_aggregator {
3968 aggregator.clear_timeline_frames().await?;
3969 Some(
3970 wait_for_reset_startup_history(
3971 aggregator,
3972 baseline_identities.iter().cloned().collect(),
3973 Duration::from_mins(1),
3974 )
3975 .await?,
3976 )
3977 } else {
3978 None
3979 };
3980
3981 Ok(json!({
3982 "reset": reset_main,
3983 "retired_delegates": retired_delegates,
3984 "failed": failures,
3985 "startup_history": startup_history,
3986 }))
3987}
3988
3989async fn wait_for_reset_startup_history(
3990 aggregator: &MobKitConsoleAggregator,
3991 identities: BTreeSet<String>,
3992 timeout: Duration,
3993) -> ConsoleLogResult<Value> {
3994 if identities.is_empty() {
3995 return Ok(json!({
3996 "timeout": false,
3997 "ready": Vec::<String>::new(),
3998 "pending": Vec::<String>::new(),
3999 }));
4000 }
4001
4002 let deadline = Instant::now() + timeout;
4003 let mut pending = identities;
4004 let mut ready = BTreeSet::new();
4005 while !pending.is_empty() {
4006 for identity in pending.clone() {
4007 let page = aggregator
4008 .query_timeline(ConsoleTimelineQuery {
4009 identity: Some(identity.clone()),
4010 limit: 1000,
4011 ..ConsoleTimelineQuery::default()
4012 })
4013 .await?;
4014 let startup_completed = page.frames.iter().any(|frame| {
4015 matches!(
4016 frame.kind.as_str(),
4017 "interaction_complete" | "turn_completed"
4018 )
4019 });
4020 if startup_completed {
4021 pending.remove(&identity);
4022 ready.insert(identity);
4023 }
4024 }
4025
4026 if pending.is_empty() {
4027 break;
4028 }
4029 if Instant::now() >= deadline {
4030 return Ok(json!({
4031 "timeout": true,
4032 "ready": ready.into_iter().collect::<Vec<_>>(),
4033 "pending": pending.into_iter().collect::<Vec<_>>(),
4034 }));
4035 }
4036 tokio::time::sleep(Duration::from_millis(250)).await;
4037 }
4038
4039 Ok(json!({
4040 "timeout": false,
4041 "ready": ready.into_iter().collect::<Vec<_>>(),
4042 "pending": Vec::<String>::new(),
4043 }))
4044}
4045
4046fn dedupe_console_members_by_identity(members: &mut Vec<ConsoleMember>) {
4047 let mut seen_member_ids = BTreeSet::new();
4048 members.retain(|member| seen_member_ids.insert(member.agent_identity.clone()));
4049}
4050
4051async fn project_console_members_from_handle(
4052 handle: &MobHandle,
4053 host_identity: Option<&str>,
4054 source_mob_id: Option<&str>,
4055 read_model: &ConsoleSnapshotReadModelState,
4056) -> (Vec<ConsoleMember>, BTreeMap<String, String>) {
4057 let entries = handle.list_all_members().await;
4058 let mut members = Vec::with_capacity(entries.len());
4059 let mut session_owner_by_id = BTreeMap::new();
4060 for entry in &entries {
4061 let identity = entry.agent_identity.to_string();
4062 let session_id = read_model.session_id_by_identity.get(&identity).cloned();
4063 if let Some(session_id) = session_id.as_ref() {
4064 session_owner_by_id.insert(session_id.clone(), identity.clone());
4065 }
4066 let model_capabilities =
4067 model_capabilities_for_role(handle.definition(), entry.role.as_str());
4068 let mut labels = entry.labels.clone();
4069 if let Some(host_identity) = host_identity {
4070 labels
4071 .entry("delegate_host_identity".to_string())
4072 .or_insert_with(|| host_identity.to_string());
4073 labels
4074 .entry("group".to_string())
4075 .or_insert_with(|| "Coordinators".to_string());
4076 }
4077 if let Some(source_mob_id) = source_mob_id {
4078 labels
4079 .entry("source_mob_id".to_string())
4080 .or_insert_with(|| source_mob_id.to_string());
4081 }
4082 let mut wired_to: Vec<String> = entry.wired_to.iter().map(ToString::to_string).collect();
4083 if let Some(host_identity) = host_identity
4084 && !wired_to.iter().any(|peer| peer == host_identity)
4085 {
4086 wired_to.push(host_identity.to_string());
4087 }
4088 members.push(ConsoleMember {
4089 agent_identity: identity,
4090 role: entry.role.to_string(),
4091 state: match entry.state {
4092 meerkat_mob::MemberState::Active => MEMBER_STATE_ACTIVE.to_string(),
4093 meerkat_mob::MemberState::Retiring => MEMBER_STATE_RETIRING.to_string(),
4094 },
4095 model_capabilities,
4096 runtime_mode: Some(entry.runtime_mode.to_string()),
4097 session_id,
4098 wired_to,
4099 labels,
4100 });
4101 }
4102 (members, session_owner_by_id)
4103}
4104
4105async fn build_aggregator_live_snapshot(
4106 aggregator: &MobKitConsoleAggregator,
4107 config_module_ids: &[String],
4108) -> Result<ConsoleLiveSnapshot, Box<dyn std::error::Error + Send + Sync>> {
4109 let identities = aggregator.list_identities().await?;
4110 let mut members = Vec::with_capacity(identities.len());
4111 for identity in &identities {
4112 let mut labels = identity.labels.clone();
4113 labels
4114 .entry("display_name".to_string())
4115 .or_insert_with(|| identity.display_name.clone());
4116 labels
4117 .entry("addressable".to_string())
4118 .or_insert_with(|| identity.addressable.to_string());
4119 let wired_to = aggregator
4120 .inspect_identity(&identity.identity)
4121 .await
4122 .ok()
4123 .flatten()
4124 .map(|inspection| inspection.peers)
4125 .unwrap_or_default();
4126 members.push(ConsoleMember {
4127 agent_identity: identity.identity.clone(),
4128 role: labels
4129 .get("role")
4130 .cloned()
4131 .unwrap_or_else(|| "identity".to_string()),
4132 state: identity.health.clone(),
4133 model_capabilities: ConsoleModelCapabilities::default(),
4134 runtime_mode: Some("console_aggregator".to_string()),
4135 session_id: identity.session_id.clone(),
4136 wired_to,
4137 labels,
4138 });
4139 }
4140 members.sort_by(|left, right| left.agent_identity.cmp(&right.agent_identity));
4141 let agents = members
4142 .iter()
4143 .map(|member| ConsoleAgentLiveSnapshot {
4144 agent_id: member.agent_identity.clone(),
4145 member_id: member.agent_identity.clone(),
4146 label: member
4147 .labels
4148 .get("display_name")
4149 .cloned()
4150 .unwrap_or_else(|| member.agent_identity.clone()),
4151 kind: "meerkat".to_string(),
4152 identity: Some(member.agent_identity.clone()),
4153 role: Some(member.role.clone()),
4154 state: Some(member.state.clone()),
4155 session_id: member.session_id.clone(),
4156 model_capabilities: member.model_capabilities.clone(),
4157 response_phase: None,
4158 watched: None,
4159 alert_level: None,
4160 degraded: None,
4161 degraded_reason: None,
4162 })
4163 .collect::<Vec<_>>();
4164 let loaded_modules = if config_module_ids.is_empty() {
4165 members
4166 .iter()
4167 .map(|member| member.agent_identity.clone())
4168 .collect()
4169 } else {
4170 config_module_ids.to_vec()
4171 };
4172 Ok(ConsoleLiveSnapshot::new(
4173 Some("console-aggregator".to_string()),
4174 true,
4175 loaded_modules,
4176 agents,
4177 members,
4178 true,
4179 ))
4180}
4181
4182pub async fn console_frontend_index_handler() -> impl IntoResponse {
4183 (
4184 [
4185 (header::CONTENT_TYPE, "text/html; charset=utf-8"),
4186 (header::CACHE_CONTROL, "no-store"),
4187 ],
4188 CONSOLE_FRONTEND_INDEX_HTML,
4189 )
4190}
4191
4192pub async fn console_frontend_app_js_handler() -> impl IntoResponse {
4193 (
4194 [
4195 (
4196 header::CONTENT_TYPE,
4197 "application/javascript; charset=utf-8",
4198 ),
4199 (header::CACHE_CONTROL, "no-store"),
4200 ],
4201 CONSOLE_FRONTEND_APP_JS,
4202 )
4203}
4204
4205pub async fn console_frontend_app_css_handler() -> impl IntoResponse {
4206 (
4207 [
4208 (header::CONTENT_TYPE, "text/css; charset=utf-8"),
4209 (header::CACHE_CONTROL, "no-store"),
4210 ],
4211 CONSOLE_FRONTEND_APP_CSS,
4212 )
4213}
4214
4215#[cfg(test)]
4216mod tests {
4217 use super::{
4218 ConsoleSnapshotReadModel, ConsoleSnapshotReadModelState, MAX_MULTIPART_BODY_BYTES,
4219 MAX_MULTIPART_IMAGE_BYTES, MultipartImageUpload, apply_console_visibility_policy,
4220 collect_console_snapshot_read_model, console_send_identity_first, cursor_is_after,
4221 dedupe_console_members_by_identity, externalize_image_upload_placeholders,
4222 externalize_single_image_upload, handle_console_aggregator_rpc,
4223 project_console_members_from_handle, query_timeline_snapshot,
4224 };
4225 use crate::blob_store::{BinaryBlobStore, ObjectStoreBlobStore};
4226 use crate::console_aggregator::{
4227 AllowAllConsoleVisibilityPolicy, HideImplicitDelegateMembersConsoleVisibilityPolicy,
4228 };
4229 use crate::console_aggregator::{
4230 ConsoleCursor, ConsoleFrameSource, ConsoleFrameSourceKind, ConsoleFrameStatus,
4231 ConsoleTimelineQuery, MobKitConsoleAggregator, NewConsoleFrame,
4232 };
4233 use crate::identity_first::{
4234 AgentAddressability, AgentIdentity, AgentRuntimeId, CheckpointVersion,
4235 ContinuityGeneration, ContinuityRecord, DurabilityPolicy, DurableAgentSpec, FencingToken,
4236 IdentityLifecycleState, IdentityRuntime, IdentityRuntimeConfig, LeaseGrant,
4237 LocalContinuityStore, LocalLeaseProvider,
4238 };
4239 use crate::mob_handle_runtime::{MobRuntime, model_capabilities_for_role};
4240 use crate::rpc::{JSONRPC_VERSION, JsonRpcRequest};
4241 use crate::runtime::{ConsoleAgentLiveSnapshot, ConsoleLiveSnapshot, ConsoleMember};
4242 use crate::unified_runtime::ConsoleEventStore;
4243 use crate::{MobBootstrapOptions, MobBootstrapSpec};
4244 use bytes::Bytes;
4245 use meerkat::{AgentFactory, Config, build_ephemeral_service};
4246 use meerkat_client::TestClient;
4247 use meerkat_mob::ProfileName;
4248 use meerkat_mob::{MobDefinition, MobStorage, SpawnMemberSpec};
4249 use serde_json::{Value, json};
4250 use std::collections::BTreeMap;
4251 use std::sync::Arc;
4252 use std::time::Duration;
4253
4254 async fn build_empty_console_test_runtime(
4255 mob_id: &str,
4256 ) -> Result<(tempfile::TempDir, MobRuntime), Box<dyn std::error::Error + Send + Sync>> {
4257 let temp_dir = tempfile::tempdir()?;
4258 let session_path = temp_dir.path().join("sessions");
4259 std::fs::create_dir_all(&session_path)?;
4260 let factory = AgentFactory::new(&session_path).comms(true);
4261 let session_service = Arc::new(build_ephemeral_service(factory, Config::default(), 16));
4262 let definition = MobDefinition::from_toml(&format!(
4263 r#"
4264[mob]
4265id = "{mob_id}"
4266
4267[profiles.worker]
4268model = "gpt-5.5"
4269external_addressable = true
4270
4271[profiles.worker.tools]
4272comms = true
4273"#
4274 ))?;
4275 let runtime = MobRuntime::bootstrap(
4276 MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
4277 .with_options(MobBootstrapOptions {
4278 allow_ephemeral_sessions: true,
4279 notify_orchestrator_on_resume: true,
4280 default_llm_client: Some(Arc::new(TestClient::default())),
4281 }),
4282 )
4283 .await?;
4284 Ok((temp_dir, runtime))
4285 }
4286
4287 fn rpc_request(method: &str) -> JsonRpcRequest {
4288 JsonRpcRequest {
4289 jsonrpc: JSONRPC_VERSION.to_string(),
4290 id: Some(json!(1)),
4291 method: method.to_string(),
4292 params: json!({}),
4293 }
4294 }
4295
4296 #[tokio::test]
4297 async fn identity_first_console_send_reserves_timeline_and_uses_identity_runtime()
4298 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4299 let identity = AgentIdentity::parse("agent:console")?;
4300 let record = ContinuityRecord {
4301 identity: identity.clone(),
4302 agent_runtime_id: AgentRuntimeId::parse("rt:agent:console:0")?,
4303 session_id: meerkat_core::types::SessionId::new(),
4304 generation: ContinuityGeneration::new(0),
4305 checkpoint_version: CheckpointVersion::new(0),
4306 };
4307 let runtime = IdentityRuntime::new(IdentityRuntimeConfig {
4308 continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4309 lease_provider: Arc::new(LocalLeaseProvider::new()),
4310 runtime_instance_id: "console-test".to_string(),
4311 has_runtime_store: true,
4312 durability_policy: DurabilityPolicy::SyncWriteThrough,
4313 bridge: None,
4314 default_timeout: None,
4315 });
4316 runtime
4317 .register(
4318 DurableAgentSpec {
4319 identity: identity.clone(),
4320 profile: ProfileName::from("default"),
4321 addressability: AgentAddressability::Addressable,
4322 display_name: None,
4323 labels: BTreeMap::new(),
4324 context: None,
4325 additional_instructions: Vec::new(),
4326 initial_message: None,
4327 runtime_mode_override: None,
4328 },
4329 IdentityLifecycleState::Active,
4330 Some(record.clone()),
4331 Some(LeaseGrant {
4332 identity: identity.clone(),
4333 fencing_token: FencingToken::new(7),
4334 ttl: Duration::from_mins(1),
4335 }),
4336 )
4337 .await;
4338
4339 let aggregator = MobKitConsoleAggregator::in_memory();
4340 let events = ConsoleEventStore::new();
4341 let accepted = console_send_identity_first(
4342 &aggregator,
4343 &runtime,
4344 Some(&events),
4345 crate::console_aggregator::ConsoleSendRequest {
4346 identity: identity.as_str().to_string(),
4347 content: serde_json::to_value(meerkat_core::ContentInput::Text(
4348 "hello".to_string(),
4349 ))?,
4350 origin: "test".to_string(),
4351 idempotency_key: "idem-1".to_string(),
4352 handling_mode: None,
4353 },
4354 )
4355 .await?;
4356
4357 assert_eq!(accepted.identity, identity.as_str());
4358 assert_eq!(accepted.status, ConsoleFrameStatus::Accepted);
4359 assert_eq!(accepted.session_id, Some(record.session_id.to_string()));
4360
4361 let page = aggregator
4362 .query_timeline(ConsoleTimelineQuery {
4363 identity: Some(identity.as_str().to_string()),
4364 ..ConsoleTimelineQuery::default()
4365 })
4366 .await?;
4367 assert_eq!(page.frames.len(), 1);
4368 assert_eq!(page.frames[0].runtime_key, "identity-first");
4369 assert_eq!(page.frames[0].status, ConsoleFrameStatus::Accepted);
4370 assert_eq!(
4371 page.frames[0].session_id,
4372 Some(record.session_id.to_string())
4373 );
4374 Ok(())
4375 }
4376
#[test]
/// Checks that the multipart body limit exceeds both the per-image limit and 2 MiB.
fn multipart_body_limit_covers_configured_image_limit() {
    // Both bounds are evaluated in a const context, so a violation fails the build
    // rather than the test run.
    const _: () = {
        assert!(MAX_MULTIPART_BODY_BYTES > MAX_MULTIPART_IMAGE_BYTES);
        assert!(MAX_MULTIPART_BODY_BYTES > 2 * 1024 * 1024);
    };
}
4382
4383 #[tokio::test]
4396 async fn cold_cache_waiter_resumes_when_refresh_lock_drops()
4397 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4398 use std::sync::atomic::Ordering;
4399 use tokio::time::Duration;
4400
4401 let model = ConsoleSnapshotReadModel::default();
4402 let guard = model
4403 .refresh_lock
4404 .clone()
4405 .try_lock_owned()
4406 .map_err(|_| "refresh_lock unexpectedly contended at test start")?;
4407
4408 let model_for_waiter = model.clone();
4409 let waiter = tokio::spawn(async move {
4410 if model_for_waiter
4413 .primed
4414 .load(std::sync::atomic::Ordering::Acquire)
4415 {
4416 return;
4417 }
4418 let _wait_guard = model_for_waiter.refresh_lock.clone().lock_owned().await;
4419 assert!(
4422 model_for_waiter
4423 .primed
4424 .load(std::sync::atomic::Ordering::Acquire),
4425 "waiter acquired lock but primed is still false"
4426 );
4427 });
4428
4429 tokio::time::sleep(Duration::from_millis(20)).await;
4431
4432 model.primed.store(true, Ordering::Release);
4435 drop(guard);
4436
4437 let result = tokio::time::timeout(Duration::from_secs(1), waiter).await;
4438 assert!(
4439 result.is_ok(),
4440 "waiter should resume once the refresh lock drops"
4441 );
4442 Ok(())
4443 }
4444
4445 #[tokio::test]
4450 async fn snapshot_skips_refresh_lock_when_already_primed()
4451 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4452 use std::sync::atomic::Ordering;
4453 use tokio::time::Duration;
4454
4455 let model = ConsoleSnapshotReadModel::default();
4456 model.primed.store(true, Ordering::Release);
4457 let _guard = model
4459 .refresh_lock
4460 .clone()
4461 .try_lock_owned()
4462 .map_err(|_| "refresh_lock unexpectedly contended at test start")?;
4463
4464 let snap_fast_path = async {
4470 assert!(
4471 model.primed.load(Ordering::Acquire),
4472 "primed precondition for hot-cache path"
4473 );
4474 };
4475 let result = tokio::time::timeout(Duration::from_millis(100), snap_fast_path).await;
4476 assert!(result.is_ok(), "hot-cache snapshot path should not block");
4477 Ok(())
4478 }
4479
4480 #[tokio::test]
4481 async fn console_aggregator_reset_all_rpc_force_refreshes_identity_cache()
4482 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4483 let (_temp_dir, runtime) =
4484 build_empty_console_test_runtime("console-reset-fresh-identity-cache").await?;
4485 let aggregator = MobKitConsoleAggregator::in_memory();
4486 aggregator.register_runtime_handles_with_policy(
4487 "runtime-reset",
4488 "reset",
4489 runtime.clone(),
4490 ConsoleEventStore::new(),
4491 Arc::new(AllowAllConsoleVisibilityPolicy),
4492 );
4493 let primed_empty = aggregator.list_identities().await?;
4494 assert!(
4495 primed_empty.is_empty(),
4496 "test precondition: identity cache should be primed empty before late spawn"
4497 );
4498
4499 runtime
4500 .handle()
4501 .spawn_spec(SpawnMemberSpec::from_wire(
4502 "worker".to_string(),
4503 "agent-reset".to_string(),
4504 Some("You are agent-reset.".into()),
4505 None,
4506 None,
4507 ))
4508 .await?;
4509
4510 let response =
4511 handle_console_aggregator_rpc(Some(aggregator), rpc_request("mobkit/reset_all"), true)
4512 .await;
4513
4514 assert_eq!(response["error"], Value::Null);
4515 assert_eq!(response["result"]["retired"], json!(["reset/agent-reset"]));
4516 let _ = runtime.handle().stop().await;
4517 Ok(())
4518 }
4519
#[test]
/// `console:10` must count as after `console:9`, and not the other way round.
fn timeline_stream_cursor_filter_uses_numeric_console_sequence() {
    let nine = ConsoleCursor::from("console:9");
    let ten = ConsoleCursor::from("console:10");

    assert!(cursor_is_after(&ten, &nine));
    assert!(!cursor_is_after(&nine, &ten));
}
4531
#[test]
/// Two `qa-child` members that differ only in session id collapse to one entry, and
/// the entry that remains is the first one (`session_id == "first"`).
fn console_live_snapshot_dedupes_repeated_delegate_identities() {
    // Builds a `qa-child` delegate entry; only the session id varies between duplicates.
    let qa_child = |session: &str| ConsoleMember {
        agent_identity: "qa-child".to_string(),
        role: "delegate".to_string(),
        state: "active".to_string(),
        model_capabilities: Default::default(),
        runtime_mode: None,
        session_id: Some(session.to_string()),
        wired_to: vec!["qa-parent".to_string()],
        labels: BTreeMap::from([(
            "delegate_host_identity".to_string(),
            "qa-parent".to_string(),
        )]),
    };
    let commander = ConsoleMember {
        agent_identity: "incident-commander".to_string(),
        role: "commander".to_string(),
        state: "active".to_string(),
        model_capabilities: Default::default(),
        runtime_mode: None,
        session_id: None,
        wired_to: Vec::new(),
        labels: BTreeMap::new(),
    };
    let mut members = vec![commander, qa_child("first"), qa_child("second")];

    dedupe_console_members_by_identity(&mut members);

    let surviving: Vec<&str> = members
        .iter()
        .map(|entry| entry.agent_identity.as_str())
        .collect();
    assert_eq!(surviving, vec!["incident-commander", "qa-child"]);
    assert_eq!(members[1].session_id.as_deref(), Some("first"));
}
4584
#[test]
/// Applies `HideImplicitDelegateMembersConsoleVisibilityPolicy` to a snapshot holding a
/// commander and a `qa-child` delegate labelled with `source_mob_id`, and expects only
/// `incident-commander` to remain in `members`, `agents`, and `loaded_modules`.
fn console_visibility_policy_hides_implicit_delegate_members_from_snapshot() {
    // Per-agent live entry; id, member id, and identity all share the same value here.
    let live_agent = |id: &str, label: &str, role: &str, session: Option<&str>| {
        ConsoleAgentLiveSnapshot {
            agent_id: id.to_string(),
            member_id: id.to_string(),
            label: label.to_string(),
            kind: "meerkat".to_string(),
            identity: Some(id.to_string()),
            role: Some(role.to_string()),
            state: Some("active".to_string()),
            session_id: session.map(|value| value.to_string()),
            model_capabilities: Default::default(),
            response_phase: None,
            watched: None,
            alert_level: None,
            degraded: None,
            degraded_reason: None,
        }
    };
    let agents = vec![
        live_agent("incident-commander", "Incident Commander", "commander", None),
        live_agent("qa-child", "QA Child", "delegate", Some("delegate-session")),
    ];

    let members = vec![
        ConsoleMember {
            agent_identity: "incident-commander".to_string(),
            role: "commander".to_string(),
            state: "active".to_string(),
            model_capabilities: Default::default(),
            runtime_mode: None,
            session_id: None,
            wired_to: Vec::new(),
            labels: BTreeMap::new(),
        },
        ConsoleMember {
            agent_identity: "qa-child".to_string(),
            role: "delegate".to_string(),
            state: "active".to_string(),
            model_capabilities: Default::default(),
            runtime_mode: None,
            session_id: Some("delegate-session".to_string()),
            wired_to: vec!["qa-parent".to_string()],
            labels: BTreeMap::from([(
                "source_mob_id".to_string(),
                "implicit-qa-mob".to_string(),
            )]),
        },
    ];

    let loaded_modules = vec!["incident-commander".to_string(), "qa-child".to_string()];
    let mut snapshot = ConsoleLiveSnapshot::new(
        Some("runtime".to_string()),
        true,
        loaded_modules,
        agents,
        members,
        true,
    );

    apply_console_visibility_policy(
        &mut snapshot,
        &HideImplicitDelegateMembersConsoleVisibilityPolicy,
    );

    let visible_members: Vec<&str> = snapshot
        .members
        .iter()
        .map(|entry| entry.agent_identity.as_str())
        .collect();
    assert_eq!(visible_members, vec!["incident-commander"]);

    let visible_agents: Vec<&str> = snapshot
        .agents
        .iter()
        .map(|entry| entry.agent_id.as_str())
        .collect();
    assert_eq!(visible_agents, vec!["incident-commander"]);

    assert_eq!(snapshot.loaded_modules, vec!["incident-commander"]);
}
4676
4677 #[tokio::test]
4678 async fn live_snapshot_member_projection_uses_roster_profile_capabilities()
4679 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4680 let temp_dir = tempfile::tempdir()?;
4681 let session_path = temp_dir.path().join("sessions");
4682 std::fs::create_dir_all(&session_path)?;
4683 let factory = AgentFactory::new(&session_path).comms(true);
4684 let session_service = Arc::new(build_ephemeral_service(factory, Config::default(), 16));
4685 let definition = MobDefinition::from_toml(
4686 r#"
4687[mob]
4688id = "console-snapshot-test"
4689
4690[profiles.worker]
4691model = "gpt-5.5"
4692
4693[profiles.worker.tools]
4694comms = true
4695"#,
4696 )?;
4697 let expected = model_capabilities_for_role(&definition, "worker");
4698 let runtime = MobRuntime::bootstrap(
4699 MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
4700 .with_options(MobBootstrapOptions {
4701 allow_ephemeral_sessions: true,
4702 notify_orchestrator_on_resume: true,
4703 default_llm_client: Some(Arc::new(TestClient::default())),
4704 }),
4705 )
4706 .await?;
4707 runtime
4708 .handle()
4709 .spawn_spec(SpawnMemberSpec::from_wire(
4710 "worker".to_string(),
4711 "worker:one".to_string(),
4712 Some("You are worker one.".into()),
4713 None,
4714 None,
4715 ))
4716 .await?;
4717
4718 let empty_read_model = ConsoleSnapshotReadModelState::default();
4719 let (members, session_owner_by_id) =
4720 project_console_members_from_handle(&runtime.handle(), None, None, &empty_read_model)
4721 .await;
4722
4723 assert_eq!(members.len(), 1);
4724 assert_eq!(members[0].model_capabilities, expected);
4725 assert_eq!(members[0].session_id, None);
4726 assert!(session_owner_by_id.is_empty());
4727
4728 let refreshed_read_model = collect_console_snapshot_read_model(&runtime).await;
4729 let (members, session_owner_by_id) = project_console_members_from_handle(
4730 &runtime.handle(),
4731 None,
4732 None,
4733 &refreshed_read_model,
4734 )
4735 .await;
4736 assert_eq!(
4737 members[0].session_id.as_ref(),
4738 session_owner_by_id.keys().next()
4739 );
4740
4741 assert_eq!(
4747 refreshed_read_model.primary_members.len(),
4748 members.len(),
4749 "primary_members cache should hold the same members as live projection"
4750 );
4751 assert_eq!(
4752 refreshed_read_model.primary_members[0].agent_identity,
4753 members[0].agent_identity
4754 );
4755 assert_eq!(
4756 refreshed_read_model.primary_members[0].session_id,
4757 members[0].session_id
4758 );
4759 Ok(())
4760 }
4761
4762 #[tokio::test]
4763 async fn fresh_timeline_snapshot_reads_tail_without_full_log_replay()
4764 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4765 let aggregator = MobKitConsoleAggregator::in_memory();
4766 for idx in 0..25_000 {
4767 aggregator
4768 .store()
4769 .append_if_absent(NewConsoleFrame {
4770 id: None,
4771 dedupe_key: format!("event-{idx}"),
4772 timestamp_ms: idx,
4773 runtime_key: "runtime-a".to_string(),
4774 identity: "agent-a".to_string(),
4775 conversation_id: Some("agent-a".to_string()),
4776 session_id: None,
4777 kind: "text_delta".to_string(),
4778 status: ConsoleFrameStatus::Completed,
4779 payload: json!({ "delta": idx }),
4780 source: ConsoleFrameSource {
4781 kind: ConsoleFrameSourceKind::ConsoleEvent,
4782 source_cursor: None,
4783 },
4784 source_event_id: Some(format!("event-{idx}")),
4785 interaction_id: None,
4786 turn_id: None,
4787 run_id: None,
4788 parent_frame_id: None,
4789 caused_by_frame_id: None,
4790 })
4791 .await?;
4792 }
4793
4794 let (frames, cursor) = query_timeline_snapshot(
4795 &aggregator,
4796 ConsoleTimelineQuery {
4797 identity: Some("agent-a".to_string()),
4798 after: None,
4799 limit: 200,
4800 ..ConsoleTimelineQuery::default()
4801 },
4802 )
4803 .await?;
4804
4805 assert!(!frames.is_empty());
4806 assert_eq!(cursor.as_ref().and_then(ConsoleCursor::seq), Some(25_000));
4807 assert_eq!(
4808 frames.last().and_then(|frame| frame.cursor.seq()),
4809 Some(25_000)
4810 );
4811 Ok(())
4812 }
4813
4814 #[tokio::test]
4815 async fn fresh_timeline_snapshot_keeps_sparse_identity_frames()
4816 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4817 let aggregator = MobKitConsoleAggregator::in_memory();
4818 aggregator
4819 .store()
4820 .append_if_absent(NewConsoleFrame {
4821 id: None,
4822 dedupe_key: "sparse-event".to_string(),
4823 timestamp_ms: 1,
4824 runtime_key: "runtime-a".to_string(),
4825 identity: "sparse-agent".to_string(),
4826 conversation_id: Some("sparse-agent".to_string()),
4827 session_id: None,
4828 kind: "text_complete".to_string(),
4829 status: ConsoleFrameStatus::Completed,
4830 payload: json!({ "text": "still visible" }),
4831 source: ConsoleFrameSource {
4832 kind: ConsoleFrameSourceKind::ConsoleEvent,
4833 source_cursor: None,
4834 },
4835 source_event_id: Some("sparse-event".to_string()),
4836 interaction_id: None,
4837 turn_id: None,
4838 run_id: None,
4839 parent_frame_id: None,
4840 caused_by_frame_id: None,
4841 })
4842 .await?;
4843 for idx in 0..25_000 {
4844 aggregator
4845 .store()
4846 .append_if_absent(NewConsoleFrame {
4847 id: None,
4848 dedupe_key: format!("other-event-{idx}"),
4849 timestamp_ms: idx + 2,
4850 runtime_key: "runtime-a".to_string(),
4851 identity: "busy-agent".to_string(),
4852 conversation_id: Some("busy-agent".to_string()),
4853 session_id: None,
4854 kind: "text_delta".to_string(),
4855 status: ConsoleFrameStatus::Completed,
4856 payload: json!({ "delta": idx }),
4857 source: ConsoleFrameSource {
4858 kind: ConsoleFrameSourceKind::ConsoleEvent,
4859 source_cursor: None,
4860 },
4861 source_event_id: Some(format!("other-event-{idx}")),
4862 interaction_id: None,
4863 turn_id: None,
4864 run_id: None,
4865 parent_frame_id: None,
4866 caused_by_frame_id: None,
4867 })
4868 .await?;
4869 }
4870
4871 let (frames, cursor) = query_timeline_snapshot(
4872 &aggregator,
4873 ConsoleTimelineQuery {
4874 identity: Some("sparse-agent".to_string()),
4875 after: None,
4876 limit: 200,
4877 ..ConsoleTimelineQuery::default()
4878 },
4879 )
4880 .await?;
4881
4882 assert_eq!(frames.len(), 1);
4883 assert_eq!(frames[0].identity, "sparse-agent");
4884 assert_eq!(frames[0].payload["text"], json!("still visible"));
4885 assert_eq!(cursor.as_ref().and_then(ConsoleCursor::seq), Some(1));
4886 Ok(())
4887 }
4888
4889 #[tokio::test]
4890 async fn timeline_snapshot_clamps_requested_limit_to_store_page_size()
4891 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4892 let aggregator = MobKitConsoleAggregator::in_memory();
4893 for idx in 0..2_500 {
4894 aggregator
4895 .store()
4896 .append_if_absent(NewConsoleFrame {
4897 id: None,
4898 dedupe_key: format!("clamp-event-{idx}"),
4899 timestamp_ms: idx,
4900 runtime_key: "runtime-a".to_string(),
4901 identity: "agent-a".to_string(),
4902 conversation_id: Some("agent-a".to_string()),
4903 session_id: None,
4904 kind: "text_delta".to_string(),
4905 status: ConsoleFrameStatus::Completed,
4906 payload: json!({ "delta": idx }),
4907 source: ConsoleFrameSource {
4908 kind: ConsoleFrameSourceKind::ConsoleEvent,
4909 source_cursor: None,
4910 },
4911 source_event_id: Some(format!("clamp-event-{idx}")),
4912 interaction_id: None,
4913 turn_id: None,
4914 run_id: None,
4915 parent_frame_id: None,
4916 caused_by_frame_id: None,
4917 })
4918 .await?;
4919 }
4920
4921 let (frames, cursor) = query_timeline_snapshot(
4922 &aggregator,
4923 ConsoleTimelineQuery {
4924 identity: Some("agent-a".to_string()),
4925 after: Some(ConsoleCursor::from("console:100")),
4926 limit: 5_000,
4927 ..ConsoleTimelineQuery::default()
4928 },
4929 )
4930 .await?;
4931
4932 assert_eq!(frames.len(), 2_400);
4933 assert_eq!(
4934 frames.first().and_then(|frame| frame.cursor.seq()),
4935 Some(101)
4936 );
4937 assert_eq!(
4938 frames.last().and_then(|frame| frame.cursor.seq()),
4939 Some(2_500)
4940 );
4941 assert_eq!(cursor.as_ref().and_then(ConsoleCursor::seq), Some(2_500));
4942 Ok(())
4943 }
4944
4945 #[tokio::test]
4946 async fn multipart_blob_upload_stores_one_file() -> Result<(), Box<dyn std::error::Error>> {
4947 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
4948 let mut files = BTreeMap::new();
4949 files.insert(
4950 "upload-1".to_string(),
4951 MultipartImageUpload {
4952 media_type: "image/png".to_string(),
4953 bytes: Bytes::from_static(b"png-data"),
4954 },
4955 );
4956 let result = externalize_single_image_upload(
4957 &json!({
4958 "upload": {
4959 "type": "image_upload",
4960 "upload_id": "upload-1",
4961 "media_type": "image/png"
4962 }
4963 }),
4964 files,
4965 store.clone(),
4966 )
4967 .await
4968 .map_err(std::io::Error::other)?;
4969
4970 assert_eq!(result["media_type"], json!("image/png"));
4971 assert_eq!(result["size"], json!(8));
4972 let Some(blob_id) = result["blob_id"].as_str() else {
4973 return Err(std::io::Error::other("blob id").into());
4974 };
4975 let payload = store
4976 .get_bytes(&meerkat_core::BlobId::from(blob_id))
4977 .await?;
4978 assert_eq!(payload.data.as_ref(), b"png-data");
4979 Ok(())
4980 }
4981
4982 #[tokio::test]
4983 async fn multipart_blob_upload_accepts_part_name_alias()
4984 -> Result<(), Box<dyn std::error::Error>> {
4985 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
4986 let mut files = BTreeMap::new();
4987 files.insert(
4988 "image-field".to_string(),
4989 MultipartImageUpload {
4990 media_type: "image/png".to_string(),
4991 bytes: Bytes::from_static(b"png-data"),
4992 },
4993 );
4994 let result = externalize_single_image_upload(
4995 &json!({
4996 "upload": {
4997 "type": "image_upload",
4998 "part_name": "image-field",
4999 "media_type": "image/png"
5000 }
5001 }),
5002 files,
5003 store,
5004 )
5005 .await
5006 .map_err(std::io::Error::other)?;
5007
5008 assert_eq!(result["media_type"], json!("image/png"));
5009 assert!(
5010 result["blob_id"]
5011 .as_str()
5012 .is_some_and(|value| value.starts_with("sha256:"))
5013 );
5014 Ok(())
5015 }
5016
5017 #[tokio::test]
5018 async fn multipart_blob_upload_rejects_media_mismatch() -> Result<(), Box<dyn std::error::Error>>
5019 {
5020 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5021 let mut files = BTreeMap::new();
5022 files.insert(
5023 "upload-1".to_string(),
5024 MultipartImageUpload {
5025 media_type: "image/jpeg".to_string(),
5026 bytes: Bytes::from_static(b"jpeg-data"),
5027 },
5028 );
5029 let err = match externalize_single_image_upload(
5030 &json!({
5031 "upload": {
5032 "type": "image_upload",
5033 "upload_id": "upload-1",
5034 "media_type": "image/png"
5035 }
5036 }),
5037 files,
5038 store,
5039 )
5040 .await
5041 {
5042 Ok(_) => return Err(std::io::Error::other("media mismatch").into()),
5043 Err(err) => err,
5044 };
5045 assert!(
5046 err.contains("media type mismatch"),
5047 "unexpected error: {err}"
5048 );
5049 Ok(())
5050 }
5051
5052 #[tokio::test]
5053 async fn multipart_blob_upload_rejects_extra_file() -> Result<(), Box<dyn std::error::Error>> {
5054 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5055 let mut files = BTreeMap::new();
5056 for id in ["upload-1", "upload-2"] {
5057 files.insert(
5058 id.to_string(),
5059 MultipartImageUpload {
5060 media_type: "image/png".to_string(),
5061 bytes: Bytes::from_static(b"png"),
5062 },
5063 );
5064 }
5065 let err = match externalize_single_image_upload(
5066 &json!({
5067 "upload": {
5068 "type": "image_upload",
5069 "upload_id": "upload-1",
5070 "media_type": "image/png"
5071 }
5072 }),
5073 files,
5074 store,
5075 )
5076 .await
5077 {
5078 Ok(_) => return Err(std::io::Error::other("one file only").into()),
5079 Err(err) => err,
5080 };
5081 assert!(
5082 err.contains("exactly one file part"),
5083 "unexpected error: {err}"
5084 );
5085 Ok(())
5086 }
5087
5088 #[tokio::test]
5089 async fn multipart_send_replaces_placeholders_and_removes_shadow_message()
5090 -> Result<(), Box<dyn std::error::Error>> {
5091 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5092 let mut files = BTreeMap::new();
5093 files.insert(
5094 "upload-1".to_string(),
5095 MultipartImageUpload {
5096 media_type: "image/webp".to_string(),
5097 bytes: Bytes::from_static(b"webp-data"),
5098 },
5099 );
5100 let mut params = json!({
5101 "member_id": "artist",
5102 "message": "stale shadow text",
5103 "content": [
5104 { "type": "text", "text": "describe" },
5105 {
5106 "type": "image_upload",
5107 "upload_id": "upload-1",
5108 "media_type": "image/webp"
5109 }
5110 ]
5111 });
5112 externalize_image_upload_placeholders(&mut params, files, store)
5113 .await
5114 .map_err(std::io::Error::other)?;
5115
5116 assert!(params.get("message").is_none());
5117 assert_eq!(params["content"][1]["type"], json!("image"));
5118 assert_eq!(params["content"][1]["source"], json!("blob"));
5119 assert_eq!(params["content"][1]["media_type"], json!("image/webp"));
5120 assert!(
5121 params["content"][1]["blob_id"]
5122 .as_str()
5123 .is_some_and(|value| value.starts_with("sha256:"))
5124 );
5125 Ok(())
5126 }
5127
5128 #[tokio::test]
5129 async fn multipart_send_accepts_part_name_placeholder() -> Result<(), Box<dyn std::error::Error>>
5130 {
5131 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5132 let mut files = BTreeMap::new();
5133 files.insert(
5134 "image-field".to_string(),
5135 MultipartImageUpload {
5136 media_type: "image/png".to_string(),
5137 bytes: Bytes::from_static(b"png-data"),
5138 },
5139 );
5140 let mut params = json!({
5141 "member_id": "analyst",
5142 "content": [
5143 { "type": "text", "text": "describe" },
5144 {
5145 "type": "image_upload",
5146 "part_name": "image-field",
5147 "media_type": "image/png"
5148 }
5149 ]
5150 });
5151
5152 externalize_image_upload_placeholders(&mut params, files, store)
5153 .await
5154 .map_err(std::io::Error::other)?;
5155
5156 assert_eq!(params["content"][1]["type"], json!("image"));
5157 assert_eq!(params["content"][1]["source"], json!("blob"));
5158 assert_eq!(params["content"][1]["media_type"], json!("image/png"));
5159 Ok(())
5160 }
5161
5162 #[tokio::test]
5163 async fn multipart_send_rejects_placeholder_without_file()
5164 -> Result<(), Box<dyn std::error::Error>> {
5165 let store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
5166 let mut params = json!({
5167 "content": [{
5168 "type": "image_upload",
5169 "upload_id": "missing",
5170 "media_type": "image/png"
5171 }]
5172 });
5173 let err = match externalize_image_upload_placeholders(&mut params, BTreeMap::new(), store)
5174 .await
5175 {
5176 Ok(()) => return Err(std::io::Error::other("missing file").into()),
5177 Err(err) => err,
5178 };
5179 assert!(err.contains("missing file part"), "unexpected error: {err}");
5180 Ok(())
5181 }
5182}