1use std::collections::{BTreeMap, BTreeSet};
4use std::future::Future;
5use std::pin::Pin;
6use std::time::Duration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::console_aggregator::is_implicit_delegate_member;
12use crate::runtime::{
13 BigQuerySessionStoreAdapter, BigQuerySessionStoreError, ConsoleRestJsonRequest,
14 ConsoleRestJsonResponse, DeliveryHistoryRequest, DeliverySendError, DeliverySendRequest,
15 ElephantMemoryStoreError, GatingDecideError, GatingDecideRequest, GatingDecision,
16 GatingEvaluateRequest, GatingRiskTier, MemoryIndexError, MemoryIndexRequest,
17 MemoryQueryRequest, MobkitRuntimeHandle, ModuleRouteError, ModuleRouteRequest,
18 ROUTING_RETRY_MAX_CAP, RoutingResolveError, RoutingResolveRequest, RuntimeDecisionState,
19 RuntimeRoute, RuntimeRouteMutationError, ScheduleDefinition, ScheduleValidationError,
20 SessionPersistenceRow, SubscribeError, SubscribeRequest, SubscribeScope,
21 handle_console_rest_json_route, route_module_call, validate_schedules,
22};
23use crate::unified_runtime::{EventQuery, UnifiedRuntime};
24
25mod console_ingress;
26mod gating_methods;
27mod memory_methods;
28pub(crate) mod mob_methods;
29pub(crate) mod params;
30mod routing_delivery_methods;
31mod scheduling_methods;
32mod session_store_methods;
33mod subscribe_methods;
34
35pub use console_ingress::handle_console_ingress_json;
36
37use gating_methods::{
38 GatingParamsError, parse_gating_audit_params, parse_gating_decide_params,
39 parse_gating_evaluate_params, parse_gating_pending_params,
40};
41use memory_methods::{
42 MemoryParamsError, parse_memory_index_params, parse_memory_query_params,
43 parse_memory_stores_params,
44};
45use routing_delivery_methods::{
46 RoutingDeliveryParamsError, parse_delivery_history_params, parse_delivery_send_params,
47 parse_routing_resolve_params, parse_routing_route_add_params,
48 parse_routing_route_delete_params, parse_routing_routes_list_params,
49};
50use scheduling_methods::{format_schedule_validation_error, parse_scheduling_params};
51use session_store_methods::{
52 BigQuerySessionStoreRpcError, format_bigquery_store_error, parse_bigquery_session_store_params,
53 run_bigquery_session_store_request,
54};
55use subscribe_methods::{SubscribeParamsError, parse_subscribe_request};
56
/// JSON-RPC protocol version string written into the `jsonrpc` field of the responses built in
/// this module.
pub const JSONRPC_VERSION: &str = "2.0";
/// MobKit contract version reported under `contract_version` by `mobkit/status` and
/// `mobkit/capabilities`.
pub const MOBKIT_CONTRACT_VERSION: &str = "0.4.0";
/// NOTE(review): presumably the maximum number of schedules one scheduling request may carry;
/// the code enforcing it is not visible in this part of the file — confirm before relying on it.
pub const MAX_SCHEDULES_PER_REQUEST: usize = 256;
60
/// Reasons a capabilities line can be rejected by `parse_rpc_capabilities`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RpcCapabilitiesError {
    /// The input text is not valid JSON.
    InvalidJson,
    /// The JSON is not an object, or it does not deserialize into `RpcCapabilities`.
    InvalidSchema,
    /// The object has no `contract_version` key.
    MissingContractVersion,
    /// `contract_version` is present but is not a non-blank string.
    InvalidContractVersion,
}

impl std::fmt::Display for RpcCapabilitiesError {
    /// Writes a short, fixed, human-readable description of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidJson => "invalid JSON",
            Self::InvalidSchema => "invalid schema",
            Self::MissingContractVersion => "missing contract version",
            Self::InvalidContractVersion => "invalid contract version",
        };
        f.write_str(description)
    }
}

impl std::error::Error for RpcCapabilitiesError {}
81
/// Capabilities document as produced by `parse_rpc_capabilities`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RpcCapabilities {
    /// Value of the `contract_version` key.
    pub contract_version: String,
    /// Flattened by serde: holds the other top-level keys of the document and writes them back
    /// at the top level on serialization.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
}
88
89pub fn parse_rpc_capabilities(line: &str) -> Result<RpcCapabilities, RpcCapabilitiesError> {
90 let raw: Value = serde_json::from_str(line).map_err(|_| RpcCapabilitiesError::InvalidJson)?;
91 let object = raw.as_object().ok_or(RpcCapabilitiesError::InvalidSchema)?;
92 let contract = object
93 .get("contract_version")
94 .ok_or(RpcCapabilitiesError::MissingContractVersion)?;
95 let contract_str = contract
96 .as_str()
97 .ok_or(RpcCapabilitiesError::InvalidContractVersion)?;
98 if contract_str.trim().is_empty() {
99 return Err(RpcCapabilitiesError::InvalidContractVersion);
100 }
101 serde_json::from_value(raw).map_err(|_| RpcCapabilitiesError::InvalidSchema)
102}
103
/// JSON-RPC error code.
/// NOTE(review): judging by the name, reported when a mob-events cursor is stale; the code that
/// emits it is not visible in this part of the file — confirm.
pub const MOB_EVENTS_STALE_CURSOR_CODE: i64 = -32010;

/// JSON-RPC error code returned by `mobkit/memory/index` when the runtime reports
/// `MemoryIndexError::BackendPersistFailed`.
pub const MEMORY_BACKEND_UNAVAILABLE_CODE: i64 = -32012;

/// JSON-RPC error code.
/// NOTE(review): judging by the name, reported when console timeline replay is unavailable; the
/// code that emits it is not visible in this part of the file — confirm.
pub const CONSOLE_TIMELINE_REPLAY_UNAVAILABLE_CODE: i64 = -32013;
125
/// Incoming JSON-RPC request envelope.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JsonRpcRequest {
    /// Protocol version declared by the caller; the handlers in this file reject anything other
    /// than `"2.0"` with an "Invalid Request" error.
    pub jsonrpc: String,
    /// Request id. The handlers treat a request whose id is `None` as a notification and return
    /// an empty string instead of a response once the request has been dispatched.
    #[serde(default)]
    pub id: Option<Value>,
    /// Method name used for dispatch, e.g. `mobkit/status`.
    pub method: String,
    /// Method parameters; falls back to the `Value` default when the key is absent.
    #[serde(default)]
    pub params: Value,
}
135
/// Error object carried in the `error` member of a [`JsonRpcResponse`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct JsonRpcError {
    /// Numeric error code (for example `-32602` for invalid params).
    pub code: i64,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional structured payload; left out of the serialized JSON when `None` and defaulted
    /// to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
}
147
148impl JsonRpcError {
149 pub fn new(code: i64, message: impl Into<String>) -> Self {
150 Self {
151 code,
152 message: message.into(),
153 data: None,
154 }
155 }
156
157 #[must_use]
158 pub fn with_data(mut self, data: Value) -> Self {
159 self.data = Some(data);
160 self
161 }
162}
163
/// Outgoing JSON-RPC response envelope.
///
/// The handlers in this file populate exactly one of `result` and `error`; whichever is `None`
/// is omitted from the serialized JSON.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JsonRpcResponse {
    /// Protocol version; the handlers always write [`JSONRPC_VERSION`].
    pub jsonrpc: String,
    /// Id echoed from the request, or `Value::Null` when no id could be determined.
    pub id: Value,
    /// Success payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Failure payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
}
173
174pub fn handle_mobkit_rpc_json(
175 runtime: &mut MobkitRuntimeHandle,
176 request_json: &str,
177 timeout: Duration,
178) -> String {
179 let raw_request: Value = match serde_json::from_str(request_json) {
180 Ok(raw_request) => raw_request,
181 Err(_) => {
182 return serialize_response(&JsonRpcResponse {
183 jsonrpc: JSONRPC_VERSION.to_string(),
184 id: Value::Null,
185 result: None,
186 error: Some(JsonRpcError {
187 code: -32700,
188 message: "Parse error".to_string(),
189 data: None,
190 }),
191 });
192 }
193 };
194 let response_id = raw_request
195 .as_object()
196 .and_then(|object| object.get("id"))
197 .cloned()
198 .unwrap_or(Value::Null);
199 let request: JsonRpcRequest = match serde_json::from_value(raw_request) {
200 Ok(request) => request,
201 Err(_) => {
202 return serialize_response(&JsonRpcResponse {
203 jsonrpc: JSONRPC_VERSION.to_string(),
204 id: response_id,
205 result: None,
206 error: Some(JsonRpcError {
207 code: -32600,
208 message: "Invalid Request".to_string(),
209 data: None,
210 }),
211 });
212 }
213 };
214 let is_notification = request.id.is_none();
215 let response_id = request.id.clone().unwrap_or(Value::Null);
216
217 if request.jsonrpc != "2.0" {
218 let response = JsonRpcResponse {
219 jsonrpc: JSONRPC_VERSION.to_string(),
220 id: response_id,
221 result: None,
222 error: Some(JsonRpcError {
223 code: -32600,
224 message: "Invalid Request".to_string(),
225 data: None,
226 }),
227 };
228 return if is_notification {
229 String::new()
230 } else {
231 serialize_response(&response)
232 };
233 }
234
235 let response = match request.method.as_str() {
236 "mobkit/status" => JsonRpcResponse {
237 jsonrpc: JSONRPC_VERSION.to_string(),
238 id: response_id,
239 result: Some(serde_json::json!({
240 "contract_version": MOBKIT_CONTRACT_VERSION,
241 "running": runtime.is_running(),
242 "loaded_modules": runtime.loaded_modules(),
243 })),
244 error: None,
245 },
246 "mobkit/capabilities" => JsonRpcResponse {
247 jsonrpc: JSONRPC_VERSION.to_string(),
248 id: response_id,
249 result: Some(serde_json::json!({
250 "contract_version": MOBKIT_CONTRACT_VERSION,
251 "methods": [
252 "mobkit/status",
253 "mobkit/capabilities",
254 "mobkit/reconcile",
255 "mobkit/spawn_member",
256 "mobkit/scheduling/evaluate",
257 "mobkit/scheduling/dispatch",
258 "mobkit/routing/resolve",
259 "mobkit/routing/routes/list",
260 "mobkit/routing/routes/add",
261 "mobkit/routing/routes/delete",
262 "mobkit/delivery/send",
263 "mobkit/delivery/history",
264 "mobkit/events/subscribe",
265 "mobkit/memory/stores",
266 "mobkit/memory/index",
267 "mobkit/memory/query",
268 "mobkit/session_store/bigquery",
269 "mobkit/gating/evaluate",
270 "mobkit/gating/pending",
271 "mobkit/gating/decide",
272 "mobkit/gating/audit",
273 "mobkit/call_tool",
274 "mobkit/models/catalog"
275 ],
276 "loaded_modules": runtime.loaded_modules(),
277 "runtime_capabilities": {
278 "can_spawn_members": false,
279 "can_send_messages": false,
280 "can_wire_members": false,
281 "can_retire_members": false,
282 "available_spawn_modes": ["module"],
283 }
284 })),
285 error: None,
286 },
287 "mobkit/models/catalog" => JsonRpcResponse {
288 jsonrpc: JSONRPC_VERSION.to_string(),
289 id: response_id,
290 result: Some(build_models_catalog_result()),
291 error: None,
292 },
293 "mobkit/reconcile" => {
294 let modules = match params::required_string_array(&request.params, "modules") {
295 Ok(m) => m,
296 Err(reason) => {
297 return serialize_response(&JsonRpcResponse {
298 jsonrpc: JSONRPC_VERSION.to_string(),
299 id: response_id,
300 result: None,
301 error: Some(JsonRpcError {
302 code: -32602,
303 message: format!("Invalid params: {reason}"),
304 data: None,
305 }),
306 });
307 }
308 };
309
310 match runtime.reconcile_modules(modules.clone(), timeout) {
311 Ok(added) => JsonRpcResponse {
312 jsonrpc: JSONRPC_VERSION.to_string(),
313 id: response_id,
314 result: Some(serde_json::json!({
315 "accepted": true,
316 "reconciled_modules": modules,
317 "added": added
318 })),
319 error: None,
320 },
321 Err(err) => JsonRpcResponse {
322 jsonrpc: JSONRPC_VERSION.to_string(),
323 id: response_id,
324 result: None,
325 error: Some(JsonRpcError {
326 code: -32602,
327 message: format!("Invalid params: {err:?}"),
328 data: None,
329 }),
330 },
331 }
332 }
333 "mobkit/spawn_member" => {
334 let module_id = request
335 .params
336 .get("module_id")
337 .and_then(Value::as_str)
338 .unwrap_or_default()
339 .to_string();
340 if module_id.is_empty() {
341 JsonRpcResponse {
342 jsonrpc: JSONRPC_VERSION.to_string(),
343 id: response_id,
344 result: None,
345 error: Some(JsonRpcError {
346 code: -32602,
347 message: "Invalid params: module_id required".to_string(),
348 data: None,
349 }),
350 }
351 } else {
352 match runtime.spawn_member(&module_id, timeout) {
353 Ok(()) => JsonRpcResponse {
354 jsonrpc: JSONRPC_VERSION.to_string(),
355 id: response_id,
356 result: Some(serde_json::json!({
357 "accepted": true,
358 "module_id": module_id
359 })),
360 error: None,
361 },
362 Err(err) => JsonRpcResponse {
363 jsonrpc: JSONRPC_VERSION.to_string(),
364 id: response_id,
365 result: None,
366 error: Some(JsonRpcError {
367 code: -32602,
368 message: format!("Invalid params: {err:?}"),
369 data: None,
370 }),
371 },
372 }
373 }
374 }
375 "mobkit/scheduling/evaluate" => match parse_scheduling_params(&request.params) {
376 Ok((schedules, tick_ms)) => match runtime.evaluate_schedule_tick(&schedules, tick_ms) {
377 Ok(evaluation) => JsonRpcResponse {
378 jsonrpc: JSONRPC_VERSION.to_string(),
379 id: response_id,
380 result: Some(serde_json::to_value(evaluation).unwrap_or(Value::Null)),
381 error: None,
382 },
383 Err(err) => JsonRpcResponse {
384 jsonrpc: JSONRPC_VERSION.to_string(),
385 id: response_id,
386 result: None,
387 error: Some(JsonRpcError {
388 code: -32602,
389 message: format!(
390 "Invalid params: {}",
391 format_schedule_validation_error(err)
392 ),
393 data: None,
394 }),
395 },
396 },
397 Err(message) => JsonRpcResponse {
398 jsonrpc: JSONRPC_VERSION.to_string(),
399 id: response_id,
400 result: None,
401 error: Some(JsonRpcError {
402 code: -32602,
403 message: format!("Invalid params: {message}"),
404 data: None,
405 }),
406 },
407 },
408 "mobkit/scheduling/dispatch" => match parse_scheduling_params(&request.params) {
409 Ok((schedules, tick_ms)) => match runtime.dispatch_schedule_tick(&schedules, tick_ms) {
410 Ok(dispatch) => JsonRpcResponse {
411 jsonrpc: JSONRPC_VERSION.to_string(),
412 id: response_id,
413 result: Some(serde_json::to_value(dispatch).unwrap_or(Value::Null)),
414 error: None,
415 },
416 Err(err) => JsonRpcResponse {
417 jsonrpc: JSONRPC_VERSION.to_string(),
418 id: response_id,
419 result: None,
420 error: Some(JsonRpcError {
421 code: -32602,
422 message: format!(
423 "Invalid params: {}",
424 format_schedule_validation_error(err)
425 ),
426 data: None,
427 }),
428 },
429 },
430 Err(message) => JsonRpcResponse {
431 jsonrpc: JSONRPC_VERSION.to_string(),
432 id: response_id,
433 result: None,
434 error: Some(JsonRpcError {
435 code: -32602,
436 message: format!("Invalid params: {message}"),
437 data: None,
438 }),
439 },
440 },
441 "mobkit/routing/resolve" => {
442 match parse_routing_resolve_params(&request.params).and_then(|resolve_request| {
443 runtime
444 .resolve_routing(resolve_request)
445 .map_err(RoutingDeliveryParamsError::Routing)
446 }) {
447 Ok(resolution) => JsonRpcResponse {
448 jsonrpc: JSONRPC_VERSION.to_string(),
449 id: response_id,
450 result: Some(serde_json::to_value(resolution).unwrap_or(Value::Null)),
451 error: None,
452 },
453 Err(err) => JsonRpcResponse {
454 jsonrpc: JSONRPC_VERSION.to_string(),
455 id: response_id,
456 result: None,
457 error: Some(JsonRpcError {
458 code: -32602,
459 message: format!("Invalid params: {}", err.message()),
460 data: None,
461 }),
462 },
463 }
464 }
465 "mobkit/routing/routes/list" => match parse_routing_routes_list_params(&request.params) {
466 Ok(()) => JsonRpcResponse {
467 jsonrpc: JSONRPC_VERSION.to_string(),
468 id: response_id,
469 result: Some(serde_json::json!({
470 "routes": runtime.list_runtime_routes()
471 })),
472 error: None,
473 },
474 Err(err) => JsonRpcResponse {
475 jsonrpc: JSONRPC_VERSION.to_string(),
476 id: response_id,
477 result: None,
478 error: Some(JsonRpcError {
479 code: -32602,
480 message: format!("Invalid params: {}", err.message()),
481 data: None,
482 }),
483 },
484 },
485 "mobkit/routing/routes/add" => match parse_routing_route_add_params(&request.params)
486 .and_then(|route| {
487 runtime
488 .add_runtime_route(route)
489 .map_err(RoutingDeliveryParamsError::RouteMutation)
490 }) {
491 Ok(route) => JsonRpcResponse {
492 jsonrpc: JSONRPC_VERSION.to_string(),
493 id: response_id,
494 result: Some(serde_json::json!({ "route": route })),
495 error: None,
496 },
497 Err(err) => JsonRpcResponse {
498 jsonrpc: JSONRPC_VERSION.to_string(),
499 id: response_id,
500 result: None,
501 error: Some(JsonRpcError {
502 code: -32602,
503 message: format!("Invalid params: {}", err.message()),
504 data: None,
505 }),
506 },
507 },
508 "mobkit/routing/routes/delete" => match parse_routing_route_delete_params(&request.params)
509 .and_then(|route_key| {
510 runtime
511 .delete_runtime_route(&route_key)
512 .map_err(RoutingDeliveryParamsError::RouteMutation)
513 }) {
514 Ok(route) => JsonRpcResponse {
515 jsonrpc: JSONRPC_VERSION.to_string(),
516 id: response_id,
517 result: Some(serde_json::json!({ "deleted": route })),
518 error: None,
519 },
520 Err(err) => JsonRpcResponse {
521 jsonrpc: JSONRPC_VERSION.to_string(),
522 id: response_id,
523 result: None,
524 error: Some(JsonRpcError {
525 code: -32602,
526 message: format!("Invalid params: {}", err.message()),
527 data: None,
528 }),
529 },
530 },
531 "mobkit/delivery/send" => {
532 match parse_delivery_send_params(&request.params).and_then(|send_request| {
533 runtime
534 .send_delivery(send_request)
535 .map_err(RoutingDeliveryParamsError::Delivery)
536 }) {
537 Ok(record) => JsonRpcResponse {
538 jsonrpc: JSONRPC_VERSION.to_string(),
539 id: response_id,
540 result: Some(serde_json::to_value(record).unwrap_or(Value::Null)),
541 error: None,
542 },
543 Err(err) => JsonRpcResponse {
544 jsonrpc: JSONRPC_VERSION.to_string(),
545 id: response_id,
546 result: None,
547 error: Some(JsonRpcError {
548 code: -32602,
549 message: format!("Invalid params: {}", err.message()),
550 data: None,
551 }),
552 },
553 }
554 }
555 "mobkit/delivery/history" => match parse_delivery_history_params(&request.params) {
556 Ok(history_request) => JsonRpcResponse {
557 jsonrpc: JSONRPC_VERSION.to_string(),
558 id: response_id,
559 result: Some(
560 serde_json::to_value(runtime.delivery_history(history_request))
561 .unwrap_or(Value::Null),
562 ),
563 error: None,
564 },
565 Err(err) => JsonRpcResponse {
566 jsonrpc: JSONRPC_VERSION.to_string(),
567 id: response_id,
568 result: None,
569 error: Some(JsonRpcError {
570 code: -32602,
571 message: format!("Invalid params: {}", err.message()),
572 data: None,
573 }),
574 },
575 },
576 "mobkit/events/subscribe" => {
577 match parse_subscribe_request(&request.params).and_then(|subscribe_request| {
578 runtime
579 .subscribe_events(subscribe_request)
580 .map_err(SubscribeParamsError::Runtime)
581 }) {
582 Ok(subscribe_result) => JsonRpcResponse {
583 jsonrpc: JSONRPC_VERSION.to_string(),
584 id: response_id,
585 result: Some(serde_json::to_value(subscribe_result).unwrap_or(Value::Null)),
586 error: None,
587 },
588 Err(err) => JsonRpcResponse {
589 jsonrpc: JSONRPC_VERSION.to_string(),
590 id: response_id,
591 result: None,
592 error: Some(JsonRpcError {
593 code: -32602,
594 message: format!("Invalid params: {}", err.message()),
595 data: None,
596 }),
597 },
598 }
599 }
600 "mobkit/memory/stores" => match parse_memory_stores_params(&request.params) {
601 Ok(()) => JsonRpcResponse {
602 jsonrpc: JSONRPC_VERSION.to_string(),
603 id: response_id,
604 result: Some(serde_json::json!({
605 "stores": runtime.memory_stores(),
606 })),
607 error: None,
608 },
609 Err(err) => JsonRpcResponse {
610 jsonrpc: JSONRPC_VERSION.to_string(),
611 id: response_id,
612 result: None,
613 error: Some(JsonRpcError {
614 code: -32602,
615 message: format!("Invalid params: {}", err.message()),
616 data: None,
617 }),
618 },
619 },
620 "mobkit/memory/index" => match parse_memory_index_params(&request.params) {
621 Ok(index_request) => match runtime.memory_index(index_request) {
622 Ok(indexed) => JsonRpcResponse {
623 jsonrpc: JSONRPC_VERSION.to_string(),
624 id: response_id,
625 result: Some(serde_json::to_value(indexed).unwrap_or(Value::Null)),
626 error: None,
627 },
628 Err(MemoryIndexError::BackendPersistFailed(error)) => JsonRpcResponse {
629 jsonrpc: JSONRPC_VERSION.to_string(),
630 id: response_id,
631 result: None,
632 error: Some(JsonRpcError {
633 code: MEMORY_BACKEND_UNAVAILABLE_CODE,
634 message: format!(
635 "Memory backend unavailable: {}",
636 MemoryParamsError::backend_message(&error)
637 ),
638 data: None,
639 }),
640 },
641 Err(err) => JsonRpcResponse {
642 jsonrpc: JSONRPC_VERSION.to_string(),
643 id: response_id,
644 result: None,
645 error: Some(JsonRpcError {
646 code: -32602,
647 message: format!(
648 "Invalid params: {}",
649 MemoryParamsError::Index(err).message()
650 ),
651 data: None,
652 }),
653 },
654 },
655 Err(err) => JsonRpcResponse {
656 jsonrpc: JSONRPC_VERSION.to_string(),
657 id: response_id,
658 result: None,
659 error: Some(JsonRpcError {
660 code: -32602,
661 message: format!("Invalid params: {}", err.message()),
662 data: None,
663 }),
664 },
665 },
666 "mobkit/memory/query" => match parse_memory_query_params(&request.params) {
667 Ok(query_request) => JsonRpcResponse {
668 jsonrpc: JSONRPC_VERSION.to_string(),
669 id: response_id,
670 result: Some(
671 serde_json::to_value(runtime.memory_query(query_request))
672 .unwrap_or(Value::Null),
673 ),
674 error: None,
675 },
676 Err(err) => JsonRpcResponse {
677 jsonrpc: JSONRPC_VERSION.to_string(),
678 id: response_id,
679 result: None,
680 error: Some(JsonRpcError {
681 code: -32602,
682 message: format!("Invalid params: {}", err.message()),
683 data: None,
684 }),
685 },
686 },
687 "mobkit/session_store/bigquery" => {
688 match parse_bigquery_session_store_params(&request.params)
689 .and_then(run_bigquery_session_store_request)
690 {
691 Ok(result) => JsonRpcResponse {
692 jsonrpc: JSONRPC_VERSION.to_string(),
693 id: response_id,
694 result: Some(result),
695 error: None,
696 },
697 Err(BigQuerySessionStoreRpcError::Params(message)) => JsonRpcResponse {
698 jsonrpc: JSONRPC_VERSION.to_string(),
699 id: response_id,
700 result: None,
701 error: Some(JsonRpcError {
702 code: -32602,
703 message: format!("Invalid params: {message}"),
704 data: None,
705 }),
706 },
707 Err(BigQuerySessionStoreRpcError::Store(error)) => JsonRpcResponse {
708 jsonrpc: JSONRPC_VERSION.to_string(),
709 id: response_id,
710 result: None,
711 error: Some(JsonRpcError {
712 code: -32011,
713 message: format!(
714 "BigQuery session store request failed: {}",
715 format_bigquery_store_error(&error)
716 ),
717 data: None,
718 }),
719 },
720 }
721 }
722 "mobkit/gating/evaluate" => match parse_gating_evaluate_params(&request.params) {
723 Ok(gating_request) => JsonRpcResponse {
724 jsonrpc: JSONRPC_VERSION.to_string(),
725 id: response_id,
726 result: Some(
727 serde_json::to_value(runtime.evaluate_gating_action(gating_request))
728 .unwrap_or(Value::Null),
729 ),
730 error: None,
731 },
732 Err(err) => JsonRpcResponse {
733 jsonrpc: JSONRPC_VERSION.to_string(),
734 id: response_id,
735 result: None,
736 error: Some(JsonRpcError {
737 code: -32602,
738 message: format!("Invalid params: {}", err.message()),
739 data: None,
740 }),
741 },
742 },
743 "mobkit/gating/pending" => match parse_gating_pending_params(&request.params) {
744 Ok(()) => JsonRpcResponse {
745 jsonrpc: JSONRPC_VERSION.to_string(),
746 id: response_id,
747 result: Some(serde_json::json!({
748 "pending": runtime.list_gating_pending(),
749 })),
750 error: None,
751 },
752 Err(err) => JsonRpcResponse {
753 jsonrpc: JSONRPC_VERSION.to_string(),
754 id: response_id,
755 result: None,
756 error: Some(JsonRpcError {
757 code: -32602,
758 message: format!("Invalid params: {}", err.message()),
759 data: None,
760 }),
761 },
762 },
763 "mobkit/gating/decide" => {
764 match parse_gating_decide_params(&request.params).and_then(|decide_request| {
765 runtime
766 .decide_gating_action(decide_request)
767 .map_err(GatingParamsError::Decision)
768 }) {
769 Ok(result) => JsonRpcResponse {
770 jsonrpc: JSONRPC_VERSION.to_string(),
771 id: response_id,
772 result: Some(serde_json::to_value(result).unwrap_or(Value::Null)),
773 error: None,
774 },
775 Err(err) => JsonRpcResponse {
776 jsonrpc: JSONRPC_VERSION.to_string(),
777 id: response_id,
778 result: None,
779 error: Some(JsonRpcError {
780 code: -32602,
781 message: format!("Invalid params: {}", err.message()),
782 data: None,
783 }),
784 },
785 }
786 }
787 "mobkit/gating/audit" => match parse_gating_audit_params(&request.params) {
788 Ok(limit) => JsonRpcResponse {
789 jsonrpc: JSONRPC_VERSION.to_string(),
790 id: response_id,
791 result: Some(serde_json::json!({
792 "entries": runtime.gating_audit_entries(limit),
793 })),
794 error: None,
795 },
796 Err(err) => JsonRpcResponse {
797 jsonrpc: JSONRPC_VERSION.to_string(),
798 id: response_id,
799 result: None,
800 error: Some(JsonRpcError {
801 code: -32602,
802 message: format!("Invalid params: {}", err.message()),
803 data: None,
804 }),
805 },
806 },
807 "mobkit/call_tool" => {
808 let module_id = request.params.get("module_id").and_then(Value::as_str);
809 let tool = request.params.get("tool").and_then(Value::as_str);
810 let arguments = request
811 .params
812 .get("arguments")
813 .cloned()
814 .unwrap_or(serde_json::json!({}));
815
816 match (module_id, tool) {
817 (Some(module_id), Some(tool)) if !module_id.is_empty() && !tool.is_empty() => {
818 let route = route_module_call(
819 runtime,
820 &ModuleRouteRequest {
821 module_id: module_id.to_string(),
822 method: tool.to_string(),
823 params: arguments,
824 },
825 timeout,
826 );
827 match route {
828 Ok(response) => JsonRpcResponse {
829 jsonrpc: JSONRPC_VERSION.to_string(),
830 id: response_id,
831 result: Some(serde_json::json!({
832 "module_id": response.module_id,
833 "tool": response.method,
834 "result": response.payload
835 })),
836 error: None,
837 },
838 Err(ModuleRouteError::UnloadedModule(mid)) => JsonRpcResponse {
839 jsonrpc: JSONRPC_VERSION.to_string(),
840 id: response_id,
841 result: None,
842 error: Some(JsonRpcError {
843 code: -32601,
844 message: format!("Module '{mid}' not loaded"),
845 data: None,
846 }),
847 },
848 Err(err) => JsonRpcResponse {
849 jsonrpc: JSONRPC_VERSION.to_string(),
850 id: response_id,
851 result: None,
852 error: Some(JsonRpcError {
853 code: -32000,
854 message: format!("Tool call failed: {err:?}"),
855 data: None,
856 }),
857 },
858 }
859 }
860 _ => JsonRpcResponse {
861 jsonrpc: JSONRPC_VERSION.to_string(),
862 id: response_id,
863 result: None,
864 error: Some(JsonRpcError {
865 code: -32602,
866 message: "Invalid params: module_id and tool required".to_string(),
867 data: None,
868 }),
869 },
870 }
871 }
872 method if method.contains('/') && !method.starts_with("mobkit/") => {
873 let module_id = method
874 .split('/')
875 .next()
876 .map(ToString::to_string)
877 .unwrap_or_default();
878 let route = route_module_call(
879 runtime,
880 &ModuleRouteRequest {
881 module_id,
882 method: method.to_string(),
883 params: request.params,
884 },
885 timeout,
886 );
887 match route {
888 Ok(response) => JsonRpcResponse {
889 jsonrpc: JSONRPC_VERSION.to_string(),
890 id: response_id,
891 result: Some(serde_json::json!({
892 "module_id": response.module_id,
893 "method": response.method,
894 "payload": response.payload
895 })),
896 error: None,
897 },
898 Err(ModuleRouteError::UnloadedModule(module_id)) => JsonRpcResponse {
899 jsonrpc: JSONRPC_VERSION.to_string(),
900 id: response_id,
901 result: None,
902 error: Some(JsonRpcError {
903 code: -32601,
904 message: format!("Module '{module_id}' not loaded"),
905 data: None,
906 }),
907 },
908 Err(err) => JsonRpcResponse {
909 jsonrpc: JSONRPC_VERSION.to_string(),
910 id: response_id,
911 result: None,
912 error: Some(JsonRpcError {
913 code: -32000,
914 message: format!("Module route failed: {err:?}"),
915 data: None,
916 }),
917 },
918 }
919 }
920 _ => JsonRpcResponse {
921 jsonrpc: JSONRPC_VERSION.to_string(),
922 id: response_id,
923 result: None,
924 error: Some(JsonRpcError {
925 code: -32601,
926 message: "Method not found".to_string(),
927 data: None,
928 }),
929 },
930 };
931 if is_notification {
932 String::new()
933 } else {
934 serialize_response(&response)
935 }
936}
937
/// Shared handles for the identity-first RPC methods.
///
/// When a value of this type is passed to `handle_unified_rpc_json`, the `mobkit/capabilities`
/// result additionally advertises the identity methods (`mobkit/send`, `mobkit/interact`, …).
pub struct IdentityFirstContext {
    /// Shared identity-first runtime.
    pub runtime: std::sync::Arc<crate::identity_first::IdentityRuntime>,
    /// Shared `RosterProvider` implementation.
    pub roster_provider: std::sync::Arc<dyn crate::identity_first::contracts::RosterProvider>,
    /// Optional shared `TopologyProvider` implementation.
    pub topology_provider:
        Option<std::sync::Arc<dyn crate::identity_first::contracts::TopologyProvider>>,
    /// Optional shared `AgentCustomizer` implementation.
    pub customizer: Option<std::sync::Arc<dyn crate::identity_first::contracts::AgentCustomizer>>,
}
946
947pub fn handle_unified_rpc_json<'a>(
948 runtime: &'a UnifiedRuntime,
949 request_json: &'a str,
950 timeout: Duration,
951 http_base_url: Option<&'a str>,
952 identity_ctx: Option<&'a IdentityFirstContext>,
953) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
954 Box::pin(handle_unified_rpc_json_inner(
955 runtime,
956 request_json,
957 timeout,
958 http_base_url,
959 identity_ctx,
960 ))
961}
962
963async fn handle_unified_rpc_json_inner(
964 runtime: &UnifiedRuntime,
965 request_json: &str,
966 timeout: Duration,
967 http_base_url: Option<&str>,
968 identity_ctx: Option<&IdentityFirstContext>,
969) -> String {
970 let raw_request: Value = match serde_json::from_str(request_json) {
971 Ok(raw_request) => raw_request,
972 Err(_) => {
973 return serialize_response(&JsonRpcResponse {
974 jsonrpc: JSONRPC_VERSION.to_string(),
975 id: Value::Null,
976 result: None,
977 error: Some(JsonRpcError {
978 code: -32700,
979 message: "Parse error".to_string(),
980 data: None,
981 }),
982 });
983 }
984 };
985 let response_id = raw_request
986 .as_object()
987 .and_then(|object| object.get("id"))
988 .cloned()
989 .unwrap_or(Value::Null);
990 let request: JsonRpcRequest = match serde_json::from_value(raw_request) {
991 Ok(request) => request,
992 Err(_) => {
993 return serialize_response(&JsonRpcResponse {
994 jsonrpc: JSONRPC_VERSION.to_string(),
995 id: response_id,
996 result: None,
997 error: Some(JsonRpcError {
998 code: -32600,
999 message: "Invalid Request".to_string(),
1000 data: None,
1001 }),
1002 });
1003 }
1004 };
1005 let is_notification = request.id.is_none();
1006 let response_id = request.id.clone().unwrap_or(Value::Null);
1007
1008 if request.jsonrpc != "2.0" {
1009 let response = JsonRpcResponse {
1010 jsonrpc: JSONRPC_VERSION.to_string(),
1011 id: response_id,
1012 result: None,
1013 error: Some(JsonRpcError {
1014 code: -32600,
1015 message: "Invalid Request".to_string(),
1016 data: None,
1017 }),
1018 };
1019 return if is_notification {
1020 String::new()
1021 } else {
1022 serialize_response(&response)
1023 };
1024 }
1025
1026 let response = match request.method.as_str() {
1027 "mobkit/status" => {
1028 let mob_state = Some(runtime.mob_handle().status_observation_snapshot());
1029 let is_running = runtime.module_is_running().await;
1030 let loaded = runtime.loaded_modules().await;
1031 let mut result = serde_json::json!({
1032 "contract_version": MOBKIT_CONTRACT_VERSION,
1033 "running": is_running,
1034 "loaded_modules": loaded,
1035 "mob_state": format!("{mob_state:?}"),
1036 });
1037 if let Some(url) = http_base_url {
1038 result["http_base_url"] = Value::String(url.to_string());
1039 }
1040 JsonRpcResponse {
1041 jsonrpc: JSONRPC_VERSION.to_string(),
1042 id: response_id,
1043 result: Some(result),
1044 error: None,
1045 }
1046 }
1047 "mobkit/capabilities" => {
1048 let loaded = runtime.loaded_modules().await;
1049 let mut methods = vec![
1050 "mobkit/init",
1051 "mobkit/status",
1052 "mobkit/capabilities",
1053 "mobkit/reconcile",
1054 "mobkit/spawn_member",
1055 "mobkit/scheduling/evaluate",
1056 "mobkit/scheduling/dispatch",
1057 "mobkit/routing/resolve",
1058 "mobkit/routing/routes/list",
1059 "mobkit/routing/routes/add",
1060 "mobkit/routing/routes/delete",
1061 "mobkit/delivery/send",
1062 "mobkit/delivery/history",
1063 "mobkit/events/subscribe",
1064 "mobkit/query_events",
1065 "mobkit/memory/stores",
1066 "mobkit/memory/index",
1067 "mobkit/memory/query",
1068 "mobkit/session_store/bigquery",
1069 "mobkit/gating/evaluate",
1070 "mobkit/gating/pending",
1071 "mobkit/gating/decide",
1072 "mobkit/gating/audit",
1073 "mobkit/call_tool",
1074 "mobkit/models/catalog",
1075 "mobkit/blob/get",
1076 "mobkit/send_message",
1077 "mobkit/find_members",
1078 "mobkit/ensure_member",
1079 "mobkit/list_members",
1080 "mobkit/get_member",
1081 "mobkit/retire_member",
1082 "mobkit/respawn_member",
1083 "mobkit/reconcile_edges",
1084 "mobkit/rediscover",
1085 "mobkit/mob_events/query",
1086 "mobkit/mob_events/subscribe",
1087 "mobkit/cross_mob/peer_info",
1089 "mobkit/cross_mob/wire_local",
1090 "mobkit/cross_mob/unwire_local",
1091 "mobkit/peer_pubkey",
1092 "mobkit/member_status",
1093 "mobkit/force_cancel_member",
1094 "mobkit/spawn_helper",
1095 "mobkit/fork_helper",
1096 "mobkit/attach_existing_session",
1097 "mobkit/cancel_flow",
1098 "mobkit/flow_status",
1099 "mobkit/list_flows",
1100 "mobkit/list_runs",
1101 "mobkit/run_flow",
1102 "mobkit/collect_completed",
1103 "mobkit/wait_ready",
1104 "mobkit/mob_labels/set",
1105 "mobkit/mob_labels/get",
1106 "mobkit/mob_labels/delete",
1107 "mobkit/run_labels/set",
1108 "mobkit/run_labels/get",
1109 "mobkit/run_labels/delete",
1110 ];
1111 if identity_ctx.is_some() {
1112 methods.extend_from_slice(&[
1113 "mobkit/send",
1114 "mobkit/interact",
1115 "mobkit/dispatch",
1116 "mobkit/subscribe",
1117 "mobkit/status_identity",
1118 "mobkit/respawn",
1119 "mobkit/retire",
1120 "mobkit/reset",
1121 "mobkit/delete_identity",
1122 "mobkit/inspect_identity",
1123 "mobkit/reconcile_identity",
1124 ]);
1125 }
1126 if runtime.has_contact_directory() {
1128 methods.push("mobkit/cross_mob/directory");
1129 }
1130 if runtime.has_peer_mob_handles().await && runtime.has_inproc_contacts() {
1134 methods.extend_from_slice(&[
1135 "mobkit/cross_mob/wire",
1136 "mobkit/cross_mob/unwire",
1137 "mobkit/cross_mob/send",
1138 ]);
1139 }
1140 JsonRpcResponse {
1141 jsonrpc: JSONRPC_VERSION.to_string(),
1142 id: response_id,
1143 result: Some(serde_json::json!({
1144 "contract_version": MOBKIT_CONTRACT_VERSION,
1145 "runtime_type": "unified",
1146 "methods": methods,
1147 "loaded_modules": loaded,
1148 "runtime_capabilities": {
1149 "can_spawn_members": true,
1150 "can_send_messages": true,
1151 "can_wire_members": true,
1152 "can_retire_members": true,
1153 "available_spawn_modes": ["module", "profile"],
1154 }
1155 })),
1156 error: None,
1157 }
1158 }
1159 "mobkit/reconcile" => {
1160 let modules = match params::required_string_array(&request.params, "modules") {
1161 Ok(m) => m,
1162 Err(reason) => {
1163 return serialize_response(&JsonRpcResponse {
1164 jsonrpc: JSONRPC_VERSION.to_string(),
1165 id: response_id,
1166 result: None,
1167 error: Some(JsonRpcError {
1168 code: -32602,
1169 message: format!("Invalid params: {reason}"),
1170 data: None,
1171 }),
1172 });
1173 }
1174 };
1175
1176 match runtime.reconcile_modules(modules.clone(), timeout).await {
1177 Ok(added) => JsonRpcResponse {
1178 jsonrpc: JSONRPC_VERSION.to_string(),
1179 id: response_id,
1180 result: Some(serde_json::json!({
1181 "accepted": true,
1182 "reconciled_modules": modules,
1183 "added": added
1184 })),
1185 error: None,
1186 },
1187 Err(err) => JsonRpcResponse {
1188 jsonrpc: JSONRPC_VERSION.to_string(),
1189 id: response_id,
1190 result: None,
1191 error: Some(JsonRpcError {
1192 code: -32602,
1193 message: format!("Invalid params: {err:?}"),
1194 data: None,
1195 }),
1196 },
1197 }
1198 }
1199 "mobkit/spawn_member" => {
1200 let module_id = request.params.get("module_id").and_then(Value::as_str);
1202 let profile = request.params.get("profile").and_then(Value::as_str);
1203 let meerkat_id = request.params.get("meerkat_id").and_then(Value::as_str);
1204
1205 if let Some(module_id) = module_id {
1206 if module_id.is_empty() {
1208 JsonRpcResponse {
1209 jsonrpc: JSONRPC_VERSION.to_string(),
1210 id: response_id,
1211 result: None,
1212 error: Some(JsonRpcError {
1213 code: -32602,
1214 message: "Invalid params: module_id required".to_string(),
1215 data: None,
1216 }),
1217 }
1218 } else {
1219 match runtime.spawn_member(module_id, timeout).await {
1220 Ok(()) => JsonRpcResponse {
1221 jsonrpc: JSONRPC_VERSION.to_string(),
1222 id: response_id,
1223 result: Some(serde_json::json!({
1224 "accepted": true,
1225 "module_id": module_id
1226 })),
1227 error: None,
1228 },
1229 Err(err) => JsonRpcResponse {
1230 jsonrpc: JSONRPC_VERSION.to_string(),
1231 id: response_id,
1232 result: None,
1233 error: Some(JsonRpcError {
1234 code: -32602,
1235 message: format!("Invalid params: {err:?}"),
1236 data: None,
1237 }),
1238 },
1239 }
1240 }
1241 } else if let (Some(profile), Some(meerkat_id)) = (profile, meerkat_id) {
1242 let spec = meerkat_mob::SpawnMemberSpec::from_wire(
1244 profile.to_string(),
1245 meerkat_id.to_string(),
1246 request
1247 .params
1248 .get("initial_message")
1249 .and_then(Value::as_str)
1250 .map(|s| meerkat_core::ContentInput::from(s.to_string())),
1251 None,
1252 None,
1253 );
1254 match runtime.spawn(spec).await {
1255 Ok(_member_ref) => JsonRpcResponse {
1256 jsonrpc: JSONRPC_VERSION.to_string(),
1257 id: response_id,
1258 result: Some(serde_json::json!({
1259 "accepted": true,
1260 "meerkat_id": meerkat_id
1261 })),
1262 error: None,
1263 },
1264 Err(err) => JsonRpcResponse {
1265 jsonrpc: JSONRPC_VERSION.to_string(),
1266 id: response_id,
1267 result: None,
1268 error: Some(JsonRpcError {
1269 code: -32602,
1270 message: format!("Invalid params: {err}"),
1271 data: None,
1272 }),
1273 },
1274 }
1275 } else {
1276 JsonRpcResponse {
1277 jsonrpc: JSONRPC_VERSION.to_string(),
1278 id: response_id,
1279 result: None,
1280 error: Some(JsonRpcError {
1281 code: -32602,
1282 message: "Invalid params: module_id or (profile + meerkat_id) required"
1283 .to_string(),
1284 data: None,
1285 }),
1286 }
1287 }
1288 }
1289 "mobkit/scheduling/evaluate" => match parse_scheduling_params(&request.params) {
1290 Ok((schedules, tick_ms)) => {
1291 match runtime.evaluate_schedule_tick(&schedules, tick_ms).await {
1292 Ok(evaluation) => JsonRpcResponse {
1293 jsonrpc: JSONRPC_VERSION.to_string(),
1294 id: response_id,
1295 result: Some(serde_json::to_value(evaluation).unwrap_or(Value::Null)),
1296 error: None,
1297 },
1298 Err(err) => JsonRpcResponse {
1299 jsonrpc: JSONRPC_VERSION.to_string(),
1300 id: response_id,
1301 result: None,
1302 error: Some(JsonRpcError {
1303 code: -32602,
1304 message: format!(
1305 "Invalid params: {}",
1306 format_schedule_validation_error(err)
1307 ),
1308 data: None,
1309 }),
1310 },
1311 }
1312 }
1313 Err(message) => JsonRpcResponse {
1314 jsonrpc: JSONRPC_VERSION.to_string(),
1315 id: response_id,
1316 result: None,
1317 error: Some(JsonRpcError {
1318 code: -32602,
1319 message: format!("Invalid params: {message}"),
1320 data: None,
1321 }),
1322 },
1323 },
1324 "mobkit/scheduling/dispatch" => match parse_scheduling_params(&request.params) {
1325 Ok((schedules, tick_ms)) => {
1326 match runtime.dispatch_schedule_tick(&schedules, tick_ms).await {
1327 Ok(dispatch) => JsonRpcResponse {
1328 jsonrpc: JSONRPC_VERSION.to_string(),
1329 id: response_id,
1330 result: Some(serde_json::to_value(dispatch).unwrap_or(Value::Null)),
1331 error: None,
1332 },
1333 Err(err) => JsonRpcResponse {
1334 jsonrpc: JSONRPC_VERSION.to_string(),
1335 id: response_id,
1336 result: None,
1337 error: Some(JsonRpcError {
1338 code: -32602,
1339 message: format!("Invalid params: {err}"),
1340 data: None,
1341 }),
1342 },
1343 }
1344 }
1345 Err(message) => JsonRpcResponse {
1346 jsonrpc: JSONRPC_VERSION.to_string(),
1347 id: response_id,
1348 result: None,
1349 error: Some(JsonRpcError {
1350 code: -32602,
1351 message: format!("Invalid params: {message}"),
1352 data: None,
1353 }),
1354 },
1355 },
1356 "mobkit/routing/resolve" => {
1357 let resolve_result = match parse_routing_resolve_params(&request.params) {
1358 Ok(resolve_request) => runtime
1359 .resolve_routing(resolve_request)
1360 .await
1361 .map_err(RoutingDeliveryParamsError::Routing),
1362 Err(e) => Err(e),
1363 };
1364 match resolve_result {
1365 Ok(resolution) => JsonRpcResponse {
1366 jsonrpc: JSONRPC_VERSION.to_string(),
1367 id: response_id,
1368 result: Some(serde_json::to_value(resolution).unwrap_or(Value::Null)),
1369 error: None,
1370 },
1371 Err(err) => JsonRpcResponse {
1372 jsonrpc: JSONRPC_VERSION.to_string(),
1373 id: response_id,
1374 result: None,
1375 error: Some(JsonRpcError {
1376 code: -32602,
1377 message: format!("Invalid params: {}", err.message()),
1378 data: None,
1379 }),
1380 },
1381 }
1382 }
1383 "mobkit/routing/routes/list" => match parse_routing_routes_list_params(&request.params) {
1384 Ok(()) => {
1385 let routes = runtime.list_runtime_routes().await;
1386 JsonRpcResponse {
1387 jsonrpc: JSONRPC_VERSION.to_string(),
1388 id: response_id,
1389 result: Some(serde_json::json!({
1390 "routes": routes
1391 })),
1392 error: None,
1393 }
1394 }
1395 Err(err) => JsonRpcResponse {
1396 jsonrpc: JSONRPC_VERSION.to_string(),
1397 id: response_id,
1398 result: None,
1399 error: Some(JsonRpcError {
1400 code: -32602,
1401 message: format!("Invalid params: {}", err.message()),
1402 data: None,
1403 }),
1404 },
1405 },
1406 "mobkit/routing/routes/add" => {
1407 let add_result = match parse_routing_route_add_params(&request.params) {
1408 Ok(route) => runtime
1409 .add_runtime_route(route)
1410 .await
1411 .map_err(RoutingDeliveryParamsError::RouteMutation),
1412 Err(e) => Err(e),
1413 };
1414 match add_result {
1415 Ok(route) => JsonRpcResponse {
1416 jsonrpc: JSONRPC_VERSION.to_string(),
1417 id: response_id,
1418 result: Some(serde_json::json!({ "route": route })),
1419 error: None,
1420 },
1421 Err(err) => JsonRpcResponse {
1422 jsonrpc: JSONRPC_VERSION.to_string(),
1423 id: response_id,
1424 result: None,
1425 error: Some(JsonRpcError {
1426 code: -32602,
1427 message: format!("Invalid params: {}", err.message()),
1428 data: None,
1429 }),
1430 },
1431 }
1432 }
1433 "mobkit/routing/routes/delete" => {
1434 let delete_result = match parse_routing_route_delete_params(&request.params) {
1435 Ok(route_key) => runtime
1436 .delete_runtime_route(&route_key)
1437 .await
1438 .map_err(RoutingDeliveryParamsError::RouteMutation),
1439 Err(e) => Err(e),
1440 };
1441 match delete_result {
1442 Ok(route) => JsonRpcResponse {
1443 jsonrpc: JSONRPC_VERSION.to_string(),
1444 id: response_id,
1445 result: Some(serde_json::json!({ "deleted": route })),
1446 error: None,
1447 },
1448 Err(err) => JsonRpcResponse {
1449 jsonrpc: JSONRPC_VERSION.to_string(),
1450 id: response_id,
1451 result: None,
1452 error: Some(JsonRpcError {
1453 code: -32602,
1454 message: format!("Invalid params: {}", err.message()),
1455 data: None,
1456 }),
1457 },
1458 }
1459 }
1460 "mobkit/delivery/send" => {
1461 let send_result = match parse_delivery_send_params(&request.params) {
1462 Ok(send_request) => runtime
1463 .send_delivery(send_request)
1464 .await
1465 .map_err(RoutingDeliveryParamsError::Delivery),
1466 Err(e) => Err(e),
1467 };
1468 match send_result {
1469 Ok(record) => JsonRpcResponse {
1470 jsonrpc: JSONRPC_VERSION.to_string(),
1471 id: response_id,
1472 result: Some(serde_json::to_value(record).unwrap_or(Value::Null)),
1473 error: None,
1474 },
1475 Err(err) => JsonRpcResponse {
1476 jsonrpc: JSONRPC_VERSION.to_string(),
1477 id: response_id,
1478 result: None,
1479 error: Some(JsonRpcError {
1480 code: -32602,
1481 message: format!("Invalid params: {}", err.message()),
1482 data: None,
1483 }),
1484 },
1485 }
1486 }
1487 "mobkit/delivery/history" => match parse_delivery_history_params(&request.params) {
1488 Ok(history_request) => {
1489 let history = runtime.delivery_history(history_request).await;
1490 JsonRpcResponse {
1491 jsonrpc: JSONRPC_VERSION.to_string(),
1492 id: response_id,
1493 result: Some(serde_json::to_value(history).unwrap_or(Value::Null)),
1494 error: None,
1495 }
1496 }
1497 Err(err) => JsonRpcResponse {
1498 jsonrpc: JSONRPC_VERSION.to_string(),
1499 id: response_id,
1500 result: None,
1501 error: Some(JsonRpcError {
1502 code: -32602,
1503 message: format!("Invalid params: {}", err.message()),
1504 data: None,
1505 }),
1506 },
1507 },
1508 "mobkit/events/subscribe" => match parse_subscribe_request(&request.params) {
1509 Ok(subscribe_request) => match runtime.subscribe_events(subscribe_request).await {
1510 Ok(subscribe_result) => JsonRpcResponse {
1511 jsonrpc: JSONRPC_VERSION.to_string(),
1512 id: response_id,
1513 result: Some(serde_json::to_value(subscribe_result).unwrap_or(Value::Null)),
1514 error: None,
1515 },
1516 Err(err) => JsonRpcResponse {
1517 jsonrpc: JSONRPC_VERSION.to_string(),
1518 id: response_id,
1519 result: None,
1520 error: Some(JsonRpcError {
1521 code: -32602,
1522 message: format!("Invalid params: {err}"),
1523 data: None,
1524 }),
1525 },
1526 },
1527 Err(err) => JsonRpcResponse {
1528 jsonrpc: JSONRPC_VERSION.to_string(),
1529 id: response_id,
1530 result: None,
1531 error: Some(JsonRpcError {
1532 code: -32602,
1533 message: format!("Invalid params: {}", err.message()),
1534 data: None,
1535 }),
1536 },
1537 },
1538 "mobkit/query_events" => {
1539 let query: EventQuery = if request.params.is_null() {
1540 EventQuery::default()
1541 } else {
1542 match serde_json::from_value(request.params.clone()) {
1543 Ok(query) => query,
1544 Err(err) => {
1545 return serde_json::to_string(&JsonRpcResponse {
1546 jsonrpc: JSONRPC_VERSION.to_string(),
1547 id: response_id,
1548 result: None,
1549 error: Some(JsonRpcError {
1550 code: -32602,
1551 message: format!("Invalid params: invalid query params: {err}"),
1552 data: None,
1553 }),
1554 })
1555 .unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_string());
1556 }
1557 }
1558 };
1559 match runtime.event_log_store() {
1560 Some(store) => match store.query(query).await {
1561 Ok(events) => JsonRpcResponse {
1562 jsonrpc: JSONRPC_VERSION.to_string(),
1563 id: response_id,
1564 result: Some(serde_json::to_value(events).unwrap_or(Value::Null)),
1565 error: None,
1566 },
1567 Err(err) => JsonRpcResponse {
1568 jsonrpc: JSONRPC_VERSION.to_string(),
1569 id: response_id,
1570 result: None,
1571 error: Some(JsonRpcError {
1572 code: -32603,
1573 message: format!("query_events failed: {err}"),
1574 data: None,
1575 }),
1576 },
1577 },
1578 None => JsonRpcResponse {
1579 jsonrpc: JSONRPC_VERSION.to_string(),
1580 id: response_id,
1581 result: Some(serde_json::json!({
1582 "status": "no_event_log_configured",
1583 "events": [],
1584 })),
1585 error: None,
1586 },
1587 }
1588 }
1589 "mobkit/memory/stores" => match parse_memory_stores_params(&request.params) {
1590 Ok(()) => {
1591 let stores = runtime.memory_stores().await;
1592 JsonRpcResponse {
1593 jsonrpc: JSONRPC_VERSION.to_string(),
1594 id: response_id,
1595 result: Some(serde_json::json!({
1596 "stores": stores,
1597 })),
1598 error: None,
1599 }
1600 }
1601 Err(err) => JsonRpcResponse {
1602 jsonrpc: JSONRPC_VERSION.to_string(),
1603 id: response_id,
1604 result: None,
1605 error: Some(JsonRpcError {
1606 code: -32602,
1607 message: format!("Invalid params: {}", err.message()),
1608 data: None,
1609 }),
1610 },
1611 },
1612 "mobkit/memory/index" => match parse_memory_index_params(&request.params) {
1613 Ok(index_request) => match runtime.memory_index(index_request).await {
1614 Ok(indexed) => JsonRpcResponse {
1615 jsonrpc: JSONRPC_VERSION.to_string(),
1616 id: response_id,
1617 result: Some(serde_json::to_value(indexed).unwrap_or(Value::Null)),
1618 error: None,
1619 },
1620 Err(MemoryIndexError::BackendPersistFailed(error)) => JsonRpcResponse {
1621 jsonrpc: JSONRPC_VERSION.to_string(),
1622 id: response_id,
1623 result: None,
1624 error: Some(JsonRpcError {
1625 code: MEMORY_BACKEND_UNAVAILABLE_CODE,
1626 message: format!(
1627 "Memory backend unavailable: {}",
1628 MemoryParamsError::backend_message(&error)
1629 ),
1630 data: None,
1631 }),
1632 },
1633 Err(err) => JsonRpcResponse {
1634 jsonrpc: JSONRPC_VERSION.to_string(),
1635 id: response_id,
1636 result: None,
1637 error: Some(JsonRpcError {
1638 code: -32602,
1639 message: format!(
1640 "Invalid params: {}",
1641 MemoryParamsError::Index(err).message()
1642 ),
1643 data: None,
1644 }),
1645 },
1646 },
1647 Err(err) => JsonRpcResponse {
1648 jsonrpc: JSONRPC_VERSION.to_string(),
1649 id: response_id,
1650 result: None,
1651 error: Some(JsonRpcError {
1652 code: -32602,
1653 message: format!("Invalid params: {}", err.message()),
1654 data: None,
1655 }),
1656 },
1657 },
1658 "mobkit/memory/query" => match parse_memory_query_params(&request.params) {
1659 Ok(query_request) => {
1660 let query_result = runtime.memory_query(query_request).await;
1661 JsonRpcResponse {
1662 jsonrpc: JSONRPC_VERSION.to_string(),
1663 id: response_id,
1664 result: Some(serde_json::to_value(query_result).unwrap_or(Value::Null)),
1665 error: None,
1666 }
1667 }
1668 Err(err) => JsonRpcResponse {
1669 jsonrpc: JSONRPC_VERSION.to_string(),
1670 id: response_id,
1671 result: None,
1672 error: Some(JsonRpcError {
1673 code: -32602,
1674 message: format!("Invalid params: {}", err.message()),
1675 data: None,
1676 }),
1677 },
1678 },
1679 "mobkit/session_store/bigquery" => {
1680 match parse_bigquery_session_store_params(&request.params)
1681 .and_then(run_bigquery_session_store_request)
1682 {
1683 Ok(result) => JsonRpcResponse {
1684 jsonrpc: JSONRPC_VERSION.to_string(),
1685 id: response_id,
1686 result: Some(result),
1687 error: None,
1688 },
1689 Err(BigQuerySessionStoreRpcError::Params(message)) => JsonRpcResponse {
1690 jsonrpc: JSONRPC_VERSION.to_string(),
1691 id: response_id,
1692 result: None,
1693 error: Some(JsonRpcError {
1694 code: -32602,
1695 message: format!("Invalid params: {message}"),
1696 data: None,
1697 }),
1698 },
1699 Err(BigQuerySessionStoreRpcError::Store(error)) => JsonRpcResponse {
1700 jsonrpc: JSONRPC_VERSION.to_string(),
1701 id: response_id,
1702 result: None,
1703 error: Some(JsonRpcError {
1704 code: -32011,
1705 message: format!(
1706 "BigQuery session store request failed: {}",
1707 format_bigquery_store_error(&error)
1708 ),
1709 data: None,
1710 }),
1711 },
1712 }
1713 }
1714 "mobkit/gating/evaluate" => match parse_gating_evaluate_params(&request.params) {
1715 Ok(gating_request) => {
1716 let gating_result = runtime.evaluate_gating_action(gating_request).await;
1717 JsonRpcResponse {
1718 jsonrpc: JSONRPC_VERSION.to_string(),
1719 id: response_id,
1720 result: Some(serde_json::to_value(gating_result).unwrap_or(Value::Null)),
1721 error: None,
1722 }
1723 }
1724 Err(err) => JsonRpcResponse {
1725 jsonrpc: JSONRPC_VERSION.to_string(),
1726 id: response_id,
1727 result: None,
1728 error: Some(JsonRpcError {
1729 code: -32602,
1730 message: format!("Invalid params: {}", err.message()),
1731 data: None,
1732 }),
1733 },
1734 },
1735 "mobkit/gating/pending" => match parse_gating_pending_params(&request.params) {
1736 Ok(()) => {
1737 let pending = runtime.list_gating_pending().await;
1738 JsonRpcResponse {
1739 jsonrpc: JSONRPC_VERSION.to_string(),
1740 id: response_id,
1741 result: Some(serde_json::json!({
1742 "pending": pending,
1743 })),
1744 error: None,
1745 }
1746 }
1747 Err(err) => JsonRpcResponse {
1748 jsonrpc: JSONRPC_VERSION.to_string(),
1749 id: response_id,
1750 result: None,
1751 error: Some(JsonRpcError {
1752 code: -32602,
1753 message: format!("Invalid params: {}", err.message()),
1754 data: None,
1755 }),
1756 },
1757 },
1758 "mobkit/gating/decide" => {
1759 let decide_result = match parse_gating_decide_params(&request.params) {
1760 Ok(decide_request) => runtime
1761 .decide_gating_action(decide_request)
1762 .await
1763 .map_err(GatingParamsError::Decision),
1764 Err(e) => Err(e),
1765 };
1766 match decide_result {
1767 Ok(result) => JsonRpcResponse {
1768 jsonrpc: JSONRPC_VERSION.to_string(),
1769 id: response_id,
1770 result: Some(serde_json::to_value(result).unwrap_or(Value::Null)),
1771 error: None,
1772 },
1773 Err(err) => JsonRpcResponse {
1774 jsonrpc: JSONRPC_VERSION.to_string(),
1775 id: response_id,
1776 result: None,
1777 error: Some(JsonRpcError {
1778 code: -32602,
1779 message: format!("Invalid params: {}", err.message()),
1780 data: None,
1781 }),
1782 },
1783 }
1784 }
1785 "mobkit/gating/audit" => match parse_gating_audit_params(&request.params) {
1786 Ok(limit) => {
1787 let entries = runtime.gating_audit_entries(limit).await;
1788 JsonRpcResponse {
1789 jsonrpc: JSONRPC_VERSION.to_string(),
1790 id: response_id,
1791 result: Some(serde_json::json!({
1792 "entries": entries,
1793 })),
1794 error: None,
1795 }
1796 }
1797 Err(err) => JsonRpcResponse {
1798 jsonrpc: JSONRPC_VERSION.to_string(),
1799 id: response_id,
1800 result: None,
1801 error: Some(JsonRpcError {
1802 code: -32602,
1803 message: format!("Invalid params: {}", err.message()),
1804 data: None,
1805 }),
1806 },
1807 },
1808 "mobkit/call_tool" => {
1809 let module_id = request.params.get("module_id").and_then(Value::as_str);
1810 let tool = request.params.get("tool").and_then(Value::as_str);
1811 let arguments = request
1812 .params
1813 .get("arguments")
1814 .cloned()
1815 .unwrap_or(serde_json::json!({}));
1816
1817 match (module_id, tool) {
1818 (Some(module_id), Some(tool)) if !module_id.is_empty() && !tool.is_empty() => {
1819 let route = runtime
1820 .route_module_call(
1821 &ModuleRouteRequest {
1822 module_id: module_id.to_string(),
1823 method: tool.to_string(),
1824 params: arguments,
1825 },
1826 timeout,
1827 )
1828 .await;
1829 match route {
1830 Ok(response) => JsonRpcResponse {
1831 jsonrpc: JSONRPC_VERSION.to_string(),
1832 id: response_id,
1833 result: Some(serde_json::json!({
1834 "module_id": response.module_id,
1835 "tool": response.method,
1836 "result": response.payload
1837 })),
1838 error: None,
1839 },
1840 Err(ModuleRouteError::UnloadedModule(mid)) => JsonRpcResponse {
1841 jsonrpc: JSONRPC_VERSION.to_string(),
1842 id: response_id,
1843 result: None,
1844 error: Some(JsonRpcError {
1845 code: -32601,
1846 message: format!("Module '{mid}' not loaded"),
1847 data: None,
1848 }),
1849 },
1850 Err(err) => JsonRpcResponse {
1851 jsonrpc: JSONRPC_VERSION.to_string(),
1852 id: response_id,
1853 result: None,
1854 error: Some(JsonRpcError {
1855 code: -32000,
1856 message: format!("Tool call failed: {err:?}"),
1857 data: None,
1858 }),
1859 },
1860 }
1861 }
1862 _ => JsonRpcResponse {
1863 jsonrpc: JSONRPC_VERSION.to_string(),
1864 id: response_id,
1865 result: None,
1866 error: Some(JsonRpcError {
1867 code: -32602,
1868 message: "Invalid params: module_id and tool required".to_string(),
1869 data: None,
1870 }),
1871 },
1872 }
1873 }
1874 "mobkit/models/catalog" => JsonRpcResponse {
1875 jsonrpc: JSONRPC_VERSION.to_string(),
1876 id: response_id,
1877 result: Some(build_models_catalog_result()),
1878 error: None,
1879 },
// ---------------------------------------------------------------------------
// Thin delegations: each arm below forwards to the matching handler in
// `mob_methods`, passing the runtime, the response id and (where the handler
// takes them) the request params. The handler's return value is the arm's value.
// ---------------------------------------------------------------------------

// Blob access and member messaging / lookup / lifecycle.
"mobkit/blob/get" => {
    mob_methods::handle_blob_get(runtime, response_id, &request.params).await
}
"mobkit/send_message" => {
    mob_methods::handle_send_message(runtime, response_id, &request.params).await
}
"mobkit/find_members" => {
    mob_methods::handle_find_members(runtime, response_id, &request.params).await
}
"mobkit/ensure_member" => {
    mob_methods::handle_ensure_member(runtime, response_id, &request.params).await
}
"mobkit/list_members" => mob_methods::handle_list_members(runtime, response_id).await,
"mobkit/get_member" => {
    mob_methods::handle_get_member(runtime, response_id, &request.params).await
}
"mobkit/retire_member" => {
    mob_methods::handle_retire_member(runtime, response_id, &request.params).await
}
"mobkit/respawn_member" => {
    mob_methods::handle_respawn_member(runtime, response_id, &request.params).await
}
"mobkit/reconcile_edges" => mob_methods::handle_reconcile_edges(runtime, response_id).await,
"mobkit/rediscover" => mob_methods::handle_rediscover(runtime, response_id).await,
// Mob event methods: these two handlers take the params by value, so
// `request.params` is moved here rather than borrowed.
"mobkit/mob_events/query" => {
    mob_methods::handle_mob_events_query(runtime, response_id, request.params).await
}
"mobkit/mob_events/subscribe" => {
    mob_methods::handle_mob_events_subscribe(runtime, response_id, request.params).await
}
// Cross-mob methods. All of them are dispatched unconditionally here; the
// capabilities listing advertises `wire`/`unwire`/`send` and `directory` only
// when the runtime reports the corresponding support.
"mobkit/cross_mob/wire" => {
    mob_methods::handle_cross_mob_wire(runtime, response_id, &request.params).await
}
"mobkit/cross_mob/unwire" => {
    mob_methods::handle_cross_mob_unwire(runtime, response_id, &request.params).await
}
"mobkit/cross_mob/send" => {
    mob_methods::handle_cross_mob_send(runtime, response_id, &request.params).await
}
"mobkit/cross_mob/directory" => {
    mob_methods::handle_cross_mob_directory(runtime, response_id).await
}
"mobkit/cross_mob/peer_info" => {
    mob_methods::handle_cross_mob_peer_info(runtime, response_id, &request.params).await
}
"mobkit/cross_mob/wire_local" => {
    mob_methods::handle_cross_mob_wire_local(runtime, response_id, &request.params).await
}
"mobkit/cross_mob/unwire_local" => {
    mob_methods::handle_cross_mob_unwire_local(runtime, response_id, &request.params).await
}
"mobkit/peer_pubkey" => mob_methods::handle_peer_pubkey(runtime, response_id).await,
// Member status / cancellation and helper sessions.
"mobkit/member_status" => {
    mob_methods::handle_member_status(runtime, response_id, &request.params).await
}
"mobkit/force_cancel_member" => {
    mob_methods::handle_force_cancel_member(runtime, response_id, &request.params).await
}
"mobkit/spawn_helper" => {
    mob_methods::handle_spawn_helper(runtime, response_id, &request.params).await
}
"mobkit/fork_helper" => {
    mob_methods::handle_fork_helper(runtime, response_id, &request.params).await
}
"mobkit/attach_existing_session" => {
    mob_methods::handle_attach_existing_session(runtime, response_id, &request.params).await
}
// Flows and runs.
"mobkit/cancel_flow" => {
    mob_methods::handle_cancel_flow(runtime, response_id, &request.params).await
}
"mobkit/flow_status" => {
    mob_methods::handle_flow_status(runtime, response_id, &request.params).await
}
"mobkit/list_flows" => mob_methods::handle_list_flows(runtime, response_id).await,
"mobkit/list_runs" => {
    mob_methods::handle_list_runs(runtime, response_id, &request.params).await
}
"mobkit/run_flow" => {
    mob_methods::handle_run_flow(runtime, response_id, &request.params).await
}
"mobkit/collect_completed" => {
    mob_methods::handle_collect_completed(runtime, response_id).await
}
"mobkit/wait_ready" => {
    mob_methods::handle_wait_ready(runtime, response_id, &request.params).await
}
// Mob-level labels: `get` and `delete` take no params.
"mobkit/mob_labels/set" => {
    mob_methods::handle_mob_labels_set(runtime, response_id, &request.params).await
}
"mobkit/mob_labels/get" => mob_methods::handle_mob_labels_get(runtime, response_id).await,
"mobkit/mob_labels/delete" => {
    mob_methods::handle_mob_labels_delete(runtime, response_id).await
}
// Run-level labels: every operation takes params.
"mobkit/run_labels/set" => {
    mob_methods::handle_run_labels_set(runtime, response_id, &request.params).await
}
"mobkit/run_labels/get" => {
    mob_methods::handle_run_labels_get(runtime, response_id, &request.params).await
}
"mobkit/run_labels/delete" => {
    mob_methods::handle_run_labels_delete(runtime, response_id, &request.params).await
}
1982 "mobkit/send" => {
1984 let identity_rt = match identity_ctx {
1985 Some(ctx) => &*ctx.runtime,
1986 None => return maybe_identity_not_configured(is_notification, response_id),
1987 };
1988 let identity_str = request
1989 .params
1990 .get("identity")
1991 .and_then(|v| v.as_str())
1992 .unwrap_or("");
1993 let target =
1994 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
1995 {
1996 Ok(target) => target,
1997 Err(e) => {
1998 return maybe_error_response(
1999 is_notification,
2000 response_id,
2001 -32602,
2002 format!("invalid identity: {e}"),
2003 );
2004 }
2005 };
2006 let identity = target.identity.clone();
2007 if let Some(response) =
2008 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2009 {
2010 return if is_notification {
2011 String::new()
2012 } else {
2013 serialize_response(&response)
2014 };
2015 }
2016 let content_val = request
2017 .params
2018 .get("content")
2019 .cloned()
2020 .unwrap_or(Value::Null);
2021 let content = match serde_json::from_value::<meerkat_core::ContentInput>(content_val) {
2022 Ok(content) => content,
2023 Err(err) => {
2024 return maybe_error_response(
2025 is_notification,
2026 response_id,
2027 -32602,
2028 format!("invalid content: {err}"),
2029 );
2030 }
2031 };
2032 match identity_rt.send(&identity, &content).await {
2033 Ok(token) => JsonRpcResponse {
2034 jsonrpc: JSONRPC_VERSION.to_string(),
2035 id: response_id,
2036 result: Some(serde_json::json!({ "fencing_token": token.get() })),
2037 error: None,
2038 },
2039 Err(e) => identity_error_response(response_id, &e),
2040 }
2041 }
2042 "mobkit/interact" => {
2043 let identity_rt = match identity_ctx {
2044 Some(ctx) => &*ctx.runtime,
2045 None => return maybe_identity_not_configured(is_notification, response_id),
2046 };
2047 let identity_str = request
2048 .params
2049 .get("identity")
2050 .and_then(|v| v.as_str())
2051 .unwrap_or("");
2052 let target =
2053 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2054 {
2055 Ok(target) => target,
2056 Err(e) => {
2057 return maybe_error_response(
2058 is_notification,
2059 response_id,
2060 -32602,
2061 format!("invalid identity: {e}"),
2062 );
2063 }
2064 };
2065 let identity = target.identity.clone();
2066 if let Some(response) =
2067 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2068 {
2069 return if is_notification {
2070 String::new()
2071 } else {
2072 serialize_response(&response)
2073 };
2074 }
2075 let content_val = request
2076 .params
2077 .get("content")
2078 .cloned()
2079 .unwrap_or(Value::Null);
2080 let content =
2081 match serde_json::from_value::<meerkat_core::ContentInput>(content_val.clone()) {
2082 Ok(content) => content,
2083 Err(err) => {
2084 return maybe_error_response(
2085 is_notification,
2086 response_id,
2087 -32602,
2088 format!("invalid content: {err}"),
2089 );
2090 }
2091 };
2092 let origin = request
2093 .params
2094 .get("origin")
2095 .and_then(|v| v.as_str())
2096 .unwrap_or("console");
2097 let interaction_id = request
2098 .params
2099 .get("interaction_id")
2100 .and_then(|v| v.as_str())
2101 .map(ToString::to_string)
2102 .unwrap_or_else(|| meerkat_core::types::SessionId::new().to_string());
2103 let runtime_member_id = identity_rt
2104 .status(&identity)
2105 .await
2106 .ok()
2107 .and_then(|status| status.agent_runtime_id.map(|id| id.as_str().to_string()));
2108
2109 if let Err(err) = runtime
2110 .reserve_identity_interaction(
2111 identity.as_str(),
2112 runtime_member_id.as_deref(),
2113 &interaction_id,
2114 origin,
2115 content_val,
2116 )
2117 .await
2118 {
2119 return maybe_error_response(
2120 is_notification,
2121 response_id,
2122 -32003,
2123 format!("failed to reserve interaction: {err}"),
2124 );
2125 }
2126
2127 match identity_rt.send(&identity, &content).await {
2128 Ok(token) => JsonRpcResponse {
2129 jsonrpc: JSONRPC_VERSION.to_string(),
2130 id: response_id,
2131 result: Some(serde_json::json!({
2132 "interaction_id": interaction_id,
2133 "fencing_token": token.get(),
2134 "stream": {
2135 "route": format!("/console/identity/{}/stream", identity.as_str()),
2136 "identity": identity.as_str(),
2137 }
2138 })),
2139 error: None,
2140 },
2141 Err(e) => {
2142 runtime
2143 .record_console_lifecycle(
2144 identity.as_str(),
2145 "interaction_failed",
2146 serde_json::json!({
2147 "interaction_id": interaction_id,
2148 "origin": origin,
2149 "error": e.to_string(),
2150 }),
2151 )
2152 .await;
2153 identity_error_response(response_id, &e)
2154 }
2155 }
2156 }
2157 "mobkit/dispatch" => {
2158 let identity_rt = match identity_ctx {
2159 Some(ctx) => &*ctx.runtime,
2160 None => return maybe_identity_not_configured(is_notification, response_id),
2161 };
2162 let identity_str = request
2163 .params
2164 .get("identity")
2165 .and_then(|v| v.as_str())
2166 .unwrap_or("");
2167 let target =
2168 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2169 {
2170 Ok(target) => target,
2171 Err(e) => {
2172 return maybe_error_response(
2173 is_notification,
2174 response_id,
2175 -32602,
2176 format!("invalid identity: {e}"),
2177 );
2178 }
2179 };
2180 let identity = target.identity.clone();
2181 if let Some(response) =
2182 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2183 {
2184 return if is_notification {
2185 String::new()
2186 } else {
2187 serialize_response(&response)
2188 };
2189 }
2190 let di_val = request
2191 .params
2192 .get("dispatch_input")
2193 .cloned()
2194 .unwrap_or(Value::Null);
2195 let content_val = di_val
2196 .get("content")
2197 .cloned()
2198 .unwrap_or_else(|| Value::String(String::new()));
2199 let content = match serde_json::from_value::<meerkat_core::ContentInput>(content_val) {
2200 Ok(content) => content,
2201 Err(err) => {
2202 return maybe_error_response(
2203 is_notification,
2204 response_id,
2205 -32602,
2206 format!("invalid dispatch_input.content: {err}"),
2207 );
2208 }
2209 };
2210 let origin_str = di_val
2211 .get("origin")
2212 .and_then(|v| v.as_str())
2213 .unwrap_or("system");
2214 let origin = match origin_str {
2215 "connector" => crate::identity_first::DispatchOrigin::Connector,
2216 "scheduler" => crate::identity_first::DispatchOrigin::Scheduler,
2217 "policy" => crate::identity_first::DispatchOrigin::Policy,
2218 "flow" => crate::identity_first::DispatchOrigin::Flow,
2219 _ => crate::identity_first::DispatchOrigin::System,
2220 };
2221 let correlation_id = di_val
2222 .get("correlation_id")
2223 .and_then(|v| v.as_str())
2224 .map(crate::identity_first::CorrelationId::new);
2225 let idempotency_key = di_val
2226 .get("idempotency_key")
2227 .and_then(|v| v.as_str())
2228 .map(crate::identity_first::DispatchIdempotencyKey::new);
2229 let dispatch_input = crate::identity_first::DispatchInput {
2230 content,
2231 origin,
2232 correlation_id,
2233 idempotency_key,
2234 };
2235 match identity_rt.dispatch(&identity, &dispatch_input).await {
2236 Ok((token, durable)) => JsonRpcResponse {
2237 jsonrpc: JSONRPC_VERSION.to_string(),
2238 id: response_id,
2239 result: Some(
2240 serde_json::json!({ "fencing_token": token.get(), "durable": durable }),
2241 ),
2242 error: None,
2243 },
2244 Err(e) => identity_error_response(response_id, &e),
2245 }
2246 }
2247 "mobkit/subscribe" => {
2248 let identity_rt = match identity_ctx {
2249 Some(ctx) => &*ctx.runtime,
2250 None => return maybe_identity_not_configured(is_notification, response_id),
2251 };
2252 let identity_str = request
2253 .params
2254 .get("identity")
2255 .and_then(|v| v.as_str())
2256 .unwrap_or("");
2257 let target =
2258 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2259 {
2260 Ok(target) => target,
2261 Err(e) => {
2262 return maybe_error_response(
2263 is_notification,
2264 response_id,
2265 -32602,
2266 format!("invalid identity: {e}"),
2267 );
2268 }
2269 };
2270 let identity = target.identity.clone();
2271 if let Some(response) =
2272 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2273 {
2274 return if is_notification {
2275 String::new()
2276 } else {
2277 serialize_response(&response)
2278 };
2279 }
2280 match identity_rt.subscribe(&identity).await {
2281 Ok(_receiver) => JsonRpcResponse {
2282 jsonrpc: JSONRPC_VERSION.to_string(),
2283 id: response_id,
2284 result: Some(serde_json::json!({
2285 "identity": identity.as_str(),
2286 "stream_id": identity.as_str(),
2287 "subscribed": true,
2288 })),
2289 error: None,
2290 },
2291 Err(e) => identity_error_response(response_id, &e),
2292 }
2293 }
2294 "mobkit/status_identity" => {
2295 let identity_rt = match identity_ctx {
2296 Some(ctx) => &*ctx.runtime,
2297 None => return maybe_identity_not_configured(is_notification, response_id),
2298 };
2299 let identity_str = request
2300 .params
2301 .get("identity")
2302 .and_then(|v| v.as_str())
2303 .unwrap_or("");
2304 let target =
2305 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2306 {
2307 Ok(target) => target,
2308 Err(e) => {
2309 return maybe_error_response(
2310 is_notification,
2311 response_id,
2312 -32602,
2313 format!("invalid identity: {e}"),
2314 );
2315 }
2316 };
2317 let identity = target.identity.clone();
2318 if let Some(response) =
2319 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2320 {
2321 return if is_notification {
2322 String::new()
2323 } else {
2324 serialize_response(&response)
2325 };
2326 }
2327 match identity_rt.status(&identity).await {
2328 Ok(status) => {
2329 let continuity_health =
2330 serde_json::to_value(&status.continuity_health).unwrap_or(Value::Null);
2331 let result = serde_json::json!({
2332 "state": format!("{:?}", status.state),
2333 "identity": status.identity.as_str(),
2334 "agent_runtime_id": status.agent_runtime_id.as_ref().map(super::identity_first::AgentRuntimeId::as_str),
2335 "session_id": status.session_id.as_ref().map(ToString::to_string),
2336 "profile": status.profile.as_ref().map(meerkat_mob::ProfileName::as_str),
2337 "addressability": addressability_json(status.addressability),
2338 "display_name": status.display_name.as_ref().map(super::identity_first::DisplayName::as_str),
2339 "labels": status.labels,
2340 "generation": status.generation.map(super::identity_first::ContinuityGeneration::get),
2341 "checkpoint_version": status.checkpoint_version.map(super::identity_first::CheckpointVersion::get),
2342 "continuity_health": continuity_health,
2343 "lease_healthy": status.lease.as_ref().map(|lease| lease.healthy),
2344 "lease": status.lease.as_ref().map(|lease| serde_json::json!({
2345 "fencing_token": lease.fencing_token.get(),
2346 "ttl_remaining_ms": lease.ttl_remaining.as_millis() as u64,
2347 "healthy": lease.healthy,
2348 })),
2349 });
2350 JsonRpcResponse {
2351 jsonrpc: JSONRPC_VERSION.to_string(),
2352 id: response_id,
2353 result: Some(result),
2354 error: None,
2355 }
2356 }
2357 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
2358 if let Some(live) = target.live.as_ref() {
2359 JsonRpcResponse {
2360 jsonrpc: JSONRPC_VERSION.to_string(),
2361 id: response_id,
2362 result: Some(rpc_live_identity_status_json(live)),
2363 error: None,
2364 }
2365 } else {
2366 identity_error_response(response_id, &e)
2367 }
2368 }
2369 Err(e) => identity_error_response(response_id, &e),
2370 }
2371 }
2372 "mobkit/respawn" => {
2373 let identity_rt = match identity_ctx {
2374 Some(ctx) => &*ctx.runtime,
2375 None => return maybe_identity_not_configured(is_notification, response_id),
2376 };
2377 let identity_str = request
2378 .params
2379 .get("identity")
2380 .and_then(|v| v.as_str())
2381 .unwrap_or("");
2382 let target =
2383 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2384 {
2385 Ok(target) => target,
2386 Err(e) => {
2387 return maybe_error_response(
2388 is_notification,
2389 response_id,
2390 -32602,
2391 format!("invalid identity: {e}"),
2392 );
2393 }
2394 };
2395 let identity = target.identity.clone();
2396 if let Some(response) =
2397 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2398 {
2399 return if is_notification {
2400 String::new()
2401 } else {
2402 serialize_response(&response)
2403 };
2404 }
2405 let registered_status = match identity_rt.status(&identity).await {
2406 Ok(status) => Some(status),
2407 Err(crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => None,
2408 Err(e) => {
2409 let response = identity_error_response(response_id, &e);
2410 return if is_notification {
2411 String::new()
2412 } else {
2413 serialize_response(&response)
2414 };
2415 }
2416 };
2417 match identity_rt.respawn(&identity).await {
2418 Ok(mut record) => {
2419 let live_respawn_warning = match respawn_rpc_runtime_member_id(
2420 runtime,
2421 record.agent_runtime_id.as_str(),
2422 )
2423 .await
2424 {
2425 Ok(live_result) => {
2426 let live_session_id =
2427 live_result.get("session_id").and_then(Value::as_str);
2428 if let Some(live_session_id) = live_session_id {
2429 match meerkat_core::types::SessionId::parse(live_session_id) {
2430 Ok(session_id) => {
2431 match identity_rt
2432 .rebind_session_after_live_respawn(
2433 &identity, session_id,
2434 )
2435 .await
2436 {
2437 Ok(updated_record) => {
2438 record = updated_record;
2439 None
2440 }
2441 Err(err) => Some(serde_json::json!({
2442 "kind": "identity_rebind_failed_after_member_respawn",
2443 "message": err.to_string(),
2444 "identity": identity.as_str(),
2445 "agent_runtime_id": record.agent_runtime_id.as_str(),
2446 "live_session_id": live_session_id,
2447 })),
2448 }
2449 }
2450 Err(err) => Some(serde_json::json!({
2451 "kind": "member_respawn_session_id_invalid",
2452 "message": err.to_string(),
2453 "identity": identity.as_str(),
2454 "agent_runtime_id": record.agent_runtime_id.as_str(),
2455 "live_session_id": live_session_id,
2456 })),
2457 }
2458 } else {
2459 None
2460 }
2461 }
2462 Err(err) => Some(serde_json::json!({
2463 "kind": "member_respawn_failed_after_identity_refresh",
2464 "message": err,
2465 "identity": identity.as_str(),
2466 "agent_runtime_id": record.agent_runtime_id.as_str(),
2467 })),
2468 };
2469 let cleanup_warning = if registered_status.is_some()
2470 && let Err(err) = retire_stale_rpc_members_for_identity(
2471 runtime,
2472 identity.as_str(),
2473 Some(record.agent_runtime_id.as_str()),
2474 )
2475 .await
2476 {
2477 Some(serde_json::json!({
2478 "kind": "stale_member_cleanup_failed_after_identity_respawn",
2479 "message": err,
2480 "identity": identity.as_str(),
2481 "agent_runtime_id": record.agent_runtime_id.as_str(),
2482 }))
2483 } else {
2484 None
2485 };
2486 runtime
2487 .record_console_lifecycle(
2488 identity.as_str(),
2489 "identity_respawned",
2490 serde_json::json!({
2491 "generation": record.generation.get(),
2492 "checkpoint_version": record.checkpoint_version.get(),
2493 "live_respawn_warning": live_respawn_warning.clone(),
2494 "cleanup_warning": cleanup_warning.clone(),
2495 }),
2496 )
2497 .await;
2498 JsonRpcResponse {
2499 jsonrpc: JSONRPC_VERSION.to_string(),
2500 id: response_id,
2501 result: Some(serde_json::json!({
2502 "identity": record.identity.as_str(),
2503 "agent_runtime_id": record.agent_runtime_id.as_str(),
2504 "session_id": record.session_id.to_string(),
2505 "generation": record.generation.get(),
2506 "checkpoint_version": record.checkpoint_version.get(),
2507 "live_respawn_warning": live_respawn_warning,
2508 "cleanup_warning": cleanup_warning,
2509 })),
2510 error: None,
2511 }
2512 }
2513 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
2514 if let Some(live) = target.live.as_ref() {
2515 match respawn_rpc_live_identity(runtime, live).await {
2516 Ok(result) => {
2517 runtime
2518 .record_console_lifecycle(
2519 live.identity.as_str(),
2520 "identity_respawned",
2521 serde_json::json!({}),
2522 )
2523 .await;
2524 JsonRpcResponse {
2525 jsonrpc: JSONRPC_VERSION.to_string(),
2526 id: response_id,
2527 result: Some(result),
2528 error: None,
2529 }
2530 }
2531 Err(err) => JsonRpcResponse {
2532 jsonrpc: JSONRPC_VERSION.to_string(),
2533 id: response_id,
2534 result: None,
2535 error: Some(JsonRpcError {
2536 code: -32000,
2537 message: format!("respawn failed: {err}"),
2538 data: None,
2539 }),
2540 },
2541 }
2542 } else {
2543 identity_error_response(response_id, &e)
2544 }
2545 }
2546 Err(e) => identity_error_response(response_id, &e),
2547 }
2548 }
2549 "mobkit/retire" => {
2550 let identity_rt = match identity_ctx {
2551 Some(ctx) => &*ctx.runtime,
2552 None => return maybe_identity_not_configured(is_notification, response_id),
2553 };
2554 let identity_str = request
2555 .params
2556 .get("identity")
2557 .and_then(|v| v.as_str())
2558 .unwrap_or("");
2559 let target =
2560 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2561 {
2562 Ok(target) => target,
2563 Err(e) => {
2564 return maybe_error_response(
2565 is_notification,
2566 response_id,
2567 -32602,
2568 format!("invalid identity: {e}"),
2569 );
2570 }
2571 };
2572 let identity = target.identity.clone();
2573 if let Some(response) =
2574 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2575 {
2576 return if is_notification {
2577 String::new()
2578 } else {
2579 serialize_response(&response)
2580 };
2581 }
2582 let registered_status = match identity_rt.status(&identity).await {
2583 Ok(status) => Some(status),
2584 Err(crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => None,
2585 Err(e) => {
2586 let response = identity_error_response(response_id, &e);
2587 return if is_notification {
2588 String::new()
2589 } else {
2590 serialize_response(&response)
2591 };
2592 }
2593 };
2594 match identity_rt.retire(&identity).await {
2595 Ok(token) => {
2596 let keep_runtime_member_id = registered_status
2597 .as_ref()
2598 .and_then(|status| status.agent_runtime_id.as_ref())
2599 .filter(|_| identity_rt.has_session_bridge())
2600 .map(crate::identity_first::AgentRuntimeId::as_str);
2601 let cleanup_warning = if registered_status.is_some()
2602 && let Err(err) = retire_stale_rpc_members_for_identity(
2603 runtime,
2604 identity.as_str(),
2605 keep_runtime_member_id,
2606 )
2607 .await
2608 {
2609 Some(serde_json::json!({
2610 "kind": "stale_member_cleanup_failed_after_identity_retire",
2611 "message": err,
2612 "identity": identity.as_str(),
2613 }))
2614 } else {
2615 None
2616 };
2617 runtime
2618 .record_console_lifecycle(
2619 identity.as_str(),
2620 "identity_retired",
2621 serde_json::json!({
2622 "fencing_token": token.get(),
2623 "cleanup_warning": cleanup_warning.clone(),
2624 }),
2625 )
2626 .await;
2627 JsonRpcResponse {
2628 jsonrpc: JSONRPC_VERSION.to_string(),
2629 id: response_id,
2630 result: Some(serde_json::json!({
2631 "fencing_token": token.get(),
2632 "cleanup_warning": cleanup_warning,
2633 })),
2634 error: None,
2635 }
2636 }
2637 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
2638 if let Some(live) = target.live.as_ref() {
2639 match retire_rpc_live_identity(runtime, live).await {
2640 Ok(()) => {
2641 runtime
2642 .record_console_lifecycle(
2643 live.identity.as_str(),
2644 "identity_retired",
2645 serde_json::json!({}),
2646 )
2647 .await;
2648 JsonRpcResponse {
2649 jsonrpc: JSONRPC_VERSION.to_string(),
2650 id: response_id,
2651 result: Some(
2652 serde_json::json!({ "identity": live.identity.as_str() }),
2653 ),
2654 error: None,
2655 }
2656 }
2657 Err(err) => JsonRpcResponse {
2658 jsonrpc: JSONRPC_VERSION.to_string(),
2659 id: response_id,
2660 result: None,
2661 error: Some(JsonRpcError {
2662 code: -32000,
2663 message: format!("retire failed: {err}"),
2664 data: None,
2665 }),
2666 },
2667 }
2668 } else {
2669 identity_error_response(response_id, &e)
2670 }
2671 }
2672 Err(e) => identity_error_response(response_id, &e),
2673 }
2674 }
2675 "mobkit/reset" => {
2676 let identity_rt = match identity_ctx {
2677 Some(ctx) => &*ctx.runtime,
2678 None => return maybe_identity_not_configured(is_notification, response_id),
2679 };
2680 let identity_str = request
2681 .params
2682 .get("identity")
2683 .and_then(|v| v.as_str())
2684 .unwrap_or("");
2685 let target =
2686 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2687 {
2688 Ok(target) => target,
2689 Err(e) => {
2690 return maybe_error_response(
2691 is_notification,
2692 response_id,
2693 -32602,
2694 format!("invalid identity: {e}"),
2695 );
2696 }
2697 };
2698 let identity = target.identity.clone();
2699 if let Some(response) =
2700 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2701 {
2702 return if is_notification {
2703 String::new()
2704 } else {
2705 serialize_response(&response)
2706 };
2707 }
2708 let _registered_status = match identity_rt.status(&identity).await {
2709 Ok(status) => {
2710 if !identity_rt.has_session_bridge() {
2711 let response = rpc_reset_requires_session_bridge_response(response_id);
2712 return if is_notification {
2713 String::new()
2714 } else {
2715 serialize_response(&response)
2716 };
2717 }
2718 status
2719 }
2720 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
2721 if let Some(live) = target.live.as_ref() {
2722 let response = match respawn_rpc_live_identity(runtime, live).await {
2723 Ok(result) => {
2724 runtime
2725 .record_console_lifecycle(
2726 live.identity.as_str(),
2727 "identity_reset",
2728 serde_json::json!({}),
2729 )
2730 .await;
2731 JsonRpcResponse {
2732 jsonrpc: JSONRPC_VERSION.to_string(),
2733 id: response_id,
2734 result: Some(result),
2735 error: None,
2736 }
2737 }
2738 Err(err) => JsonRpcResponse {
2739 jsonrpc: JSONRPC_VERSION.to_string(),
2740 id: response_id,
2741 result: None,
2742 error: Some(JsonRpcError {
2743 code: -32000,
2744 message: format!("reset failed: {err}"),
2745 data: None,
2746 }),
2747 },
2748 };
2749 return if is_notification {
2750 String::new()
2751 } else {
2752 serialize_response(&response)
2753 };
2754 }
2755 let response = identity_error_response(response_id, &e);
2756 return if is_notification {
2757 String::new()
2758 } else {
2759 serialize_response(&response)
2760 };
2761 }
2762 Err(e) => {
2763 let response = identity_error_response(response_id, &e);
2764 return if is_notification {
2765 String::new()
2766 } else {
2767 serialize_response(&response)
2768 };
2769 }
2770 };
2771 match identity_rt.reset(&identity).await {
2772 Ok(record) => {
2773 let cleanup_warning = if let Err(err) = retire_stale_rpc_members_for_identity(
2774 runtime,
2775 identity.as_str(),
2776 Some(record.agent_runtime_id.as_str()),
2777 )
2778 .await
2779 {
2780 Some(serde_json::json!({
2781 "kind": "stale_member_cleanup_failed_after_identity_reset",
2782 "message": err,
2783 "identity": identity.as_str(),
2784 "agent_runtime_id": record.agent_runtime_id.as_str(),
2785 }))
2786 } else {
2787 None
2788 };
2789 runtime
2790 .record_console_lifecycle(
2791 identity.as_str(),
2792 "identity_reset",
2793 serde_json::json!({
2794 "generation": record.generation.get(),
2795 "checkpoint_version": record.checkpoint_version.get(),
2796 "cleanup_warning": cleanup_warning.clone(),
2797 }),
2798 )
2799 .await;
2800 JsonRpcResponse {
2801 jsonrpc: JSONRPC_VERSION.to_string(),
2802 id: response_id,
2803 result: Some(serde_json::json!({
2804 "identity": record.identity.as_str(),
2805 "agent_runtime_id": record.agent_runtime_id.as_str(),
2806 "session_id": record.session_id.to_string(),
2807 "generation": record.generation.get(),
2808 "checkpoint_version": record.checkpoint_version.get(),
2809 "cleanup_warning": cleanup_warning,
2810 })),
2811 error: None,
2812 }
2813 }
2814 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
2815 if let Some(live) = target.live.as_ref() {
2816 match respawn_rpc_live_identity(runtime, live).await {
2817 Ok(result) => {
2818 runtime
2819 .record_console_lifecycle(
2820 live.identity.as_str(),
2821 "identity_reset",
2822 serde_json::json!({}),
2823 )
2824 .await;
2825 JsonRpcResponse {
2826 jsonrpc: JSONRPC_VERSION.to_string(),
2827 id: response_id,
2828 result: Some(result),
2829 error: None,
2830 }
2831 }
2832 Err(err) => JsonRpcResponse {
2833 jsonrpc: JSONRPC_VERSION.to_string(),
2834 id: response_id,
2835 result: None,
2836 error: Some(JsonRpcError {
2837 code: -32000,
2838 message: format!("reset failed: {err}"),
2839 data: None,
2840 }),
2841 },
2842 }
2843 } else {
2844 identity_error_response(response_id, &e)
2845 }
2846 }
2847 Err(e) => identity_error_response(response_id, &e),
2848 }
2849 }
2850 "mobkit/delete_identity" => {
2851 let identity_rt = match identity_ctx {
2852 Some(ctx) => &*ctx.runtime,
2853 None => return maybe_identity_not_configured(is_notification, response_id),
2854 };
2855 let identity_str = request
2856 .params
2857 .get("identity")
2858 .and_then(|v| v.as_str())
2859 .unwrap_or("");
2860 let target =
2861 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2862 {
2863 Ok(target) => target,
2864 Err(e) => {
2865 return maybe_error_response(
2866 is_notification,
2867 response_id,
2868 -32602,
2869 format!("invalid identity: {e}"),
2870 );
2871 }
2872 };
2873 let identity = target.identity.clone();
2874 if let Some(response) =
2875 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2876 {
2877 return if is_notification {
2878 String::new()
2879 } else {
2880 serialize_response(&response)
2881 };
2882 }
2883 let registered_status = match identity_rt.status(&identity).await {
2884 Ok(status) => status,
2885 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
2886 if target.live.is_some() {
2887 let response = JsonRpcResponse {
2888 jsonrpc: JSONRPC_VERSION.to_string(),
2889 id: response_id,
2890 result: None,
2891 error: Some(JsonRpcError {
2892 code: -32602,
2893 message: format!(
2894 "delete_identity requires durable identity: {} is live-only",
2895 identity.as_str()
2896 ),
2897 data: Some(serde_json::json!({
2898 "kind": "live_only_identity_delete_unsupported",
2899 "identity": identity.as_str(),
2900 })),
2901 }),
2902 };
2903 return if is_notification {
2904 String::new()
2905 } else {
2906 serialize_response(&response)
2907 };
2908 }
2909 let response = identity_error_response(response_id, &e);
2910 return if is_notification {
2911 String::new()
2912 } else {
2913 serialize_response(&response)
2914 };
2915 }
2916 Err(e) => {
2917 let response = identity_error_response(response_id, &e);
2918 return if is_notification {
2919 String::new()
2920 } else {
2921 serialize_response(&response)
2922 };
2923 }
2924 };
2925 let keep_runtime_member_id = registered_status
2926 .agent_runtime_id
2927 .as_ref()
2928 .filter(|_| identity_rt.has_session_bridge())
2929 .map(crate::identity_first::AgentRuntimeId::as_str);
2930 match identity_rt.delete_identity(&identity).await {
2931 Ok(()) => {
2932 let cleanup_warning = if let Err(err) = retire_stale_rpc_members_for_identity(
2933 runtime,
2934 identity.as_str(),
2935 keep_runtime_member_id,
2936 )
2937 .await
2938 {
2939 Some(serde_json::json!({
2940 "kind": "stale_member_cleanup_failed_after_identity_delete",
2941 "identity": identity.as_str(),
2942 "message": err,
2943 }))
2944 } else {
2945 None
2946 };
2947 runtime
2948 .record_console_lifecycle(
2949 identity.as_str(),
2950 "identity_deleted",
2951 serde_json::json!({
2952 "cleanup_warning": cleanup_warning,
2953 }),
2954 )
2955 .await;
2956 JsonRpcResponse {
2957 jsonrpc: JSONRPC_VERSION.to_string(),
2958 id: response_id,
2959 result: Some(serde_json::json!({
2960 "identity": identity.as_str(),
2961 "cleanup_warning": cleanup_warning,
2962 })),
2963 error: None,
2964 }
2965 }
2966 Err(e) => identity_error_response(response_id, &e),
2967 }
2968 }
2969 "mobkit/inspect_identity" => {
2970 let identity_rt = match identity_ctx {
2971 Some(ctx) => &*ctx.runtime,
2972 None => return maybe_identity_not_configured(is_notification, response_id),
2973 };
2974 let identity_str = request
2975 .params
2976 .get("identity")
2977 .and_then(|v| v.as_str())
2978 .unwrap_or("");
2979 let target =
2980 match resolve_rpc_identity_control_target(runtime, identity_rt, identity_str).await
2981 {
2982 Ok(target) => target,
2983 Err(e) => {
2984 return maybe_error_response(
2985 is_notification,
2986 response_id,
2987 -32602,
2988 format!("invalid identity: {e}"),
2989 );
2990 }
2991 };
2992 let identity = target.identity.clone();
2993 let status = identity_rt.status(&identity).await;
2994 if let Some(response) =
2995 rpc_stale_live_alias_error_response(identity_rt, &target, response_id.clone()).await
2996 {
2997 return if is_notification {
2998 String::new()
2999 } else {
3000 serialize_response(&response)
3001 };
3002 }
3003 match identity_rt.inspect(&identity).await {
3004 Ok(inspection) => {
3005 let status = status.ok();
3006 JsonRpcResponse {
3007 jsonrpc: JSONRPC_VERSION.to_string(),
3008 id: response_id,
3009 result: Some(serde_json::json!({
3010 "identity": identity.as_str(),
3011 "state": status.as_ref().map(|status| format!("{:?}", status.state)),
3012 "profile": status.as_ref().and_then(|status| status.profile.as_ref().map(meerkat_mob::ProfileName::as_str)),
3013 "addressability": status.as_ref().map(|status| addressability_json(status.addressability)),
3014 "display_name": status.as_ref().and_then(|status| status.display_name.as_ref().map(super::identity_first::DisplayName::as_str)),
3015 "labels": status.as_ref().map(|status| status.labels.clone()).unwrap_or_default(),
3016 "generation": status.as_ref().and_then(|status| status.generation.map(super::identity_first::ContinuityGeneration::get)),
3017 "checkpoint_version": status.as_ref().and_then(|status| status.checkpoint_version.map(super::identity_first::CheckpointVersion::get)),
3018 "continuity_health": status.as_ref().and_then(|status| serde_json::to_value(&status.continuity_health).ok()).unwrap_or(Value::Null),
3019 "lease_healthy": status.as_ref().and_then(|status| status.lease.as_ref().map(|lease| lease.healthy)),
3020 "continuity": status.as_ref().map(|status| serde_json::json!({
3021 "generation": status.generation.map(super::identity_first::ContinuityGeneration::get),
3022 "checkpoint_version": status.checkpoint_version.map(super::identity_first::CheckpointVersion::get),
3023 "session_id": status.session_id.as_ref().map(ToString::to_string),
3024 "agent_runtime_id": status.agent_runtime_id.as_ref().map(super::identity_first::AgentRuntimeId::as_str),
3025 })).unwrap_or_else(|| serde_json::json!({})),
3026 "lease": status.as_ref().and_then(|status| status.lease.as_ref().map(|lease| serde_json::json!({
3027 "fencing_token": lease.fencing_token.get(),
3028 "ttl_remaining_ms": lease.ttl_remaining.as_millis() as u64,
3029 "healthy": lease.healthy,
3030 }))),
3031 "output_preview": inspection.output_preview,
3032 "is_final": inspection.is_final,
3033 "peer_reachable_count": inspection.peer_reachable_count,
3034 })),
3035 error: None,
3036 }
3037 }
3038 Err(e @ crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {
3039 if let Some(live) = target.live.as_ref() {
3040 JsonRpcResponse {
3041 jsonrpc: JSONRPC_VERSION.to_string(),
3042 id: response_id,
3043 result: Some(rpc_live_identity_inspect_json(runtime, live).await),
3044 error: None,
3045 }
3046 } else {
3047 identity_error_response(response_id, &e)
3048 }
3049 }
3050 Err(e) => identity_error_response(response_id, &e),
3051 }
3052 }
3053 "mobkit/reconcile_identity" => {
3054 let ctx = match identity_ctx {
3055 Some(ctx) => ctx,
3056 None => return maybe_identity_not_configured(is_notification, response_id),
3057 };
3058 let roster_specs = match ctx
3060 .roster_provider
3061 .roster(&crate::identity_first::RosterContext {
3062 mob_definition: None,
3063 previous_identities: Vec::new(),
3064 })
3065 .await
3066 {
3067 Ok(specs) => specs,
3068 Err(e) => {
3069 return maybe_error_response(
3070 is_notification,
3071 response_id,
3072 -32603,
3073 format!("roster provider failed: {e}"),
3074 );
3075 }
3076 };
3077 match crate::identity_first::restore_flow(
3078 &ctx.runtime,
3079 &roster_specs,
3080 ctx.topology_provider.as_deref(),
3081 ctx.customizer.as_deref(),
3082 )
3083 .await
3084 {
3085 Ok(result) => {
3086 let outcomes: serde_json::Map<String, Value> = result
3087 .outcomes
3088 .iter()
3089 .map(|(id, outcome)| {
3090 let val = match outcome {
3091 crate::identity_first::RestoreOutcome::Created {
3092 record, ..
3093 } => {
3094 serde_json::json!({
3095 "outcome": "created",
3096 "identity": record.identity.as_str(),
3097 "agent_runtime_id": record.agent_runtime_id.as_str(),
3098 "session_id": record.session_id.to_string(),
3099 "generation": record.generation.get(),
3100 })
3101 }
3102 crate::identity_first::RestoreOutcome::Dormant {
3103 record, ..
3104 } => {
3105 serde_json::json!({
3106 "outcome": "dormant",
3107 "identity": id.as_str(),
3108 "agent_runtime_id": record.as_ref().map(|record| record.agent_runtime_id.as_str()),
3109 "session_id": record.as_ref().map(|record| record.session_id.to_string()),
3110 "generation": record.as_ref().map(|record| record.generation.get()),
3111 })
3112 }
3113 crate::identity_first::RestoreOutcome::Resumed {
3114 record, ..
3115 } => {
3116 serde_json::json!({
3117 "outcome": "resumed",
3118 "identity": record.identity.as_str(),
3119 "agent_runtime_id": record.agent_runtime_id.as_str(),
3120 "session_id": record.session_id.to_string(),
3121 "generation": record.generation.get(),
3122 })
3123 }
3124 crate::identity_first::RestoreOutcome::Broken(failure) => {
3125 serde_json::json!({
3126 "outcome": "broken",
3127 "identity": failure.identity.as_str(),
3128 "detail": failure.detail,
3129 })
3130 }
3131 };
3132 (id.to_string(), val)
3133 })
3134 .collect();
3135 JsonRpcResponse {
3136 jsonrpc: JSONRPC_VERSION.to_string(),
3137 id: response_id,
3138 result: Some(serde_json::json!({
3139 "outcomes": outcomes,
3140 "managed_edges": result.managed_edges.len(),
3141 })),
3142 error: None,
3143 }
3144 }
3145 Err(e) => identity_error_response(response_id, &e),
3146 }
3147 }
3148 method if method.contains('/') && !method.starts_with("mobkit/") => {
3149 let module_id = method
3150 .split('/')
3151 .next()
3152 .map(ToString::to_string)
3153 .unwrap_or_default();
3154 let route = runtime
3155 .route_module_call(
3156 &ModuleRouteRequest {
3157 module_id: module_id.clone(),
3158 method: method.to_string(),
3159 params: request.params,
3160 },
3161 timeout,
3162 )
3163 .await;
3164 match route {
3165 Ok(response) => JsonRpcResponse {
3166 jsonrpc: JSONRPC_VERSION.to_string(),
3167 id: response_id,
3168 result: Some(serde_json::json!({
3169 "module_id": response.module_id,
3170 "method": response.method,
3171 "payload": response.payload
3172 })),
3173 error: None,
3174 },
3175 Err(ModuleRouteError::UnloadedModule(module_id)) => JsonRpcResponse {
3176 jsonrpc: JSONRPC_VERSION.to_string(),
3177 id: response_id,
3178 result: None,
3179 error: Some(JsonRpcError {
3180 code: -32601,
3181 message: format!("Module '{module_id}' not loaded"),
3182 data: None,
3183 }),
3184 },
3185 Err(err) => JsonRpcResponse {
3186 jsonrpc: JSONRPC_VERSION.to_string(),
3187 id: response_id,
3188 result: None,
3189 error: Some(JsonRpcError {
3190 code: -32000,
3191 message: format!("Module route failed: {err:?}"),
3192 data: None,
3193 }),
3194 },
3195 }
3196 }
3197 _ => JsonRpcResponse {
3198 jsonrpc: JSONRPC_VERSION.to_string(),
3199 id: response_id,
3200 result: None,
3201 error: Some(JsonRpcError {
3202 code: -32601,
3203 message: "Method not found".to_string(),
3204 data: None,
3205 }),
3206 },
3207 };
3208 if is_notification {
3209 String::new()
3210 } else {
3211 serialize_response(&response)
3212 }
3213}
3214
3215fn build_models_catalog_result() -> Value {
3216 let entries: Vec<Value> = meerkat_models::catalog()
3217 .iter()
3218 .filter_map(|e| {
3219 let mut val = serde_json::to_value(e).ok()?;
3220 if let Some(provider) = meerkat_core::Provider::parse_strict(e.provider)
3221 && let Some(profile) = meerkat_models::profile_for(provider, e.id)
3222 && let Ok(p) = serde_json::to_value(&profile)
3223 {
3224 val["profile"] = p;
3225 }
3226 Some(val)
3227 })
3228 .collect();
3229 let defaults: Vec<Value> = meerkat_models::provider_defaults()
3230 .iter()
3231 .filter_map(|d| serde_json::to_value(d).ok())
3232 .collect();
3233 serde_json::json!({
3234 "models": entries,
3235 "provider_defaults": defaults,
3236 })
3237}
3238
/// A live mob member that an RPC `identity` argument resolved to.
///
/// `resolve_rpc_live_runtime_member_alias` builds this from a member list
/// entry of the mob handle.
#[derive(Debug, Clone)]
struct RpcLiveIdentityAlias {
    /// Identity the member is addressed by; `resolve_rpc_live_runtime_member_alias`
    /// parses it from the member's durable identity string.
    identity: crate::identity_first::AgentIdentity,
    /// String form of the member's `agent_identity` (the runtime member id).
    runtime_member_id: String,
    /// The member list entry this alias was derived from.
    member: meerkat_mob::runtime::MobMemberListEntry,
    /// Bridge session id observed for the member, if one was resolved.
    session_id: Option<String>,
}
3246
/// Resolved target of an identity-control RPC (status, respawn, retire, ...).
#[derive(Debug, Clone)]
struct RpcIdentityControlTarget {
    /// Identity the RPC handlers pass to the identity runtime.
    identity: crate::identity_first::AgentIdentity,
    /// Live-member alias, when one was resolved. The RPC handlers fall back to
    /// it when the identity runtime reports the identity as unknown.
    live: Option<RpcLiveIdentityAlias>,
}
3252
3253fn rpc_member_durable_identity(member: &meerkat_mob::runtime::MobMemberListEntry) -> String {
3254 member
3255 .labels
3256 .get("agent_identity")
3257 .filter(|value| !value.trim().is_empty())
3258 .cloned()
3259 .unwrap_or_else(|| member.agent_identity.to_string())
3260}
3261
3262async fn resolve_rpc_live_identity_alias(
3263 runtime: &UnifiedRuntime,
3264 requested_identity: &str,
3265) -> Result<Option<RpcLiveIdentityAlias>, String> {
3266 let matches = resolve_rpc_live_identity_alias_candidates(runtime, requested_identity).await?;
3267 if matches.len() > 1 {
3268 return Err(format!(
3269 "ambiguous live identity alias {requested_identity}: candidates [{}]",
3270 matches
3271 .iter()
3272 .map(|entry| entry.runtime_member_id.clone())
3273 .collect::<Vec<_>>()
3274 .join(", ")
3275 ));
3276 }
3277 Ok(matches.into_iter().next())
3278}
3279
3280async fn resolve_rpc_live_runtime_member_alias(
3281 runtime: &UnifiedRuntime,
3282 runtime_member_id: &str,
3283) -> Result<Option<RpcLiveIdentityAlias>, String> {
3284 let requested_member_id = meerkat_mob::ids::MeerkatId::from(runtime_member_id);
3285 let handle = runtime.mob_handle();
3286 let Some(member) = handle
3287 .list_members_including_retiring()
3288 .await
3289 .into_iter()
3290 .find(|entry| entry.agent_identity == requested_member_id)
3291 else {
3292 return Ok(None);
3293 };
3294 if !rpc_live_identity_alias_member_visible(&member) {
3295 return Ok(None);
3296 }
3297 let durable_identity = rpc_member_durable_identity(&member);
3298 let identity = crate::identity_first::AgentIdentity::parse(&durable_identity)
3299 .map_err(|err| format!("invalid projected identity {durable_identity}: {err}"))?;
3300 let session_id = handle
3301 .resolve_bridge_session_id_observation(&member.agent_identity)
3302 .await
3303 .map(|session_id| session_id.to_string());
3304 Ok(Some(RpcLiveIdentityAlias {
3305 identity,
3306 runtime_member_id: member.agent_identity.to_string(),
3307 member,
3308 session_id,
3309 }))
3310}
3311
3312async fn rpc_runtime_member_alias_exists_hidden(
3313 runtime: &UnifiedRuntime,
3314 runtime_member_id: &str,
3315) -> bool {
3316 let requested_member_id = meerkat_mob::ids::MeerkatId::from(runtime_member_id);
3317 runtime
3318 .mob_handle()
3319 .list_members_including_retiring()
3320 .await
3321 .into_iter()
3322 .find(|entry| entry.agent_identity == requested_member_id)
3323 .is_some_and(|member| !rpc_live_identity_alias_member_visible(&member))
3324}
3325
3326async fn rpc_live_identity_alias_exists_hidden(
3327 runtime: &UnifiedRuntime,
3328 requested_identity: &str,
3329) -> bool {
3330 let requested_member_id = meerkat_mob::ids::MeerkatId::from(requested_identity);
3331 runtime
3332 .mob_handle()
3333 .list_members_including_retiring()
3334 .await
3335 .into_iter()
3336 .any(|member| {
3337 (member.agent_identity == requested_member_id
3338 || member
3339 .labels
3340 .get("agent_identity")
3341 .is_some_and(|identity| identity == requested_identity))
3342 && !rpc_live_identity_alias_member_visible(&member)
3343 })
3344}
3345
3346async fn resolve_rpc_live_identity_alias_candidates(
3347 runtime: &UnifiedRuntime,
3348 requested_identity: &str,
3349) -> Result<Vec<RpcLiveIdentityAlias>, String> {
3350 let requested_member_id = meerkat_mob::ids::MeerkatId::from(requested_identity);
3351 let handle = runtime.mob_handle();
3352 let members = handle.list_members_including_retiring().await;
3353 let exact_matches = members
3354 .iter()
3355 .filter(|entry| entry.agent_identity == requested_member_id)
3356 .cloned()
3357 .collect::<Vec<_>>();
3358 let label_matches = members
3359 .iter()
3360 .filter(|entry| {
3361 entry
3362 .labels
3363 .get("agent_identity")
3364 .is_some_and(|identity| identity == requested_identity)
3365 })
3366 .cloned()
3367 .collect::<Vec<_>>();
3368 let mut matches = exact_matches;
3369 matches.extend(label_matches);
3370 let mut seen_member_ids = BTreeSet::new();
3371 matches.retain(|entry| seen_member_ids.insert(entry.agent_identity.to_string()));
3372 let mut aliases = Vec::with_capacity(matches.len());
3373 for member in matches {
3374 if !rpc_live_identity_alias_member_visible(&member) {
3375 continue;
3376 }
3377 let durable_identity = rpc_member_durable_identity(&member);
3378 let identity = crate::identity_first::AgentIdentity::parse(&durable_identity)
3379 .map_err(|err| format!("invalid projected identity {durable_identity}: {err}"))?;
3380 let session_id = handle
3381 .resolve_bridge_session_id_observation(&member.agent_identity)
3382 .await
3383 .map(|session_id| session_id.to_string());
3384 aliases.push(RpcLiveIdentityAlias {
3385 identity,
3386 runtime_member_id: member.agent_identity.to_string(),
3387 member,
3388 session_id,
3389 });
3390 }
3391 Ok(aliases)
3392}
3393
3394fn rpc_live_identity_alias_member_visible(
3395 member: &meerkat_mob::runtime::MobMemberListEntry,
3396) -> bool {
3397 rpc_live_identity_alias_visible(member.role.as_str(), &member.labels)
3398}
3399
3400fn rpc_live_identity_alias_visible(
3401 member_role: &str,
3402 labels: &std::collections::BTreeMap<String, String>,
3403) -> bool {
3404 let projected_role = labels
3405 .get("role")
3406 .map(String::as_str)
3407 .unwrap_or(member_role);
3408 !is_implicit_delegate_member(member_role, labels)
3409 && !is_implicit_delegate_member(projected_role, labels)
3410}
3411
3412async fn resolve_rpc_identity_control_target(
3413 runtime: &UnifiedRuntime,
3414 identity_rt: &crate::identity_first::IdentityRuntime,
3415 requested_identity: &str,
3416) -> Result<RpcIdentityControlTarget, String> {
3417 if requested_identity.starts_with("rt:") {
3418 for status in identity_rt.statuses().await {
3419 if status
3420 .agent_runtime_id
3421 .as_ref()
3422 .is_some_and(|runtime_id| runtime_id.as_str() == requested_identity)
3423 {
3424 let identity = status.identity;
3425 let registered_live =
3426 resolve_rpc_live_runtime_member_alias(runtime, requested_identity).await?;
3427 if let Some(registered) = registered_live {
3428 return Ok(RpcIdentityControlTarget {
3429 identity,
3430 live: Some(registered),
3431 });
3432 }
3433 if rpc_runtime_member_alias_exists_hidden(runtime, requested_identity).await {
3434 return Err(format!("identity hidden by policy: {requested_identity}"));
3435 }
3436 let durable_live_candidates =
3437 resolve_rpc_live_identity_alias_candidates(runtime, identity.as_str()).await?;
3438 let durable_live = if durable_live_candidates.len() > 1 {
3439 return Err(format!(
3440 "ambiguous live identity alias {}: candidates [{}]",
3441 identity.as_str(),
3442 durable_live_candidates
3443 .iter()
3444 .map(|alias| alias.runtime_member_id.clone())
3445 .collect::<Vec<_>>()
3446 .join(", ")
3447 ));
3448 } else {
3449 durable_live_candidates.into_iter().next()
3450 };
3451 return Ok(RpcIdentityControlTarget {
3452 identity,
3453 live: durable_live,
3454 });
3455 }
3456 }
3457 let live = resolve_rpc_live_identity_alias(runtime, requested_identity).await?;
3458 if let Some(live_alias) = live {
3459 let live_identity_candidates =
3460 resolve_rpc_live_identity_alias_candidates(runtime, live_alias.identity.as_str())
3461 .await?;
3462 if live_identity_candidates.len() > 1 {
3463 return Err(format!(
3464 "ambiguous live identity alias {}: candidates [{}]",
3465 live_alias.identity.as_str(),
3466 live_identity_candidates
3467 .iter()
3468 .map(|alias| alias.runtime_member_id.clone())
3469 .collect::<Vec<_>>()
3470 .join(", ")
3471 ));
3472 }
3473 return Ok(RpcIdentityControlTarget {
3474 identity: live_alias.identity.clone(),
3475 live: Some(live_alias),
3476 });
3477 }
3478 if rpc_runtime_member_alias_exists_hidden(runtime, requested_identity).await {
3479 return Err(format!("identity hidden by policy: {requested_identity}"));
3480 }
3481 return Err(format!("runtime identity not found: {requested_identity}"));
3482 }
3483 if let Ok(identity) = crate::identity_first::AgentIdentity::parse(requested_identity) {
3484 match identity_rt.status(&identity).await {
3485 Ok(status) => {
3486 let registered_live = match status.agent_runtime_id.as_ref() {
3487 Some(runtime_id) => {
3488 resolve_rpc_live_runtime_member_alias(runtime, runtime_id.as_str()).await?
3489 }
3490 None => None,
3491 };
3492 if let Some(registered) = registered_live {
3493 return Ok(RpcIdentityControlTarget {
3494 identity,
3495 live: Some(registered),
3496 });
3497 }
3498 if let Some(runtime_id) = status.agent_runtime_id.as_ref()
3499 && rpc_runtime_member_alias_exists_hidden(runtime, runtime_id.as_str()).await
3500 {
3501 return Err(format!("identity hidden by policy: {requested_identity}"));
3502 }
3503 let requested_live_candidates =
3504 resolve_rpc_live_identity_alias_candidates(runtime, requested_identity).await?;
3505 let requested_live = if requested_live_candidates.len() > 1 {
3506 return Err(format!(
3507 "ambiguous live identity alias {requested_identity}: candidates [{}]",
3508 requested_live_candidates
3509 .iter()
3510 .map(|alias| alias.runtime_member_id.clone())
3511 .collect::<Vec<_>>()
3512 .join(", ")
3513 ));
3514 } else {
3515 requested_live_candidates.into_iter().next()
3516 };
3517 return Ok(RpcIdentityControlTarget {
3518 identity,
3519 live: requested_live,
3520 });
3521 }
3522 Err(crate::identity_first::IdentityRuntimeError::UnknownIdentity(_)) => {}
3523 Err(err) => return Err(err.to_string()),
3524 }
3525 }
3526 for status in identity_rt.statuses().await {
3527 if status
3528 .agent_runtime_id
3529 .as_ref()
3530 .is_some_and(|runtime_id| runtime_id.as_str() == requested_identity)
3531 {
3532 let identity = status.identity;
3533 let registered_live =
3534 resolve_rpc_live_runtime_member_alias(runtime, requested_identity).await?;
3535 let durable_live_candidates =
3536 resolve_rpc_live_identity_alias_candidates(runtime, identity.as_str()).await?;
3537 let durable_live = if durable_live_candidates.len() > 1 {
3538 return Err(format!(
3539 "ambiguous live identity alias {}: candidates [{}]",
3540 identity.as_str(),
3541 durable_live_candidates
3542 .iter()
3543 .map(|alias| alias.runtime_member_id.clone())
3544 .collect::<Vec<_>>()
3545 .join(", ")
3546 ));
3547 } else {
3548 durable_live_candidates.into_iter().next()
3549 };
3550 let live = match (registered_live, durable_live) {
3551 (Some(registered), Some(durable))
3552 if registered.runtime_member_id == durable.runtime_member_id =>
3553 {
3554 Some(registered)
3555 }
3556 (Some(registered), None) => Some(registered),
3557 (Some(_registered), Some(durable)) => Some(durable),
3558 (None, durable) => durable,
3559 };
3560 return Ok(RpcIdentityControlTarget { identity, live });
3561 }
3562 }
3563 let live = resolve_rpc_live_identity_alias(runtime, requested_identity).await?;
3564 if let Some(live_alias) = live {
3565 if let Some(bound_status) = identity_rt.statuses().await.into_iter().find(|status| {
3566 status
3567 .agent_runtime_id
3568 .as_ref()
3569 .is_some_and(|runtime_id| runtime_id.as_str() == live_alias.runtime_member_id)
3570 }) && bound_status.identity != live_alias.identity
3571 {
3572 return Err(format!(
3573 "stale live identity alias: live console alias {} resolves to {}, but identity runtime binding belongs to {}",
3574 live_alias.identity.as_str(),
3575 live_alias.runtime_member_id,
3576 bound_status.identity.as_str(),
3577 ));
3578 }
3579 let live_identity_candidates =
3580 resolve_rpc_live_identity_alias_candidates(runtime, live_alias.identity.as_str())
3581 .await?;
3582 if live_identity_candidates.len() > 1 {
3583 return Err(format!(
3584 "ambiguous live identity alias {}: candidates [{}]",
3585 live_alias.identity.as_str(),
3586 live_identity_candidates
3587 .iter()
3588 .map(|alias| alias.runtime_member_id.clone())
3589 .collect::<Vec<_>>()
3590 .join(", ")
3591 ));
3592 }
3593 return Ok(RpcIdentityControlTarget {
3594 identity: live_alias.identity.clone(),
3595 live: Some(live_alias),
3596 });
3597 }
3598 if rpc_live_identity_alias_exists_hidden(runtime, requested_identity).await {
3599 return Err(format!("identity hidden by policy: {requested_identity}"));
3600 }
3601 let identity = crate::identity_first::AgentIdentity::parse(requested_identity)
3602 .map_err(|err| err.to_string())?;
3603 Ok(RpcIdentityControlTarget {
3604 identity,
3605 live: None,
3606 })
3607}
3608
3609fn rpc_reset_requires_session_bridge_response(response_id: Value) -> JsonRpcResponse {
3610 JsonRpcResponse {
3611 jsonrpc: JSONRPC_VERSION.to_string(),
3612 id: response_id,
3613 result: None,
3614 error: Some(JsonRpcError {
3615 code: -32602,
3616 message: "reset requires an identity runtime with a session bridge".to_string(),
3617 data: Some(serde_json::json!({
3618 "kind": "identity_reset_requires_session_bridge",
3619 })),
3620 }),
3621 }
3622}
3623
3624fn rpc_live_alias_matches_status_runtime(
3625 alias: Option<&RpcLiveIdentityAlias>,
3626 status: &crate::identity_first::IdentityStatus,
3627) -> bool {
3628 let Some(alias) = alias else {
3629 return true;
3630 };
3631 let session_matches = match (
3632 status.session_id.as_ref().map(ToString::to_string),
3633 alias.session_id.as_deref(),
3634 ) {
3635 (Some(status_session), Some(live_session)) => status_session == live_session,
3636 _ => true,
3637 };
3638 status
3639 .agent_runtime_id
3640 .as_ref()
3641 .is_some_and(|runtime_id| runtime_id.as_str() == alias.runtime_member_id)
3642 && alias.identity == status.identity
3643 && session_matches
3644}
3645
3646async fn rpc_stale_live_alias_error_response(
3647 identity_rt: &crate::identity_first::IdentityRuntime,
3648 target: &RpcIdentityControlTarget,
3649 response_id: Value,
3650) -> Option<JsonRpcResponse> {
3651 let live = target.live.as_ref()?;
3652 let Ok(status) = identity_rt.status(&target.identity).await else {
3653 return None;
3654 };
3655 if rpc_live_alias_matches_status_runtime(Some(live), &status) {
3656 return None;
3657 }
3658 Some(JsonRpcResponse {
3659 jsonrpc: JSONRPC_VERSION.to_string(),
3660 id: response_id,
3661 result: None,
3662 error: Some(JsonRpcError {
3663 code: -32000,
3664 message: format!(
3665 "identity runtime binding for {} points at {}, but requested live member is {}",
3666 target.identity.as_str(),
3667 status
3668 .agent_runtime_id
3669 .as_ref()
3670 .map(crate::identity_first::AgentRuntimeId::as_str)
3671 .unwrap_or("<none>"),
3672 live.runtime_member_id
3673 ),
3674 data: Some(serde_json::json!({
3675 "kind": "stale_identity_runtime_binding",
3676 "identity": target.identity.as_str(),
3677 "registered_runtime_member_id": status.agent_runtime_id.as_ref().map(crate::identity_first::AgentRuntimeId::as_str),
3678 "live_runtime_member_id": live.runtime_member_id,
3679 "registered_session_id": status.session_id.as_ref().map(ToString::to_string),
3680 "live_session_id": live.session_id,
3681 })),
3682 }),
3683 })
3684}
3685
3686fn rpc_member_is_addressable(member: &meerkat_mob::runtime::MobMemberListEntry) -> bool {
3687 member
3688 .labels
3689 .get("addressable")
3690 .map(|value| !value.eq_ignore_ascii_case("false"))
3691 .unwrap_or(true)
3692}
3693
3694fn rpc_live_identity_status_json(alias: &RpcLiveIdentityAlias) -> Value {
3695 serde_json::json!({
3696 "state": format!("{:?}", alias.member.state),
3697 "identity": alias.identity.as_str(),
3698 "agent_runtime_id": alias.runtime_member_id,
3699 "session_id": alias.session_id,
3700 "profile": alias.member.role.to_string(),
3701 "addressability": if rpc_member_is_addressable(&alias.member) { "addressable" } else { "internal_only" },
3702 "display_name": alias.member.labels.get("display_name"),
3703 "labels": alias.member.labels,
3704 "generation": Value::Null,
3705 "checkpoint_version": Value::Null,
3706 "continuity_health": Value::Null,
3707 "lease_healthy": Value::Null,
3708 "lease": Value::Null,
3709 })
3710}
3711
3712async fn rpc_live_identity_inspect_json(
3713 runtime: &UnifiedRuntime,
3714 alias: &RpcLiveIdentityAlias,
3715) -> Value {
3716 let snapshot = runtime
3717 .mob_handle()
3718 .member_status(&meerkat_mob::ids::MeerkatId::from(
3719 alias.runtime_member_id.as_str(),
3720 ))
3721 .await
3722 .ok();
3723 serde_json::json!({
3724 "identity": alias.identity.as_str(),
3725 "state": format!("{:?}", alias.member.state),
3726 "profile": alias.member.role.to_string(),
3727 "addressability": if rpc_member_is_addressable(&alias.member) { "addressable" } else { "internal_only" },
3728 "display_name": alias.member.labels.get("display_name"),
3729 "labels": alias.member.labels,
3730 "generation": Value::Null,
3731 "checkpoint_version": Value::Null,
3732 "continuity_health": Value::Null,
3733 "lease_healthy": Value::Null,
3734 "continuity": {
3735 "generation": Value::Null,
3736 "checkpoint_version": Value::Null,
3737 "session_id": alias.session_id,
3738 "agent_runtime_id": alias.runtime_member_id,
3739 },
3740 "lease": Value::Null,
3741 "output_preview": snapshot.as_ref().and_then(|snapshot| snapshot.output_preview.clone()),
3742 "is_final": snapshot.as_ref().map(|snapshot| snapshot.is_final).unwrap_or(false),
3743 "peer_reachable_count": alias.member.wired_to.len(),
3744 })
3745}
3746
3747async fn retire_rpc_live_identity(
3748 runtime: &UnifiedRuntime,
3749 alias: &RpcLiveIdentityAlias,
3750) -> Result<(), String> {
3751 retire_rpc_runtime_member_id(runtime, alias.runtime_member_id.as_str()).await
3752}
3753
3754async fn retire_rpc_runtime_member_id(
3755 runtime: &UnifiedRuntime,
3756 runtime_member_id: &str,
3757) -> Result<(), String> {
3758 match runtime
3759 .mob_handle()
3760 .retire(meerkat_mob::ids::MeerkatId::from(runtime_member_id))
3761 .await
3762 {
3763 Ok(()) => Ok(()),
3764 Err(err) if mob_methods::lifecycle_archive_cleanup_completed(&err.to_string()) => Ok(()),
3765 Err(err) => Err(err.to_string()),
3766 }
3767}
3768
/// Returns `true` only when `member_id` is exactly `durable_identity`;
/// prefixed or suffixed runtime ids do not match.
fn rpc_member_id_matches_durable_identity(member_id: &str, durable_identity: &str) -> bool {
    member_id.as_bytes() == durable_identity.as_bytes()
}
3772
3773async fn retire_stale_rpc_members_for_identity(
3774 runtime: &UnifiedRuntime,
3775 durable_identity: &str,
3776 keep_runtime_member_id: Option<&str>,
3777) -> Result<(), String> {
3778 let stale_members = runtime
3779 .mob_handle()
3780 .list_members_including_retiring()
3781 .await
3782 .into_iter()
3783 .filter(|member| {
3784 if !rpc_live_identity_alias_member_visible(member) {
3785 return false;
3786 }
3787 (rpc_member_id_matches_durable_identity(
3788 member.agent_identity.as_str(),
3789 durable_identity,
3790 ) || member
3791 .labels
3792 .get("agent_identity")
3793 .is_some_and(|identity| identity == durable_identity))
3794 && keep_runtime_member_id
3795 .map(|keep| member.agent_identity.as_str() != keep)
3796 .unwrap_or(true)
3797 })
3798 .map(|member| member.agent_identity.to_string())
3799 .collect::<Vec<_>>();
3800 for member_id in stale_members {
3801 retire_rpc_runtime_member_id(runtime, &member_id).await?;
3802 }
3803 Ok(())
3804}
3805
3806async fn respawn_rpc_live_identity(
3807 runtime: &UnifiedRuntime,
3808 alias: &RpcLiveIdentityAlias,
3809) -> Result<Value, String> {
3810 let mut result =
3811 respawn_rpc_runtime_member_id(runtime, alias.runtime_member_id.as_str()).await?;
3812 result["identity"] = serde_json::json!(alias.identity.as_str());
3813 Ok(result)
3814}
3815
3816async fn respawn_rpc_runtime_member_id(
3817 runtime: &UnifiedRuntime,
3818 runtime_member_id: &str,
3819) -> Result<Value, String> {
3820 let handle = runtime.mob_handle();
3821 let member_id = meerkat_mob::ids::MeerkatId::from(runtime_member_id);
3822 let entry_before_respawn = handle.get_member(&member_id).await;
3823 match handle.respawn(member_id.clone(), None).await {
3824 Ok(_receipt) => {}
3825 Err(err) if mob_methods::lifecycle_archive_cleanup_completed(&err.to_string()) => {
3826 if handle.get_member(&member_id).await.is_none()
3827 && let Some(entry) = entry_before_respawn
3828 {
3829 let mut spec =
3830 meerkat_mob::SpawnMemberSpec::new(entry.role.clone(), member_id.clone());
3831 if !entry.labels.is_empty() {
3832 spec = spec.with_labels(entry.labels.clone());
3833 }
3834 handle
3835 .ensure_member(spec)
3836 .await
3837 .map_err(|ensure_err| ensure_err.to_string())?;
3838 }
3839 }
3840 Err(err) => return Err(err.to_string()),
3841 }
3842 let session_id = handle
3843 .resolve_bridge_session_id_observation(&member_id)
3844 .await
3845 .map(|session_id| session_id.to_string());
3846 Ok(serde_json::json!({
3847 "agent_runtime_id": runtime_member_id,
3848 "session_id": session_id,
3849 "generation": Value::Null,
3850 "checkpoint_version": Value::Null,
3851 }))
3852}
3853
3854fn identity_not_configured(response_id: Value) -> String {
3855 error_response(response_id, -32601, "identity-first runtime not configured")
3856}
3857
3858fn maybe_identity_not_configured(is_notification: bool, response_id: Value) -> String {
3859 if is_notification {
3860 String::new()
3861 } else {
3862 identity_not_configured(response_id)
3863 }
3864}
3865
3866fn addressability_json(addressability: crate::identity_first::AgentAddressability) -> &'static str {
3867 match addressability {
3868 crate::identity_first::AgentAddressability::Addressable => "addressable",
3869 crate::identity_first::AgentAddressability::InternalOnly => "internal_only",
3870 }
3871}
3872
3873fn identity_error_response(
3874 response_id: Value,
3875 err: &crate::identity_first::IdentityRuntimeError,
3876) -> JsonRpcResponse {
3877 use crate::identity_first::IdentityRuntimeError;
3878 let (code, message) = match err {
3879 IdentityRuntimeError::UnknownIdentity(id) => (-32001, format!("unknown identity: {id}")),
3880 IdentityRuntimeError::NotAddressable(na) => {
3881 (-32002, format!("not addressable: {}", na.identity))
3882 }
3883 IdentityRuntimeError::NoActiveLease(id) => (-32003, format!("no active lease: {id}")),
3884 IdentityRuntimeError::LeaseLost(id) => (-32004, format!("lease lost: {id}")),
3885 _ => (-32603, format!("{err}")),
3886 };
3887 JsonRpcResponse {
3888 jsonrpc: JSONRPC_VERSION.to_string(),
3889 id: response_id,
3890 result: None,
3891 error: Some(JsonRpcError {
3892 code,
3893 message,
3894 data: None,
3895 }),
3896 }
3897}
3898
3899fn error_response(response_id: Value, code: i64, message: impl Into<String>) -> String {
3900 let message = message.into();
3901 let ambiguous_alias_rest = message
3902 .strip_prefix("ambiguous live identity alias ")
3903 .or_else(|| message.strip_prefix("invalid identity: ambiguous live identity alias "));
3904 let stale_live_alias_rest = message
3905 .strip_prefix("stale live identity alias: live console alias ")
3906 .or_else(|| {
3907 message.strip_prefix("invalid identity: stale live identity alias: live console alias ")
3908 });
3909 let hidden_policy_identity = message
3910 .strip_prefix("identity hidden by policy: ")
3911 .or_else(|| message.strip_prefix("invalid identity: identity hidden by policy: "));
3912 let data = if let Some(rest) = ambiguous_alias_rest {
3913 let (identity, candidates) = rest
3914 .split_once(": candidates [")
3915 .map(|(identity, candidates)| {
3916 (
3917 identity.to_string(),
3918 candidates
3919 .trim_end_matches(']')
3920 .split(',')
3921 .map(str::trim)
3922 .filter(|value| !value.is_empty())
3923 .map(str::to_string)
3924 .collect::<Vec<_>>(),
3925 )
3926 })
3927 .unwrap_or_else(|| (rest.to_string(), Vec::new()));
3928 Some(serde_json::json!({
3929 "kind": "ambiguous_live_identity_alias",
3930 "identity": identity,
3931 "candidates": candidates,
3932 }))
3933 } else if let Some(rest) = stale_live_alias_rest {
3934 let (identity, rest) = rest.split_once(" resolves to ").unwrap_or((rest, ""));
3935 let (runtime_member_id, bound_identity) = rest
3936 .split_once(", but identity runtime binding belongs to ")
3937 .unwrap_or((rest, ""));
3938 Some(serde_json::json!({
3939 "kind": "stale_live_identity_alias",
3940 "identity": identity,
3941 "live_runtime_member_id": runtime_member_id,
3942 "bound_identity": bound_identity,
3943 }))
3944 } else {
3945 hidden_policy_identity.map(|identity| {
3946 serde_json::json!({
3947 "kind": "identity_hidden_by_policy",
3948 "identity": identity,
3949 })
3950 })
3951 };
3952 serialize_response(&JsonRpcResponse {
3953 jsonrpc: JSONRPC_VERSION.to_string(),
3954 id: response_id,
3955 result: None,
3956 error: Some(JsonRpcError {
3957 code,
3958 message,
3959 data,
3960 }),
3961 })
3962}
3963
3964fn maybe_error_response(
3965 is_notification: bool,
3966 response_id: Value,
3967 code: i64,
3968 message: impl Into<String>,
3969) -> String {
3970 if is_notification {
3971 String::new()
3972 } else {
3973 error_response(response_id, code, message)
3974 }
3975}
3976
3977fn serialize_response(response: &JsonRpcResponse) -> String {
3978 serde_json::to_string(response).unwrap_or_else(|_| {
3979 r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#
3980 .to_string()
3981 })
3982}
3983
3984#[cfg(test)]
3985#[allow(clippy::expect_used)]
3986mod tests {
3987 use super::{
3988 error_response, handle_unified_rpc_json, resolve_rpc_identity_control_target,
3989 rpc_live_identity_alias_visible, rpc_member_id_matches_durable_identity,
3990 };
3991 use crate::identity_first::contracts::RosterProvider;
3992 use crate::identity_first::{
3993 AgentAddressability, AgentIdentity, AgentRuntimeId, CheckpointVersion,
3994 ContinuityGeneration, ContinuityRecord, DurabilityPolicy, DurableAgentSpec, FencingToken,
3995 IdentityLifecycleState, IdentityRuntime, IdentityRuntimeConfig, LeaseGrant,
3996 LocalContinuityStore, LocalLeaseProvider, RosterContext, RosterError,
3997 };
3998 use crate::{
3999 DiscoverySpec, IdentityFirstContext, MobBootstrapOptions, MobBootstrapSpec, MobKitConfig,
4000 UnifiedRuntime,
4001 };
4002 use async_trait::async_trait;
4003 use meerkat::{AgentFactory, Config, build_ephemeral_service};
4004 use meerkat_client::TestClient;
4005 use meerkat_mob::{MobDefinition, MobStorage, SpawnMemberSpec};
4006 use serde_json::{Value, json};
4007 use std::collections::BTreeMap;
4008 use std::sync::Arc;
4009 use std::time::Duration;
4010
    /// Test roster provider whose roster is always empty.
    #[derive(Debug, Default)]
    struct EmptyRosterProvider;
4013
4014 #[async_trait]
4015 impl RosterProvider for EmptyRosterProvider {
4016 async fn roster(
4017 &self,
4018 _context: &RosterContext,
4019 ) -> Result<Vec<DurableAgentSpec>, RosterError> {
4020 Ok(Vec::new())
4021 }
4022 }
4023
4024 fn rpc_test_mob_spec(
4025 temp_dir: &tempfile::TempDir,
4026 ) -> Result<MobBootstrapSpec, Box<dyn std::error::Error + Send + Sync>> {
4027 let session_path = temp_dir.path().join("sessions");
4028 std::fs::create_dir_all(&session_path)?;
4029 let factory = AgentFactory::new(&session_path).comms(true);
4030 let session_service = Arc::new(build_ephemeral_service(factory, Config::default(), 16));
4031 let definition = MobDefinition::from_toml(
4032 r#"
4033[mob]
4034id = "rpc-identity-alias-test"
4035
4036[profiles.worker]
4037model = "gpt-5.5"
4038external_addressable = true
4039
4040[profiles.worker.tools]
4041comms = true
4042"#,
4043 )?;
4044 Ok(
4045 MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
4046 .with_options(MobBootstrapOptions {
4047 allow_ephemeral_sessions: true,
4048 notify_orchestrator_on_resume: true,
4049 default_llm_client: Some(Arc::new(TestClient::default())),
4050 }),
4051 )
4052 }
4053
4054 #[test]
4055 fn generated_runtime_ids_match_their_durable_identity_prefix() {
4056 assert!(!rpc_member_id_matches_durable_identity(
4057 "rt:review:singleton:0",
4058 "review:singleton",
4059 ));
4060 assert!(!rpc_member_id_matches_durable_identity(
4061 "review:singleton:gen1",
4062 "review:singleton",
4063 ));
4064 assert!(!rpc_member_id_matches_durable_identity(
4065 "review:singleton:1",
4066 "review:singleton",
4067 ));
4068 assert!(!rpc_member_id_matches_durable_identity(
4069 "rt:reviewer:singleton:0",
4070 "review:singleton",
4071 ));
4072 assert!(!rpc_member_id_matches_durable_identity(
4073 "rt:review:singleton:qa:0",
4074 "review:singleton",
4075 ));
4076 assert!(!rpc_member_id_matches_durable_identity(
4077 "review:singleton:qa",
4078 "review:singleton",
4079 ));
4080 }
4081
4082 #[test]
4083 fn rpc_live_identity_visibility_matches_delegate_projection_labels() {
4084 assert!(rpc_live_identity_alias_visible("worker", &BTreeMap::new()));
4085
4086 let mut labels = BTreeMap::new();
4087 labels.insert("role".to_string(), "delegate".to_string());
4088 labels.insert("source_mob_id".to_string(), "mob-a".to_string());
4089 labels.insert("agent_identity".to_string(), "review:singleton".to_string());
4090 assert!(!rpc_live_identity_alias_visible("worker", &labels));
4091 assert!(!rpc_live_identity_alias_visible("delegate", &labels));
4092 }
4093
4094 #[test]
4095 fn ambiguous_live_alias_errors_include_structured_data() -> Result<(), serde_json::Error> {
4096 let response: Value = serde_json::from_str(&error_response(
4097 json!(1),
4098 -32602,
4099 "ambiguous live identity alias review:singleton: candidates [rt:review:singleton:0, rt:review:singleton:1]",
4100 ))?;
4101
4102 assert_eq!(
4103 response["error"]["data"]["kind"],
4104 json!("ambiguous_live_identity_alias")
4105 );
4106 assert_eq!(
4107 response["error"]["data"]["identity"],
4108 json!("review:singleton")
4109 );
4110 assert_eq!(
4111 response["error"]["data"]["candidates"],
4112 json!(["rt:review:singleton:0", "rt:review:singleton:1"])
4113 );
4114 Ok(())
4115 }
4116
4117 #[test]
4118 fn wrapped_ambiguous_live_alias_errors_include_structured_data() -> Result<(), serde_json::Error>
4119 {
4120 let response: Value = serde_json::from_str(&error_response(
4121 json!(1),
4122 -32602,
4123 "invalid identity: ambiguous live identity alias review:singleton: candidates [rt:review:singleton:0, rt:review:singleton:1]",
4124 ))?;
4125
4126 assert_eq!(
4127 response["error"]["data"]["kind"],
4128 json!("ambiguous_live_identity_alias")
4129 );
4130 assert_eq!(
4131 response["error"]["data"]["identity"],
4132 json!("review:singleton")
4133 );
4134 assert_eq!(
4135 response["error"]["data"]["candidates"],
4136 json!(["rt:review:singleton:0", "rt:review:singleton:1"])
4137 );
4138 Ok(())
4139 }
4140
4141 #[tokio::test]
4142 async fn runtime_id_live_only_resolution_rejects_duplicate_projected_identity()
4143 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4144 let temp_dir = tempfile::tempdir()?;
4145 let runtime = UnifiedRuntime::builder()
4146 .mob_spec(rpc_test_mob_spec(&temp_dir)?)
4147 .module_config(MobKitConfig {
4148 modules: Vec::new(),
4149 discovery: DiscoverySpec {
4150 namespace: "rpc-identity-alias-test".to_string(),
4151 modules: Vec::new(),
4152 },
4153 pre_spawn: Vec::new(),
4154 })
4155 .timeout(Duration::from_secs(1))
4156 .build()
4157 .await?;
4158 for runtime_id in ["rt:review:singleton:0", "rt:review:singleton:1"] {
4159 let mut labels = BTreeMap::new();
4160 labels.insert("agent_identity".to_string(), "review:singleton".to_string());
4161 runtime
4162 .spawn(
4163 SpawnMemberSpec::from_wire(
4164 "worker".to_string(),
4165 runtime_id.to_string(),
4166 Some("You are a duplicate Review Agent.".into()),
4167 None,
4168 None,
4169 )
4170 .with_labels(labels),
4171 )
4172 .await?;
4173 }
4174 let identity_rt = IdentityRuntime::new(IdentityRuntimeConfig {
4175 continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4176 lease_provider: Arc::new(LocalLeaseProvider::new()),
4177 runtime_instance_id: "rpc-identity-alias-test".to_string(),
4178 has_runtime_store: true,
4179 durability_policy: DurabilityPolicy::SyncWriteThrough,
4180 bridge: None,
4181 default_timeout: None,
4182 });
4183
4184 let err =
4185 resolve_rpc_identity_control_target(&runtime, &identity_rt, "rt:review:singleton:0")
4186 .await
4187 .expect_err("runtime-id live-only fallback should reject duplicate durable alias");
4188 assert!(
4189 err.contains("ambiguous live identity alias review:singleton"),
4190 "unexpected error: {err}"
4191 );
4192
4193 Ok(())
4194 }
4195
4196 #[tokio::test]
4197 async fn durable_resolution_prefers_registered_live_binding_over_stale_duplicates()
4198 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4199 let temp_dir = tempfile::tempdir()?;
4200 let runtime = UnifiedRuntime::builder()
4201 .mob_spec(rpc_test_mob_spec(&temp_dir)?)
4202 .module_config(MobKitConfig {
4203 modules: Vec::new(),
4204 discovery: DiscoverySpec {
4205 namespace: "rpc-identity-alias-test".to_string(),
4206 modules: Vec::new(),
4207 },
4208 pre_spawn: Vec::new(),
4209 })
4210 .timeout(Duration::from_secs(1))
4211 .build()
4212 .await?;
4213 for runtime_id in ["rt:review:singleton:0", "rt:review:singleton:1"] {
4214 let mut labels = BTreeMap::new();
4215 labels.insert("agent_identity".to_string(), "review:singleton".to_string());
4216 runtime
4217 .spawn(
4218 SpawnMemberSpec::from_wire(
4219 "worker".to_string(),
4220 runtime_id.to_string(),
4221 Some("You are a duplicate Review Agent.".into()),
4222 None,
4223 None,
4224 )
4225 .with_labels(labels),
4226 )
4227 .await?;
4228 }
4229 let identity_rt = IdentityRuntime::new(IdentityRuntimeConfig {
4230 continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4231 lease_provider: Arc::new(LocalLeaseProvider::new()),
4232 runtime_instance_id: "rpc-identity-alias-test".to_string(),
4233 has_runtime_store: true,
4234 durability_policy: DurabilityPolicy::SyncWriteThrough,
4235 bridge: None,
4236 default_timeout: None,
4237 });
4238 let identity = AgentIdentity::parse("review:singleton")?;
4239 let record = ContinuityRecord {
4240 identity: identity.clone(),
4241 agent_runtime_id: AgentRuntimeId::parse("rt:review:singleton:1")?,
4242 session_id: meerkat_core::types::SessionId::new(),
4243 generation: ContinuityGeneration::new(1),
4244 checkpoint_version: CheckpointVersion::new(0),
4245 };
4246 identity_rt
4247 .register(
4248 DurableAgentSpec {
4249 identity,
4250 profile: meerkat_mob::ProfileName::from("worker"),
4251 addressability: AgentAddressability::Addressable,
4252 display_name: None,
4253 labels: BTreeMap::new(),
4254 context: None,
4255 additional_instructions: Vec::new(),
4256 initial_message: None,
4257 runtime_mode_override: None,
4258 },
4259 IdentityLifecycleState::Active,
4260 Some(record),
4261 None,
4262 )
4263 .await;
4264
4265 let target =
4266 resolve_rpc_identity_control_target(&runtime, &identity_rt, "review:singleton").await?;
4267 assert_eq!(target.identity.as_str(), "review:singleton");
4268 assert_eq!(
4269 target
4270 .live
4271 .as_ref()
4272 .map(|alias| alias.runtime_member_id.as_str()),
4273 Some("rt:review:singleton:1")
4274 );
4275
4276 let target =
4277 resolve_rpc_identity_control_target(&runtime, &identity_rt, "rt:review:singleton:1")
4278 .await?;
4279 assert_eq!(
4280 target
4281 .live
4282 .as_ref()
4283 .map(|alias| alias.runtime_member_id.as_str()),
4284 Some("rt:review:singleton:1")
4285 );
4286
4287 Ok(())
4288 }
4289
4290 #[tokio::test]
4291 async fn durable_resolution_rejects_hidden_registered_live_binding()
4292 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4293 let temp_dir = tempfile::tempdir()?;
4294 let runtime = UnifiedRuntime::builder()
4295 .mob_spec(rpc_test_mob_spec(&temp_dir)?)
4296 .module_config(MobKitConfig {
4297 modules: Vec::new(),
4298 discovery: DiscoverySpec {
4299 namespace: "rpc-hidden-bound-test".to_string(),
4300 modules: Vec::new(),
4301 },
4302 pre_spawn: Vec::new(),
4303 })
4304 .timeout(Duration::from_secs(1))
4305 .build()
4306 .await?;
4307 runtime
4308 .spawn(
4309 SpawnMemberSpec::from_wire(
4310 "worker".to_string(),
4311 "rt:review:singleton:0".to_string(),
4312 Some("You are a hidden Review Agent.".into()),
4313 None,
4314 None,
4315 )
4316 .with_labels(BTreeMap::from([
4317 ("agent_identity".to_string(), "review:singleton".to_string()),
4318 ("role".to_string(), "delegate".to_string()),
4319 ("source_mob_id".to_string(), "upstream".to_string()),
4320 ])),
4321 )
4322 .await?;
4323 let identity_rt = IdentityRuntime::new(IdentityRuntimeConfig {
4324 continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4325 lease_provider: Arc::new(LocalLeaseProvider::new()),
4326 runtime_instance_id: "rpc-hidden-bound-test".to_string(),
4327 has_runtime_store: true,
4328 durability_policy: DurabilityPolicy::SyncWriteThrough,
4329 bridge: None,
4330 default_timeout: None,
4331 });
4332 let identity = AgentIdentity::parse("review:singleton")?;
4333 identity_rt
4334 .register(
4335 DurableAgentSpec {
4336 identity: identity.clone(),
4337 profile: meerkat_mob::ProfileName::from("worker"),
4338 addressability: AgentAddressability::Addressable,
4339 display_name: None,
4340 labels: BTreeMap::new(),
4341 context: None,
4342 additional_instructions: Vec::new(),
4343 initial_message: None,
4344 runtime_mode_override: None,
4345 },
4346 IdentityLifecycleState::Active,
4347 Some(ContinuityRecord {
4348 identity,
4349 agent_runtime_id: AgentRuntimeId::parse("rt:review:singleton:0")?,
4350 session_id: meerkat_core::types::SessionId::new(),
4351 generation: ContinuityGeneration::new(0),
4352 checkpoint_version: CheckpointVersion::new(0),
4353 }),
4354 None,
4355 )
4356 .await;
4357
4358 for requested_identity in ["review:singleton", "rt:review:singleton:0"] {
4359 let err =
4360 resolve_rpc_identity_control_target(&runtime, &identity_rt, requested_identity)
4361 .await
4362 .expect_err("hidden registered live binding must not resolve");
4363 assert!(
4364 err.contains("identity hidden by policy"),
4365 "unexpected error for {requested_identity}: {err}"
4366 );
4367 }
4368
4369 Ok(())
4370 }
4371
4372 #[tokio::test]
4373 async fn live_only_hidden_alias_reports_policy_error()
4374 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4375 let temp_dir = tempfile::tempdir()?;
4376 let runtime = UnifiedRuntime::builder()
4377 .mob_spec(rpc_test_mob_spec(&temp_dir)?)
4378 .module_config(MobKitConfig {
4379 modules: Vec::new(),
4380 discovery: DiscoverySpec {
4381 namespace: "rpc-hidden-live-only-test".to_string(),
4382 modules: Vec::new(),
4383 },
4384 pre_spawn: Vec::new(),
4385 })
4386 .timeout(Duration::from_secs(1))
4387 .build()
4388 .await?;
4389 runtime
4390 .spawn(
4391 SpawnMemberSpec::from_wire(
4392 "worker".to_string(),
4393 "rt:review:singleton:0".to_string(),
4394 Some("You are a hidden Review Agent.".into()),
4395 None,
4396 None,
4397 )
4398 .with_labels(BTreeMap::from([
4399 ("agent_identity".to_string(), "review:singleton".to_string()),
4400 ("role".to_string(), "delegate".to_string()),
4401 ("source_mob_id".to_string(), "upstream".to_string()),
4402 ])),
4403 )
4404 .await?;
4405 let identity_rt = IdentityRuntime::new(IdentityRuntimeConfig {
4406 continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4407 lease_provider: Arc::new(LocalLeaseProvider::new()),
4408 runtime_instance_id: "rpc-hidden-live-only-test".to_string(),
4409 has_runtime_store: true,
4410 durability_policy: DurabilityPolicy::SyncWriteThrough,
4411 bridge: None,
4412 default_timeout: None,
4413 });
4414
4415 for requested_identity in ["review:singleton", "rt:review:singleton:0"] {
4416 let err =
4417 resolve_rpc_identity_control_target(&runtime, &identity_rt, requested_identity)
4418 .await
4419 .expect_err("hidden live-only alias must not collapse into unknown identity");
4420 assert!(
4421 err.contains("identity hidden by policy"),
4422 "unexpected error for {requested_identity}: {err}"
4423 );
4424 }
4425
4426 let identity_ctx = IdentityFirstContext {
4427 runtime: Arc::new(identity_rt),
4428 roster_provider: Arc::new(EmptyRosterProvider),
4429 topology_provider: None,
4430 customizer: None,
4431 };
4432 for requested_identity in ["review:singleton", "rt:review:singleton:0"] {
4433 let response: Value = serde_json::from_str(
4434 &handle_unified_rpc_json(
4435 &runtime,
4436 &json!({
4437 "jsonrpc": "2.0",
4438 "id": 1,
4439 "method": "mobkit/status_identity",
4440 "params": { "identity": requested_identity },
4441 })
4442 .to_string(),
4443 Duration::from_secs(1),
4444 None,
4445 Some(&identity_ctx),
4446 )
4447 .await,
4448 )?;
4449 assert_eq!(
4450 response["error"]["data"]["kind"],
4451 json!("identity_hidden_by_policy"),
4452 "unexpected hidden response for {requested_identity}: {response:#?}"
4453 );
4454 }
4455
4456 Ok(())
4457 }
4458
4459 #[tokio::test]
4460 async fn live_only_resolution_rejects_runtime_member_bound_to_other_durable_identity()
4461 -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4462 let temp_dir = tempfile::tempdir()?;
4463 let runtime = UnifiedRuntime::builder()
4464 .mob_spec(rpc_test_mob_spec(&temp_dir)?)
4465 .module_config(MobKitConfig {
4466 modules: Vec::new(),
4467 discovery: DiscoverySpec {
4468 namespace: "rpc-identity-alias-test".to_string(),
4469 modules: Vec::new(),
4470 },
4471 pre_spawn: Vec::new(),
4472 })
4473 .timeout(Duration::from_secs(1))
4474 .build()
4475 .await?;
4476 let mut labels = BTreeMap::new();
4477 labels.insert("agent_identity".to_string(), "other:singleton".to_string());
4478 runtime
4479 .spawn(
4480 SpawnMemberSpec::from_wire(
4481 "worker".to_string(),
4482 "rt:review:singleton:0".to_string(),
4483 Some("You are a wrong-projected Review Agent.".into()),
4484 None,
4485 None,
4486 )
4487 .with_labels(labels),
4488 )
4489 .await?;
4490
4491 let identity_rt = IdentityRuntime::new(IdentityRuntimeConfig {
4492 continuity_store: Arc::new(LocalContinuityStore::in_memory()?),
4493 lease_provider: Arc::new(LocalLeaseProvider::new()),
4494 runtime_instance_id: "rpc-identity-alias-test".to_string(),
4495 has_runtime_store: true,
4496 durability_policy: DurabilityPolicy::SyncWriteThrough,
4497 bridge: None,
4498 default_timeout: None,
4499 });
4500 let identity = AgentIdentity::parse("review:singleton")?;
4501 let record = ContinuityRecord {
4502 identity: identity.clone(),
4503 agent_runtime_id: AgentRuntimeId::parse("rt:review:singleton:0")?,
4504 session_id: meerkat_core::types::SessionId::new(),
4505 generation: ContinuityGeneration::new(0),
4506 checkpoint_version: CheckpointVersion::new(0),
4507 };
4508 identity_rt
4509 .register(
4510 DurableAgentSpec {
4511 identity: identity.clone(),
4512 profile: meerkat_mob::ProfileName::from("worker"),
4513 addressability: AgentAddressability::Addressable,
4514 display_name: None,
4515 labels: BTreeMap::new(),
4516 context: None,
4517 additional_instructions: Vec::new(),
4518 initial_message: None,
4519 runtime_mode_override: None,
4520 },
4521 IdentityLifecycleState::Active,
4522 Some(record),
4523 Some(LeaseGrant {
4524 identity,
4525 fencing_token: FencingToken::new(1),
4526 ttl: Duration::from_mins(1),
4527 }),
4528 )
4529 .await;
4530
4531 let err = resolve_rpc_identity_control_target(&runtime, &identity_rt, "other:singleton")
4532 .await
4533 .expect_err("wrong-projected live alias must not resolve as live-only");
4534 assert!(
4535 err.contains("identity runtime binding belongs to review:singleton"),
4536 "unexpected error: {err}"
4537 );
4538
4539 Ok(())
4540 }
4541}