1#![deny(missing_docs)]
2#![deny(rustdoc::broken_intra_doc_links)]
3#[cfg(feature = "http")]
46pub(crate) mod http;
47
48pub(crate) mod outbound;
49
50#[cfg(feature = "http")]
51pub(crate) mod outbound_ring;
52
53pub(crate) mod session;
54
55pub(crate) mod workflow;
56
57#[cfg(feature = "governor")]
62pub(crate) mod governor;
63
64#[cfg_attr(not(feature = "http"), allow(dead_code))]
70pub(crate) mod resume_ticket;
71
72pub mod outbound_sink;
73pub use outbound_sink::{OutboundFrameSink, OutboundSinkError};
74
75pub mod sampling;
76pub use sampling::{
77 ModelHint, ModelPreferences, SamplingContent, SamplingMessage, SamplingRequest,
78 SamplingResponse,
79};
80
81pub mod roots;
82pub use roots::Root;
83
84pub mod outbound_ext;
85pub use outbound_ext::McpOutboundExt;
86
87#[cfg(all(feature = "http", feature = "test-fixtures"))]
92pub use http::{handle_dead_leader_orphan_mcp, OrphanOutcome};
93
94#[cfg(all(feature = "http", feature = "bench"))]
95pub use http::encode_sse_frame;
96#[cfg(feature = "bench")]
97pub use outbound_sink::bench_stdio_sink;
98
/// Bench-only helper: returns clones of the `(id, frame)` entries whose id is
/// strictly greater than `since_id`, in their original order.
///
/// Compiled only when both the `http` and `bench` features are enabled.
#[cfg(all(feature = "http", feature = "bench"))]
pub fn bench_filter_replay(
    entries: &[(u64, std::sync::Arc<serde_json::Value>)],
    since_id: u64,
) -> Vec<(u64, std::sync::Arc<serde_json::Value>)> {
    let mut replay = Vec::new();
    for entry in entries {
        if entry.0 > since_id {
            replay.push(entry.clone());
        }
    }
    replay
}
114
115use async_trait::async_trait;
116use klieo_core::agent::Agent;
117use klieo_core::error::ToolError;
118use klieo_core::llm::ToolDef;
119use klieo_core::tool::{ToolCtx, ToolInvoker};
120use std::sync::Arc;
121use thiserror::Error;
122use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
123use tokio::sync::Mutex;
124use tokio_util::sync::CancellationToken;
125use tracing::warn;
126
/// JSON-RPC 2.0 predefined "Parse error" code.
pub(crate) const JSONRPC_PARSE_ERROR: i64 = -32700;
/// JSON-RPC 2.0 predefined "Method not found" code.
const JSONRPC_METHOD_NOT_FOUND: i64 = -32601;
/// JSON-RPC 2.0 predefined "Invalid params" code.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_INVALID_PARAMS: i64 = -32602;
/// Generic server error. This and the codes below sit in the JSON-RPC 2.0
/// range reserved for implementation-defined server errors
/// (`-32000..=-32099`).
pub(crate) const JSONRPC_SERVER_ERROR: i64 = -32000;
/// Resume buffer expired.
/// NOTE(review): presumably the wire code for
/// `McpServerError::ResumeBufferExpired`; confirm in the `http` module.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_RESUME_BUFFER_EXPIRED: i64 = -32011;
/// Resume buffer not found.
/// NOTE(review): presumably the wire code for
/// `McpServerError::ResumeBufferNotFound`; confirm in the `http` module.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_RESUME_BUFFER_NOT_FOUND: i64 = -32012;
/// Leader died.
/// NOTE(review): emitting code is outside this section; confirm semantics.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_LEADER_DIED: i64 = -32099;
/// Session conflict.
/// NOTE(review): emitting code is outside this section; confirm semantics.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_SESSION_CONFLICT: i64 = -32002;
/// Unauthenticated request.
/// NOTE(review): emitting code is outside this section; confirm semantics.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_UNAUTHENTICATED: i64 = -32001;
168
// Compile-time guard: evaluating this anonymous constant fails the build when
// any two of the listed `JSONRPC_*` codes share a value.
#[cfg(feature = "http")]
const _: () = {
    const CODES: [i64; 9] = [
        JSONRPC_PARSE_ERROR,
        JSONRPC_METHOD_NOT_FOUND,
        JSONRPC_INVALID_PARAMS,
        JSONRPC_SERVER_ERROR,
        JSONRPC_UNAUTHENTICATED,
        JSONRPC_RESUME_BUFFER_EXPIRED,
        JSONRPC_RESUME_BUFFER_NOT_FOUND,
        JSONRPC_LEADER_DIED,
        JSONRPC_SESSION_CONFLICT,
    ];
    // Manual index loops: iterator-based `for` is not available in const
    // evaluation. Every unordered pair (outer, inner) is compared once.
    let mut outer = 0;
    while outer < CODES.len() {
        let mut inner = outer + 1;
        while inner < CODES.len() {
            assert!(CODES[outer] != CODES[inner], "JSONRPC_* code collision");
            inner += 1;
        }
        outer += 1;
    }
};
195
/// Default leader TTL (5 seconds). The builder falls back to this value when
/// `McpServerBuilder::with_leader_ttl` is never called.
pub const LEADER_TTL: std::time::Duration = std::time::Duration::from_secs(5);

/// The `"mcp."` key prefix.
/// NOTE(review): consumers are outside this section; presumably prepended to
/// leader-registry keys owned by this crate. Confirm.
pub const MCP_LEADER_KEY_PREFIX: &str = "mcp.";

/// MCP protocol revision string, `2025-03-26`.
/// NOTE(review): presumably the version reported during the `initialize`
/// handshake; the handshake code is outside this section. Confirm.
pub const MCP_PROTOCOL_VERSION: &str = "2025-03-26";
215
216#[derive(Debug, Error)]
218#[non_exhaustive]
219pub enum McpServerError {
220 #[error("io error: {0}")]
222 Io(#[from] std::io::Error),
223 #[error("json decode error: {0}")]
225 Json(#[from] serde_json::Error),
226 #[error("resume window expired (since_id={since_id})")]
229 ResumeBufferExpired {
230 since_id: u64,
232 },
233
234 #[error("no buffered stream for progressToken: {0}")]
237 ResumeBufferNotFound(String),
238
239 #[error("tool error: {0}")]
241 Tool(#[from] ToolError),
242
243 #[error("invalid subject token: {0}")]
254 InvalidSubject(#[source] klieo_core::BusError),
255
256 #[error("bus error: {0}")]
265 Bus(#[source] klieo_core::BusError),
266
267 #[error("outbound request timed out")]
270 OutboundTimeout,
271
272 #[error("client returned error: code={code} message={message}")]
275 ClientReturnedError {
276 code: i64,
278 message: String,
280 },
281
282 #[error("transport closed")]
284 TransportClosed,
285
286 #[error("outbound channel unsupported on this transport")]
289 OutboundUnsupported,
290
291 #[error("outbound serialisation failed: {0}")]
294 OutboundSerialisation(#[source] serde_json::Error),
295
296 #[error("failed to serialise sampling request: {0}")]
298 SamplingSerialise(serde_json::Error),
299
300 #[error("failed to deserialise sampling response: {0}")]
302 SamplingDeserialise(serde_json::Error),
303}
304
305impl From<klieo_core::ServerOutboundError> for McpServerError {
306 fn from(e: klieo_core::ServerOutboundError) -> Self {
307 use klieo_core::ServerOutboundError as E;
308 match e {
309 E::Timeout => McpServerError::OutboundTimeout,
310 E::PeerError { code, message } => McpServerError::ClientReturnedError { code, message },
311 E::TransportClosed => McpServerError::TransportClosed,
312 E::Unsupported => McpServerError::OutboundUnsupported,
313 E::Serialisation(err) => McpServerError::OutboundSerialisation(err),
314 _ => McpServerError::OutboundUnsupported,
318 }
319 }
320}
321
322impl From<klieo_core::BusError> for McpServerError {
323 fn from(e: klieo_core::BusError) -> Self {
324 match e {
325 klieo_core::BusError::Invalid(_) => Self::InvalidSubject(e),
326 other => Self::Bus(other),
327 }
328 }
329}
330
/// Errors returned by `McpServerBuilder::build` and
/// `McpServerBuilder::build_arc`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum McpBuildError {
    /// `build()` was called after `with_cancel_subscription()`; use
    /// `build_arc()` instead.
    #[error("with_cancel_subscription requires build_arc()")]
    CancelRequiresArc,
    /// No invoker (tools, agent or workflow) was registered.
    #[error("at least one invoker required; call add_tools or add_agent before build")]
    NoInvokers,
    /// A tool name was registered more than once. Within this section it is
    /// raised when two workflows materialise under the same name.
    /// NOTE(review): presumably also raised by `MergedInvoker::new`; confirm.
    #[error("duplicate tool name {0:?} across registered invokers")]
    DuplicateTool(String),
    /// The deployment profile's validation rejected the configuration.
    #[error(transparent)]
    RegulatedProfile(#[from] klieo_core::ProfileViolation),
    /// A workflow was registered but `with_hitl` was never called.
    #[error("workflow registered without with_hitl(..); call with_hitl before build")]
    WorkflowWithoutHitl,
    /// A workflow was registered but `with_governor` was never called. Only
    /// checked when the `governor` feature is enabled.
    #[error("workflow registered without with_governor(..); call with_governor before build")]
    WorkflowWithoutGovernor,
}
366
/// An MCP server assembled by `McpServerBuilder`.
///
/// Holds the tool invoker plus the per-server state that
/// `McpServerBuilder::build_inner` populates (registries, session tables,
/// limits). Construct it through `McpServer::builder()` or one of the
/// `expose_*` shortcuts.
pub struct McpServer {
    /// The single registered invoker, or a `MergedInvoker` over all of them
    /// when more than one was registered.
    pub(crate) invoker: Arc<dyn ToolInvoker>,
    /// Factory producing the `ToolCtx` for tool calls.
    tool_ctx_factory: ToolCtxFactory,
    /// Cancellation token supplied to the builder (a fresh one by default).
    pub(crate) parent_cancel: CancellationToken,
    /// Resume buffer; `NoopResumeBuffer` unless the builder supplied one.
    pub(crate) resume_buffer: std::sync::Arc<dyn klieo_core::resume::ResumeBuffer>,
    /// Pubsub handle; a fresh in-memory bus unless the builder supplied one.
    pub(crate) pubsub: std::sync::Arc<dyn klieo_core::Pubsub>,
    /// Cancel registry created empty at build time; `build_arc` hands a clone
    /// to the wildcard cancel subscriber when cancel subscription is enabled.
    pub(crate) cancel_registry: klieo_core::CancelRegistry<String>,
    /// Semaphore sized by `with_publish_concurrency`, or by
    /// `DEFAULT_PUBLISH_PERMITS` when unset.
    #[cfg(feature = "http")]
    pub(crate) publish_permits: std::sync::Arc<tokio::sync::Semaphore>,
    /// Present only when `with_leader_election` was called.
    pub(crate) leader_registry: Option<klieo_core::LeaderRegistry>,
    /// Present only when a tenant-binding KV store was supplied.
    pub(crate) ownership_registry: Option<klieo_core::OwnershipRegistry>,
    /// Present only when `with_checkpoint_kv` was called.
    #[cfg_attr(not(feature = "http"), allow(dead_code))]
    pub(crate) resume_ticket_store: Option<Arc<crate::resume_ticket::ResumeTicketStore>>,
    /// Resume handle of every registered workflow, keyed by workflow name.
    #[cfg_attr(not(feature = "http"), allow(dead_code))]
    pub(crate) workflow_resume_handles:
        std::collections::HashMap<String, Arc<dyn crate::workflow::WorkflowResumeHandle>>,
    /// Authenticator supplied to the builder, if any.
    pub(crate) authenticator: Option<Arc<dyn klieo_auth_common::Authenticator>>,
    /// Leader TTL; defaults to `LEADER_TTL`.
    pub(crate) leader_ttl: std::time::Duration,
    /// Leader heartbeat interval; defaults to half of `leader_ttl`.
    pub(crate) leader_heartbeat_interval: std::time::Duration,
    /// Failover attempt cap; defaults to `klieo_core::FAILOVER_ATTEMPT_CAP`.
    pub(crate) max_failover_attempts: u32,
    /// Interval passed to `with_kv_reaper`, or `None` when no reaper was
    /// requested.
    pub(crate) kv_reaper_interval: Option<std::time::Duration>,
    /// Handle returned when the KV reaper was spawned. Never read in this
    /// section.
    /// NOTE(review): presumably held so the reaper lives as long as the
    /// server; confirm against `KvReaperHandle`'s drop behaviour.
    _kv_reaper: Option<klieo_core::KvReaperHandle>,
    /// Session used by the stdio transport, set at most once. Read by
    /// `outbound()`, `client_roots()` and `subscribe_root_changes()`.
    pub(crate) stdio_session: tokio::sync::OnceCell<std::sync::Arc<crate::session::Session>>,
    /// HTTP sessions keyed by session UUID.
    #[cfg(feature = "http")]
    pub(crate) sessions: std::sync::Arc<
        tokio::sync::RwLock<
            std::collections::HashMap<uuid::Uuid, std::sync::Arc<crate::session::Session>>,
        >,
    >,
    /// Session limit; defaults to `DEFAULT_MAX_SESSIONS`.
    #[cfg(feature = "http")]
    pub(crate) max_sessions: usize,
    /// Per-principal session limit; defaults to
    /// `default_max_sessions_per_principal(max_sessions)`.
    #[cfg(feature = "http")]
    pub(crate) max_sessions_per_principal: usize,
    /// SSE replay capacity; `0` disables replay (see `sse_replay_enabled`).
    #[cfg(feature = "http")]
    pub(crate) sse_replay_capacity: usize,
    /// Per-principal counters; `decrement_principal_count` removes an entry
    /// once it reaches zero.
    #[cfg(feature = "http")]
    pub(crate) principal_counts:
        std::sync::Arc<tokio::sync::RwLock<std::collections::HashMap<String, usize>>>,
    /// NOTE(review): presumably a once-guard ensuring the idle reaper is
    /// started a single time; the code that sets it is outside this section.
    #[cfg(feature = "http")]
    pub(crate) idle_reaper_started: tokio::sync::OnceCell<()>,
    /// `true` when `with_client_sampling` was called on the builder.
    pub(crate) declare_sampling: bool,
    /// Shared writer, set at most once.
    /// NOTE(review): presumably the stdio transport's stdout; confirm.
    pub(crate) stdout_writer: tokio::sync::OnceCell<crate::outbound::SharedWriter>,
    /// Client capability flags; starts as `ClientCaps::default()`.
    pub(crate) client_caps: tokio::sync::Mutex<ClientCaps>,
    /// Session idle timeout; defaults to `DEFAULT_SESSION_IDLE_TIMEOUT`.
    #[cfg(feature = "http")]
    pub(crate) session_idle_timeout: std::time::Duration,
    /// Idle reaper tick; `DEFAULT_IDLE_REAPER_TICK` unless overridden in
    /// test builds.
    #[cfg(feature = "http")]
    pub(crate) idle_reaper_tick: std::time::Duration,
    /// `Instant` captured when the server was built.
    #[cfg(feature = "http")]
    pub(crate) server_start: std::time::Instant,
}
528
/// Capability flags recorded for the connected client. All flags default to
/// `false`.
#[derive(Default, Debug)]
pub(crate) struct ClientCaps {
    /// NOTE(review): presumably set when the client advertises the `roots`
    /// capability during initialisation; the writer is outside this section.
    pub roots_supported: bool,
}
541
/// Shared, thread-safe factory returning a fresh
/// `klieo_core::agent::AgentContext` on each call.
pub type AgentContextFactory =
    Arc<dyn Fn() -> klieo_core::agent::AgentContext + Send + Sync + 'static>;

/// Shared, thread-safe factory returning a fresh `klieo_core::tool::ToolCtx`
/// on each call.
pub type ToolCtxFactory = Arc<dyn Fn() -> klieo_core::tool::ToolCtx + Send + Sync + 'static>;
559
560fn default_tool_ctx_factory() -> ToolCtxFactory {
564 Arc::new(noop_ctx)
565}
566
/// Size of the publish semaphore when `with_publish_concurrency` is not
/// called.
#[cfg(feature = "http")]
const DEFAULT_PUBLISH_PERMITS: usize = 64;

/// Session idle timeout (300 seconds) used when `with_session_idle_timeout`
/// is not called.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_SESSION_IDLE_TIMEOUT: std::time::Duration =
    std::time::Duration::from_secs(300);

/// Session limit used when `with_max_sessions` is not called.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_MAX_SESSIONS: usize = 1024;

/// Divisor applied to `max_sessions` by `default_max_sessions_per_principal`.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR: usize = 16;

/// SSE replay capacity used when `with_sse_replay_capacity` is not called.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_SSE_REPLAY_CAPACITY: usize = 256;
627
/// Derives the per-principal session limit from the global one:
/// `max_sessions / DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR`, never less
/// than 1.
#[cfg(feature = "http")]
pub(crate) fn default_max_sessions_per_principal(max_sessions: usize) -> usize {
    let share = max_sessions / DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR;
    std::cmp::max(share, 1)
}
636
/// Idle reaper tick (10 seconds). Always used outside test builds; test
/// builds may override it through `with_idle_reaper_tick`.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_IDLE_REAPER_TICK: std::time::Duration = std::time::Duration::from_secs(10);
644
/// Builder for `McpServer`.
///
/// Every method consumes and returns the builder. Finish with `build()` or,
/// when cancel subscription is enabled, `build_arc()`. `Option` fields left
/// as `None` are replaced by defaults in `build_inner`.
pub struct McpServerBuilder {
    /// Invokers registered so far via `add_tools` / `add_agent*`.
    invokers: Vec<Arc<dyn ToolInvoker>>,
    /// Cancellation token handed to the built server.
    parent_cancel: CancellationToken,
    /// Factory for the `ToolCtx` used on tool calls.
    tool_ctx_factory: ToolCtxFactory,
    /// Optional resume buffer override.
    resume_buffer: Option<std::sync::Arc<dyn klieo_core::resume::ResumeBuffer>>,
    /// Optional pubsub override.
    pubsub: Option<std::sync::Arc<dyn klieo_core::Pubsub>>,
    /// Set by `with_cancel_subscription`; forces the use of `build_arc`.
    subscribe_cancels: bool,
    /// Optional publish semaphore size.
    #[cfg(feature = "http")]
    publish_permits: Option<usize>,
    /// KV store backing the leader registry, if leader election is enabled.
    leader_kv: Option<Arc<dyn klieo_core::KvStore>>,
    /// KV store backing the ownership registry, if tenant binding is enabled.
    tenant_kv: Option<Arc<dyn klieo_core::KvStore>>,
    /// KV store backing the resume ticket store, if configured.
    checkpoint_kv: Option<Arc<dyn klieo_core::KvStore>>,
    /// Whether the ownership registry is created in strict mode.
    tenant_strict: bool,
    /// Deployment profile validated at build time.
    profile: klieo_core::DeploymentProfile,
    /// Optional authenticator.
    authenticator: Option<Arc<dyn klieo_auth_common::Authenticator>>,
    /// Optional leader TTL override.
    leader_ttl: Option<std::time::Duration>,
    /// Optional leader heartbeat interval override.
    leader_heartbeat_interval: Option<std::time::Duration>,
    /// Optional failover attempt cap override.
    max_failover_attempts: Option<u32>,
    /// KV reaper interval; `None` means no reaper is spawned.
    kv_reaper_interval: Option<std::time::Duration>,
    /// Set by `with_client_sampling`.
    declare_sampling: bool,
    /// Optional session idle timeout override.
    #[cfg(feature = "http")]
    session_idle_timeout: Option<std::time::Duration>,
    /// Optional session limit override.
    #[cfg(feature = "http")]
    max_sessions: Option<usize>,
    /// Optional per-principal session limit override.
    #[cfg(feature = "http")]
    max_sessions_per_principal: Option<usize>,
    /// Optional SSE replay capacity override.
    #[cfg(feature = "http")]
    sse_replay_capacity: Option<usize>,
    /// Optional idle reaper tick override (test builds only).
    #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
    idle_reaper_tick: Option<std::time::Duration>,
    /// HITL client and config, required once any workflow is registered.
    hitl: Option<crate::workflow::HitlBundle>,
    /// Workflows registered so far; materialised into invokers at build time.
    pending_workflows: Vec<crate::workflow::WorkflowRegistration>,
    /// Governor bundle set by `with_governor` (always `None` without the
    /// `governor` feature, since that setter is feature-gated).
    governor_bundle: Option<GovernorBundleHolder>,
}
713
/// With the `governor` feature enabled, the holder is the real governor
/// bundle.
#[cfg(feature = "governor")]
pub(crate) type GovernorBundleHolder = crate::governor::GovernorBundle;

/// Without the `governor` feature, the holder is an empty placeholder so the
/// builder and workflow signatures keep the same shape.
#[cfg(not(feature = "governor"))]
#[derive(Clone)]
pub(crate) struct GovernorBundleHolder;
729
730impl Default for McpServerBuilder {
731 fn default() -> Self {
732 Self::new()
733 }
734}
735
736fn spawn_reaper_if_configured(
742 interval: Option<std::time::Duration>,
743 leader_registry: Option<&klieo_core::LeaderRegistry>,
744 ownership_registry: Option<&klieo_core::OwnershipRegistry>,
745 resume_buffer: &Arc<dyn klieo_core::resume::ResumeBuffer>,
746) -> Option<klieo_core::KvReaperHandle> {
747 let interval = interval?;
748 let mut buckets: Vec<String> = Vec::new();
749 let kv = if let Some(reg) = leader_registry {
750 buckets.push(reg.bucket().to_string());
751 reg.kv().clone()
752 } else if let Some(reg) = ownership_registry {
753 buckets.push(reg.bucket().to_string());
754 reg.kv().clone()
755 } else {
756 return None;
757 };
758 if let (Some(_), Some(ownership)) = (leader_registry, ownership_registry) {
759 buckets.push(ownership.bucket().to_string());
760 }
761 Some(klieo_core::spawn_kv_reaper(
762 kv,
763 resume_buffer.clone(),
764 buckets,
765 interval,
766 ))
767}
768
769impl McpServerBuilder {
    /// Creates an empty builder.
    ///
    /// Starts with no invokers or workflows, a fresh `CancellationToken`, the
    /// default tool-context factory, the `Unprofiled` deployment profile, all
    /// boolean switches off and every optional setting unset.
    pub fn new() -> Self {
        Self {
            invokers: Vec::new(),
            parent_cancel: CancellationToken::new(),
            tool_ctx_factory: default_tool_ctx_factory(),
            resume_buffer: None,
            pubsub: None,
            subscribe_cancels: false,
            #[cfg(feature = "http")]
            publish_permits: None,
            leader_kv: None,
            tenant_kv: None,
            checkpoint_kv: None,
            tenant_strict: false,
            profile: klieo_core::DeploymentProfile::Unprofiled,
            authenticator: None,
            leader_ttl: None,
            leader_heartbeat_interval: None,
            max_failover_attempts: None,
            kv_reaper_interval: None,
            declare_sampling: false,
            #[cfg(feature = "http")]
            session_idle_timeout: None,
            #[cfg(feature = "http")]
            max_sessions: None,
            #[cfg(feature = "http")]
            max_sessions_per_principal: None,
            #[cfg(feature = "http")]
            sse_replay_capacity: None,
            #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
            idle_reaper_tick: None,
            hitl: None,
            pending_workflows: Vec::new(),
            governor_bundle: None,
        }
    }
808
809 pub fn with_parent_cancel(mut self, parent_cancel: CancellationToken) -> Self {
819 self.parent_cancel = parent_cancel;
820 self
821 }
822
823 pub fn with_tool_ctx_factory(mut self, factory: ToolCtxFactory) -> Self {
830 self.tool_ctx_factory = factory;
831 self
832 }
833
834 #[must_use]
837 pub fn with_resume_buffer(
838 mut self,
839 buffer: std::sync::Arc<dyn klieo_core::resume::ResumeBuffer>,
840 ) -> Self {
841 self.resume_buffer = Some(buffer);
842 self
843 }
844
845 #[must_use]
870 pub fn with_pubsub(mut self, pubsub: std::sync::Arc<dyn klieo_core::Pubsub>) -> Self {
871 self.pubsub = Some(pubsub);
872 self
873 }
874
875 #[must_use]
889 pub fn with_cancel_subscription(mut self) -> Self {
890 self.subscribe_cancels = true;
891 self
892 }
893
894 #[cfg(feature = "http")]
906 #[must_use]
907 pub fn with_publish_concurrency(mut self, permits: usize) -> Self {
908 self.publish_permits = Some(permits);
909 self
910 }
911
912 #[must_use]
923 pub fn with_leader_election(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
924 self.leader_kv = Some(kv);
925 self
926 }
927
928 #[must_use]
941 pub fn with_tenant_binding(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
942 self.tenant_kv = Some(kv);
943 self.tenant_strict = false;
944 self
945 }
946
947 #[must_use]
954 pub fn with_tenant_binding_strict(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
955 self.tenant_kv = Some(kv);
956 self.tenant_strict = true;
957 self
958 }
959
960 #[must_use]
979 pub fn with_checkpoint_kv(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
980 self.checkpoint_kv = Some(kv);
981 self
982 }
983
984 #[must_use]
995 pub fn with_authenticator(
996 mut self,
997 authenticator: Arc<dyn klieo_auth_common::Authenticator>,
998 ) -> Self {
999 self.authenticator = Some(authenticator);
1000 self
1001 }
1002
1003 pub fn profile(mut self, profile: klieo_core::DeploymentProfile) -> Self {
1007 self.profile = profile;
1008 self
1009 }
1010
1011 #[must_use]
1018 pub fn with_leader_ttl(mut self, ttl: std::time::Duration) -> Self {
1019 self.leader_ttl = Some(ttl);
1020 self
1021 }
1022
1023 #[must_use]
1028 pub fn with_leader_heartbeat_interval(mut self, interval: std::time::Duration) -> Self {
1029 self.leader_heartbeat_interval = Some(interval);
1030 self
1031 }
1032
1033 #[must_use]
1038 pub fn with_max_failover_attempts(mut self, cap: u32) -> Self {
1039 self.max_failover_attempts = Some(cap);
1040 self
1041 }
1042
1043 #[must_use]
1059 pub fn with_kv_reaper(mut self, interval: std::time::Duration) -> Self {
1060 self.kv_reaper_interval = Some(interval);
1061 self
1062 }
1063
1064 #[must_use]
1075 pub fn with_client_sampling(mut self) -> Self {
1076 self.declare_sampling = true;
1077 self
1078 }
1079
1080 #[cfg(feature = "http")]
1086 #[must_use]
1087 pub fn with_session_idle_timeout(mut self, ttl: std::time::Duration) -> Self {
1088 self.session_idle_timeout = Some(ttl);
1089 self
1090 }
1091
1092 #[cfg(feature = "http")]
1100 #[must_use]
1101 pub fn with_max_sessions(mut self, cap: usize) -> Self {
1102 assert!(cap > 0, "max_sessions must be > 0");
1103 self.max_sessions = Some(cap);
1104 self
1105 }
1106
1107 #[cfg(feature = "http")]
1121 #[must_use]
1122 pub fn with_max_sessions_per_principal(mut self, cap: usize) -> Self {
1123 assert!(cap > 0, "max_sessions_per_principal must be > 0");
1124 self.max_sessions_per_principal = Some(cap);
1125 self
1126 }
1127
1128 #[cfg(feature = "http")]
1134 #[must_use]
1135 pub fn with_sse_replay_capacity(mut self, capacity: usize) -> Self {
1136 self.sse_replay_capacity = Some(capacity);
1137 self
1138 }
1139
1140 #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
1151 #[must_use]
1152 pub fn with_idle_reaper_tick(mut self, tick: std::time::Duration) -> Self {
1153 self.idle_reaper_tick = Some(tick);
1154 self
1155 }
1156
1157 pub fn add_tools(mut self, invoker: Arc<dyn ToolInvoker>) -> Self {
1160 self.invokers.push(invoker);
1161 self
1162 }
1163
1164 pub fn add_agent_with_schema<A>(
1168 mut self,
1169 agent: A,
1170 input_schema: serde_json::Value,
1171 ctx_factory: AgentContextFactory,
1172 ) -> Self
1173 where
1174 A: Agent + 'static,
1175 A::Input: serde::de::DeserializeOwned + Send + 'static,
1176 A::Output: serde::Serialize + Send + 'static,
1177 {
1178 let name = agent.name().to_string();
1179 let invoker: Arc<dyn ToolInvoker> = Arc::new(AgentAsToolInvoker {
1180 agent: Arc::new(agent),
1181 name,
1182 input_schema,
1183 ctx_factory,
1184 #[cfg(feature = "governor")]
1191 governor: self.governor_bundle.clone(),
1192 });
1193 self.invokers.push(invoker);
1194 self
1195 }
1196
1197 #[cfg(feature = "schemars")]
1201 pub fn add_agent<A>(self, agent: A, ctx_factory: AgentContextFactory) -> Self
1202 where
1203 A: Agent + 'static,
1204 A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1205 A::Output: serde::Serialize + Send + 'static,
1206 {
1207 let schema = serde_json::to_value(schemars::schema_for!(A::Input))
1208 .expect("schemars::Schema serialises to JSON via #[derive(Serialize)]");
1209 self.add_agent_with_schema(agent, schema, ctx_factory)
1210 }
1211
1212 #[cfg(feature = "governor")]
1229 #[must_use]
1230 pub fn with_governor(
1231 mut self,
1232 governor: Arc<dyn klieo_ops::governor::Governor>,
1233 provider: klieo_ops::ProviderId,
1234 ) -> Self {
1235 self.governor_bundle = Some(crate::governor::GovernorBundle { governor, provider });
1236 self
1237 }
1238
1239 pub fn with_hitl(
1246 mut self,
1247 client: Arc<klieo_hitl_client::HitlClient>,
1248 cfg: Arc<klieo_hitl::HitlConfig>,
1249 ) -> Self {
1250 self.hitl = Some(crate::workflow::HitlBundle { client, cfg });
1251 self
1252 }
1253
1254 pub fn add_workflow_with_schema<A>(
1266 mut self,
1267 agent: A,
1268 system_prompt: impl Into<String>,
1269 input_schema: serde_json::Value,
1270 run_options: klieo_core::runtime::RunOptions,
1271 ctx_factory: AgentContextFactory,
1272 ) -> Self
1273 where
1274 A: Agent + 'static,
1275 A::Input: serde::de::DeserializeOwned + Send + 'static,
1276 {
1277 let name = agent.name().to_string();
1278 let prompt = system_prompt.into();
1279 drop(agent);
1285 let materialise: crate::workflow::WorkflowMaterialiser = Box::new(
1286 move |bundle: crate::workflow::HitlBundle,
1287 ticket_store: Option<Arc<crate::resume_ticket::ResumeTicketStore>>,
1288 governor_bundle: Option<crate::GovernorBundleHolder>| {
1289 #[cfg(not(feature = "governor"))]
1290 let _ = governor_bundle;
1291 let invoker = Arc::new(crate::workflow::WorkflowAsToolInvoker::<A>::new(
1292 name.clone(),
1293 prompt.clone(),
1294 input_schema.clone(),
1295 ctx_factory.clone(),
1296 run_options.clone(),
1297 bundle,
1298 ticket_store,
1299 #[cfg(feature = "governor")]
1300 governor_bundle,
1301 ));
1302 crate::workflow::WorkflowMaterialisation {
1303 name: name.clone(),
1304 resume_handle: invoker.clone()
1305 as Arc<dyn crate::workflow::WorkflowResumeHandle>,
1306 invoker: invoker as Arc<dyn ToolInvoker>,
1307 }
1308 },
1309 );
1310 self.pending_workflows
1311 .push(crate::workflow::WorkflowRegistration { materialise });
1312 self
1313 }
1314
1315 #[cfg(feature = "schemars")]
1319 pub fn add_workflow<A>(
1320 self,
1321 agent: A,
1322 system_prompt: impl Into<String>,
1323 run_options: klieo_core::runtime::RunOptions,
1324 ctx_factory: AgentContextFactory,
1325 ) -> Self
1326 where
1327 A: Agent + 'static,
1328 A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1329 {
1330 let schema = serde_json::to_value(schemars::schema_for!(A::Input))
1331 .expect("schemars::Schema serialises to JSON via #[derive(Serialize)]");
1332 self.add_workflow_with_schema(agent, system_prompt, schema, run_options, ctx_factory)
1333 }
1334
1335 pub fn build(self) -> Result<McpServer, McpBuildError> {
1344 if self.subscribe_cancels {
1345 return Err(McpBuildError::CancelRequiresArc);
1346 }
1347 self.build_inner()
1348 }
1349
    /// Shared implementation behind `build` and `build_arc`.
    ///
    /// Materialises pending workflows, collapses the registered invokers into
    /// one, validates the deployment profile, creates the optional
    /// registries, fills in defaults and assembles the `McpServer`.
    ///
    /// # Errors
    ///
    /// - `WorkflowWithoutHitl` / `WorkflowWithoutGovernor` when a workflow is
    ///   registered without the matching bundle (the governor check exists
    ///   only with the `governor` feature).
    /// - `DuplicateTool` when two workflows share a name, or when
    ///   `MergedInvoker::new` rejects the invoker set.
    /// - `NoInvokers` when nothing was registered.
    /// - `RegulatedProfile` when profile validation fails.
    fn build_inner(mut self) -> Result<McpServer, McpBuildError> {
        // The ticket store exists only when a checkpoint KV was supplied.
        let ticket_store = self
            .checkpoint_kv
            .clone()
            .map(|kv| Arc::new(crate::resume_ticket::ResumeTicketStore::new(kv)));
        let mut workflow_resume_handles: std::collections::HashMap<
            String,
            Arc<dyn crate::workflow::WorkflowResumeHandle>,
        > = std::collections::HashMap::new();
        if !self.pending_workflows.is_empty() {
            let bundle = self
                .hitl
                .clone()
                .ok_or(McpBuildError::WorkflowWithoutHitl)?;
            #[cfg(feature = "governor")]
            if self.governor_bundle.is_none() {
                return Err(McpBuildError::WorkflowWithoutGovernor);
            }
            let governor_bundle = self.governor_bundle.clone();
            // Take the registrations out so `self.invokers` can be pushed to
            // while iterating.
            let pending = std::mem::take(&mut self.pending_workflows);
            for reg in pending {
                let mat = (reg.materialise)(
                    bundle.clone(),
                    ticket_store.clone(),
                    governor_bundle.clone(),
                );
                // A previous entry under the same name means two workflows
                // collide.
                if workflow_resume_handles
                    .insert(mat.name.clone(), mat.resume_handle)
                    .is_some()
                {
                    return Err(McpBuildError::DuplicateTool(mat.name));
                }
                self.invokers.push(mat.invoker);
            }
        }
        let invoker_count = self.invokers.len();
        if invoker_count == 0 {
            return Err(McpBuildError::NoInvokers);
        }
        let invoker: Arc<dyn ToolInvoker> = if invoker_count == 1 {
            // Cannot fail: exactly one element is present.
            self.invokers.into_iter().next().unwrap()
        } else {
            Arc::new(MergedInvoker::new(self.invokers)?)
        };
        #[cfg(feature = "http")]
        let permits = self.publish_permits.unwrap_or(DEFAULT_PUBLISH_PERMITS);
        #[cfg(feature = "http")]
        let max_sessions = self.max_sessions.unwrap_or(DEFAULT_MAX_SESSIONS);
        #[cfg(feature = "http")]
        let max_sessions_per_principal = self
            .max_sessions_per_principal
            .unwrap_or_else(|| default_max_sessions_per_principal(max_sessions));
        let leader_registry = self.leader_kv.map(|kv| {
            klieo_core::LeaderRegistry::new(
                kv,
                "klieo-leaders".into(),
                uuid::Uuid::new_v4().to_string(),
            )
        });
        let profile = self.profile;
        // Validation sees whether tenant binding is configured and, when an
        // authenticator is present, whether it allows anonymous access.
        profile.validate(
            self.tenant_kv.is_some(),
            self.authenticator.as_ref().map(|a| a.allows_anonymous()),
        )?;
        // The profile can force strict binding even if the lenient setter
        // was used.
        let tenant_strict = self.tenant_strict || profile.requires_strict_binding();
        let ownership_registry = self.tenant_kv.map(|kv| {
            let bucket = "klieo-tenants".into();
            if tenant_strict {
                klieo_core::OwnershipRegistry::new_strict(kv, bucket)
            } else {
                klieo_core::OwnershipRegistry::new(kv, bucket)
            }
        });
        if profile.requires_strict_binding() || profile.requires_named_principal() {
            tracing::warn!(
                target: "klieo.security",
                cwe = 639,
                "regulated multi-tenant profile active on this replica; \
                 cross-replica tenant isolation assumes ALL replicas run the \
                 same profile — a lenient peer reintroduces CWE-639. Fleet \
                 homogeneity is NOT verified by this replica."
            );
        }
        let resume_buffer = self
            .resume_buffer
            .unwrap_or_else(|| std::sync::Arc::new(klieo_core::resume::NoopResumeBuffer));
        let leader_ttl = self.leader_ttl.unwrap_or(LEADER_TTL);
        // The heartbeat defaults to half of the effective TTL.
        let leader_heartbeat_interval = self.leader_heartbeat_interval.unwrap_or(leader_ttl / 2);
        let max_failover_attempts = self
            .max_failover_attempts
            .unwrap_or(klieo_core::FAILOVER_ATTEMPT_CAP);
        let kv_reaper = spawn_reaper_if_configured(
            self.kv_reaper_interval,
            leader_registry.as_ref(),
            ownership_registry.as_ref(),
            &resume_buffer,
        );
        Ok(McpServer {
            invoker,
            tool_ctx_factory: self.tool_ctx_factory,
            parent_cancel: self.parent_cancel,
            resume_buffer,
            // Without an explicit pubsub the server gets a private in-memory
            // bus.
            pubsub: self
                .pubsub
                .unwrap_or_else(|| klieo_bus_memory::MemoryBus::new().pubsub.clone()),
            cancel_registry: klieo_core::CancelRegistry::new(),
            #[cfg(feature = "http")]
            publish_permits: std::sync::Arc::new(tokio::sync::Semaphore::new(permits)),
            leader_registry,
            ownership_registry,
            resume_ticket_store: ticket_store,
            workflow_resume_handles,
            authenticator: self.authenticator,
            leader_ttl,
            leader_heartbeat_interval,
            max_failover_attempts,
            kv_reaper_interval: self.kv_reaper_interval,
            _kv_reaper: kv_reaper,
            stdio_session: tokio::sync::OnceCell::new(),
            #[cfg(feature = "http")]
            sessions: std::sync::Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            #[cfg(feature = "http")]
            max_sessions,
            #[cfg(feature = "http")]
            max_sessions_per_principal,
            #[cfg(feature = "http")]
            sse_replay_capacity: self
                .sse_replay_capacity
                .unwrap_or(DEFAULT_SSE_REPLAY_CAPACITY),
            #[cfg(feature = "http")]
            principal_counts: std::sync::Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            #[cfg(feature = "http")]
            idle_reaper_started: tokio::sync::OnceCell::new(),
            declare_sampling: self.declare_sampling,
            stdout_writer: tokio::sync::OnceCell::new(),
            client_caps: tokio::sync::Mutex::new(ClientCaps::default()),
            #[cfg(feature = "http")]
            session_idle_timeout: self
                .session_idle_timeout
                .unwrap_or(DEFAULT_SESSION_IDLE_TIMEOUT),
            // The tick is overridable only in test builds; production builds
            // always use the default.
            #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
            idle_reaper_tick: self.idle_reaper_tick.unwrap_or(DEFAULT_IDLE_REAPER_TICK),
            #[cfg(all(feature = "http", not(any(test, feature = "test-fixtures"))))]
            idle_reaper_tick: DEFAULT_IDLE_REAPER_TICK,
            #[cfg(feature = "http")]
            server_start: std::time::Instant::now(),
        })
    }
1530
1531 pub fn build_arc(self) -> Result<std::sync::Arc<McpServer>, McpBuildError> {
1539 let spawn_subscriber = self.subscribe_cancels;
1540 let server = std::sync::Arc::new(self.build_inner()?);
1541 if spawn_subscriber {
1542 klieo_core::cancel::spawn_wildcard_cancel_subscriber(
1543 server.pubsub.clone(),
1544 "klieo.mcp.cancel.>".to_string(),
1545 "klieo.mcp.cancel.".to_string(),
1546 server.cancel_registry.clone(),
1547 "mcp.cancel",
1548 );
1549 }
1550 Ok(server)
1551 }
1552}
1553
1554impl McpServer {
1555 pub fn builder() -> McpServerBuilder {
1564 McpServerBuilder::new()
1565 }
1566
1567 pub fn leader_registry(&self) -> Option<&klieo_core::LeaderRegistry> {
1571 self.leader_registry.as_ref()
1572 }
1573
1574 pub fn ownership_registry(&self) -> Option<&klieo_core::OwnershipRegistry> {
1578 self.ownership_registry.as_ref()
1579 }
1580
1581 pub fn authenticator(&self) -> Option<&Arc<dyn klieo_auth_common::Authenticator>> {
1583 self.authenticator.as_ref()
1584 }
1585
1586 pub fn leader_ttl(&self) -> std::time::Duration {
1590 self.leader_ttl
1591 }
1592
1593 pub fn leader_heartbeat_interval(&self) -> std::time::Duration {
1598 self.leader_heartbeat_interval
1599 }
1600
1601 pub fn max_failover_attempts(&self) -> u32 {
1606 self.max_failover_attempts
1607 }
1608
1609 pub fn kv_reaper_interval(&self) -> Option<std::time::Duration> {
1613 self.kv_reaper_interval
1614 }
1615
1616 #[cfg(feature = "http")]
1623 pub async fn session_ids(&self) -> Vec<uuid::Uuid> {
1624 self.sessions.read().await.keys().copied().collect()
1625 }
1626
1627 #[cfg(feature = "http")]
1634 pub async fn is_session_closed_by_id(&self, id: uuid::Uuid) -> Option<bool> {
1635 self.sessions.read().await.get(&id).map(|s| s.is_closed())
1636 }
1637
1638 #[cfg(feature = "http")]
1643 pub(crate) fn sse_replay_enabled(&self) -> bool {
1644 self.sse_replay_capacity > 0
1645 }
1646
1647 #[cfg(feature = "http")]
1657 pub(crate) async fn decrement_principal_count(&self, principal: Option<&str>) {
1658 let Some(principal) = principal else { return };
1659 let mut counts = self.principal_counts.write().await;
1660 if let Some(entry) = counts.get_mut(principal) {
1661 *entry = entry.saturating_sub(1);
1662 if *entry == 0 {
1663 counts.remove(principal);
1664 }
1665 }
1666 }
1667
1668 pub fn outbound(&self) -> Option<Arc<dyn klieo_core::ServerOutbound>> {
1680 self.stdio_session
1681 .get()
1682 .and_then(|session| session.outbound.get())
1683 .map(|o| o.clone() as Arc<dyn klieo_core::ServerOutbound>)
1684 }
1685
1686 #[cfg(all(feature = "http", feature = "test-fixtures"))]
1710 pub async fn client_roots_for_session(&self, session_id: uuid::Uuid) -> Option<Vec<Root>> {
1711 let sessions = self.sessions.read().await;
1712 let session = sessions.get(&session_id)?;
1713 let cache = session.roots_cache.get()?;
1714 Some(cache.snapshot())
1715 }
1716
1717 #[cfg(all(feature = "http", feature = "test-fixtures"))]
1725 pub async fn outbound_for_session(
1726 &self,
1727 session_id: uuid::Uuid,
1728 ) -> Option<Arc<dyn klieo_core::ServerOutbound>> {
1729 let sessions = self.sessions.read().await;
1730 let session = sessions.get(&session_id)?;
1731 let outbound = session.outbound.get()?.clone();
1732 Some(outbound as Arc<dyn klieo_core::ServerOutbound>)
1733 }
1734
1735 #[cfg(all(feature = "http", feature = "test-fixtures"))]
1741 pub fn sse_replay_capacity(&self) -> usize {
1742 self.sse_replay_capacity
1743 }
1744
1745 #[cfg(all(feature = "http", feature = "test-fixtures"))]
1750 pub async fn sse_replay_snapshot(
1751 &self,
1752 session_id: uuid::Uuid,
1753 ) -> Option<Vec<(u64, std::sync::Arc<serde_json::Value>)>> {
1754 let sessions = self.sessions.read().await;
1755 let session = sessions.get(&session_id)?;
1756 let buffer = session.sse_replay_buffer.lock();
1757 Some(buffer.iter().cloned().collect())
1758 }
1759
1760 #[cfg(all(feature = "http", feature = "test-fixtures"))]
1770 pub async fn emit_test_notification(
1771 &self,
1772 session_id: uuid::Uuid,
1773 method: &str,
1774 payload_bytes: usize,
1775 ) -> Result<(), ()> {
1776 let session = {
1777 let sessions = self.sessions.read().await;
1778 sessions.get(&session_id).cloned()
1779 };
1780 let Some(outbound) = session.as_ref().and_then(|s| s.outbound.get()) else {
1781 return Err(());
1782 };
1783 outbound
1784 .send_notification_frame(method, payload_bytes)
1785 .await
1786 .map_err(|_| ())
1787 }
1788
1789 pub fn client_roots(&self) -> Vec<Root> {
1799 self.stdio_session
1800 .get()
1801 .and_then(|session| session.roots_cache.get())
1802 .map(|c| c.snapshot())
1803 .unwrap_or_default()
1804 }
1805
1806 pub fn subscribe_root_changes(&self) -> Option<tokio::sync::watch::Receiver<Vec<Root>>> {
1816 self.stdio_session
1817 .get()
1818 .and_then(|session| session.roots_cache.get())
1819 .map(|c| c.subscribe())
1820 }
1821
1822 pub fn expose_tools(invoker: Arc<dyn ToolInvoker>) -> Self {
1826 Self::builder()
1827 .add_tools(invoker)
1828 .build()
1829 .expect("single-invoker build cannot fail")
1830 }
1831
1832 pub fn expose_agent_with_schema<A>(
1852 agent: A,
1853 input_schema: serde_json::Value,
1854 ctx_factory: AgentContextFactory,
1855 ) -> Self
1856 where
1857 A: Agent + 'static,
1858 A::Input: serde::de::DeserializeOwned + Send + 'static,
1859 A::Output: serde::Serialize + Send + 'static,
1860 {
1861 Self::builder()
1862 .add_agent_with_schema(agent, input_schema, ctx_factory)
1863 .build()
1864 .expect("single-invoker build cannot fail")
1865 }
1866
1867 #[cfg(feature = "schemars")]
1872 pub fn expose_agent<A>(agent: A, ctx_factory: AgentContextFactory) -> Self
1873 where
1874 A: Agent + 'static,
1875 A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1876 A::Output: serde::Serialize + Send + 'static,
1877 {
1878 Self::builder()
1879 .add_agent(agent, ctx_factory)
1880 .build()
1881 .expect("single-invoker build cannot fail")
1882 }
1883
1884 #[cfg(not(feature = "governor"))]
1897 pub fn expose_workflow_with_schema<A>(
1898 agent: A,
1899 system_prompt: impl Into<String>,
1900 input_schema: serde_json::Value,
1901 run_options: klieo_core::runtime::RunOptions,
1902 hitl_client: Arc<klieo_hitl_client::HitlClient>,
1903 hitl_cfg: Arc<klieo_hitl::HitlConfig>,
1904 ctx_factory: AgentContextFactory,
1905 ) -> Result<Self, McpBuildError>
1906 where
1907 A: Agent + 'static,
1908 A::Input: serde::de::DeserializeOwned + Send + 'static,
1909 {
1910 Self::builder()
1911 .with_hitl(hitl_client, hitl_cfg)
1912 .add_workflow_with_schema(agent, system_prompt, input_schema, run_options, ctx_factory)
1913 .build()
1914 }
1915
1916 #[cfg(all(feature = "schemars", not(feature = "governor")))]
1921 pub fn expose_workflow<A>(
1922 agent: A,
1923 system_prompt: impl Into<String>,
1924 run_options: klieo_core::runtime::RunOptions,
1925 hitl_client: Arc<klieo_hitl_client::HitlClient>,
1926 hitl_cfg: Arc<klieo_hitl::HitlConfig>,
1927 ctx_factory: AgentContextFactory,
1928 ) -> Result<Self, McpBuildError>
1929 where
1930 A: Agent + 'static,
1931 A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1932 {
1933 Self::builder()
1934 .with_hitl(hitl_client, hitl_cfg)
1935 .add_workflow(agent, system_prompt, run_options, ctx_factory)
1936 .build()
1937 }
1938
    /// Borrows the tool invoker this server dispatches `tools/call` and
    /// `tools/list` through.
    pub fn invoker(&self) -> &std::sync::Arc<dyn ToolInvoker> {
        &self.invoker
    }
1947
    /// Borrows the server's resume buffer handle.
    pub fn resume_buffer(&self) -> &std::sync::Arc<dyn klieo_core::resume::ResumeBuffer> {
        &self.resume_buffer
    }
1952
    /// Borrows the server's pubsub handle (the one `publish_cancel` publishes on).
    pub fn pubsub(&self) -> &std::sync::Arc<dyn klieo_core::Pubsub> {
        &self.pubsub
    }
1958
    /// Borrows the server's string-keyed cancel registry.
    pub fn cancel_registry(&self) -> &klieo_core::CancelRegistry<String> {
        &self.cancel_registry
    }
1971
1972 pub async fn publish_cancel(&self, progress_token: &str) -> Result<(), McpServerError> {
1998 klieo_core::cancel::publish_cancel_signal(
2004 &self.pubsub,
2005 "klieo.mcp.cancel.",
2006 progress_token,
2007 )
2008 .await?;
2009 Ok(())
2010 }
2011
2012 #[cfg(feature = "http")]
2017 pub(crate) fn tool_ctx_with_progress(
2018 &self,
2019 progress: tokio::sync::broadcast::Sender<klieo_core::AgentEvent>,
2020 cancel: tokio_util::sync::CancellationToken,
2021 caller_principal: Option<String>,
2022 parent_anchor: Option<String>,
2023 ) -> klieo_core::tool::ToolCtx {
2024 let mut ctx = (self.tool_ctx_factory)()
2025 .with_progress(progress)
2026 .with_cancel(cancel);
2027 if let Some(principal) = caller_principal {
2028 ctx = ctx.with_caller_principal(principal);
2029 }
2030 if let Some(anchor) = parent_anchor {
2031 ctx = ctx.with_parent_anchor(anchor);
2032 }
2033 ctx
2034 }
2035
2036 pub async fn serve_stdio(self: Arc<Self>) -> Result<(), McpServerError> {
2046 let stdin = tokio::io::stdin();
2047 let stdout: outbound::SharedWriter = Arc::new(Mutex::new(tokio::io::stdout()));
2048 self.serve_with_streams(stdin, stdout).await
2049 }
2050
2051 pub async fn serve_with_streams<R>(
2073 self: Arc<Self>,
2074 reader: R,
2075 writer: outbound::SharedWriter,
2076 ) -> Result<(), McpServerError>
2077 where
2078 R: tokio::io::AsyncRead + Unpin,
2079 {
2080 let stdout = self.ensure_stdout_writer_with(writer).await;
2081 self.ensure_outbound_and_roots().await;
2082 let mut lines = BufReader::new(reader).lines();
2083
2084 while let Some(line) = lines.next_line().await? {
2085 if line.trim().is_empty() {
2086 continue;
2087 }
2088 self.dispatch_stdio_line(line, stdout.clone());
2089 }
2090 Ok(())
2091 }
2092
2093 fn dispatch_stdio_line(self: &Arc<Self>, line: String, writer: outbound::SharedWriter) {
2101 let server = self.clone();
2102 tokio::spawn(async move {
2103 if let Err(error) = server.process_stdio_line(&line, &writer).await {
2104 warn!(error = ?error, "stdio dispatch task failed");
2105 }
2106 });
2107 }
2108
2109 async fn ensure_stdout_writer_with(
2117 &self,
2118 writer: outbound::SharedWriter,
2119 ) -> outbound::SharedWriter {
2120 let _ = self.stdout_writer.set(writer);
2125 self.stdout_writer
2126 .get()
2127 .expect("stdout_writer populated above")
2128 .clone()
2129 }
2130
2131 async fn ensure_outbound_and_roots(&self) {
2137 if !self.declare_sampling {
2138 return;
2139 }
2140 let writer = self
2143 .stdout_writer
2144 .get()
2145 .expect("serve_with_streams primes stdout_writer before ensure_outbound_and_roots")
2146 .clone();
2147 let session = self
2148 .stdio_session
2149 .get_or_init(|| async { std::sync::Arc::new(crate::session::Session::new_stdio()) })
2150 .await
2151 .clone();
2152 let outbound = session
2153 .outbound
2154 .get_or_init(|| async {
2155 let sink: Arc<dyn OutboundFrameSink> =
2156 Arc::new(crate::outbound_sink::StdioFrameSink::new(writer.clone()));
2157 Arc::new(crate::outbound::OutboundRequests::new(sink))
2158 })
2159 .await
2160 .clone();
2161 let _ = session
2162 .roots_cache
2163 .get_or_init(|| async {
2164 let outbound: Arc<dyn klieo_core::ServerOutbound> = outbound.clone();
2165 Arc::new(crate::roots::RootsCache::new(outbound))
2166 })
2167 .await;
2168 }
2169
2170 async fn process_stdio_line(
2173 &self,
2174 line: &str,
2175 writer: &outbound::SharedWriter,
2176 ) -> Result<(), McpServerError> {
2177 let parsed: serde_json::Value = match serde_json::from_str(line) {
2178 Ok(value) => value,
2179 Err(error) => {
2180 warn!(error = %error, "rejected malformed JSON-RPC frame");
2182 let envelope = rpc_error(None, JSONRPC_PARSE_ERROR, "malformed JSON-RPC frame");
2183 return write_frame(writer, &envelope).await;
2184 }
2185 };
2186 let stdio_session = self.stdio_session.get();
2187 match classify_inbound(&parsed) {
2188 InboundKind::Request => {
2189 let envelope = self.handle_jsonrpc(parsed, stdio_session).await;
2190 write_frame(writer, &envelope).await
2191 }
2192 InboundKind::Notification => {
2193 self.handle_jsonrpc(parsed, stdio_session).await;
2195 Ok(())
2196 }
2197 InboundKind::OutboundResponse(id) => {
2198 self.route_outbound_response(id, parsed).await;
2199 Ok(())
2200 }
2201 InboundKind::Unparseable => {
2202 warn!("rejected inbound frame: no method and no id");
2203 Ok(())
2204 }
2205 }
2206 }
2207
2208 async fn route_outbound_response(&self, id: i64, frame: serde_json::Value) {
2215 if let Some(outbound) = self
2216 .stdio_session
2217 .get()
2218 .and_then(|session| session.outbound.get())
2219 {
2220 outbound.complete_pending(id, frame).await;
2221 } else {
2222 warn!(
2223 rpc_id = id,
2224 "outbound response received but server has no outbound table wired"
2225 );
2226 }
2227 }
2228
2229 #[cfg(test)]
2236 async fn handle_line(&self, line: &str) -> serde_json::Value {
2237 let req: serde_json::Value = match serde_json::from_str(line) {
2238 Ok(v) => v,
2239 Err(e) => {
2240 warn!(error = %e, "rejected malformed JSON-RPC frame");
2241 return rpc_error(None, JSONRPC_PARSE_ERROR, "malformed JSON-RPC frame");
2242 }
2243 };
2244 self.handle_jsonrpc(req, self.stdio_session.get()).await
2245 }
2246
2247 pub(crate) async fn handle_jsonrpc(
2261 &self,
2262 req: serde_json::Value,
2263 session: Option<&std::sync::Arc<crate::session::Session>>,
2264 ) -> serde_json::Value {
2265 let id = req.get("id").cloned();
2266 let method = req.get("method").and_then(|m| m.as_str()).unwrap_or("");
2267
2268 match method {
2269 "initialize" => rpc_ok(id, self.handle_initialize(&req).await),
2270 "notifications/initialized" => {
2271 self.handle_initialized_notification(session).await;
2272 serde_json::Value::Null
2273 }
2274 "notifications/roots/list_changed" => {
2275 self.handle_roots_list_changed_notification(session);
2276 serde_json::Value::Null
2277 }
2278 "shutdown" => rpc_ok(id, serde_json::Value::Null),
2279 "tools/list" => rpc_ok(id, self.tools_list()),
2280 "tools/call" => match self.tools_call(req.get("params")).await {
2281 Ok(v) => rpc_ok(id, v),
2282 Err(e) => tool_error_to_envelope(id, e),
2283 },
2284 other => {
2285 warn!(rpc_id = ?id, method = other, "method not found");
2286 rpc_error(
2287 id,
2288 JSONRPC_METHOD_NOT_FOUND,
2289 &format!("method not found: {other}"),
2290 )
2291 }
2292 }
2293 }
2294
2295 async fn handle_initialize(&self, req: &serde_json::Value) -> serde_json::Value {
2302 let roots_supported = req.pointer("/params/capabilities/roots").is_some();
2303 {
2304 let mut caps = self.client_caps.lock().await;
2305 caps.roots_supported = roots_supported;
2306 }
2307 if self.declare_sampling {
2308 initialize_result_with_sampling()
2309 } else {
2310 initialize_result_without_sampling()
2311 }
2312 }
2313
2314 async fn handle_initialized_notification(
2326 &self,
2327 session: Option<&std::sync::Arc<crate::session::Session>>,
2328 ) {
2329 let roots_supported = self.client_caps.lock().await.roots_supported;
2330 if !roots_supported {
2331 return;
2332 }
2333 let Some(cache) = session
2334 .and_then(|session| session.roots_cache.get())
2335 .cloned()
2336 else {
2337 return;
2338 };
2339 tokio::spawn(async move {
2340 if let Err(error) = cache.refresh().await {
2341 warn!(error = ?error, "initial roots/list fetch failed");
2342 }
2343 });
2344 }
2345
2346 fn handle_roots_list_changed_notification(
2359 &self,
2360 session: Option<&std::sync::Arc<crate::session::Session>>,
2361 ) {
2362 let Some(cache) = session
2363 .and_then(|session| session.roots_cache.get())
2364 .cloned()
2365 else {
2366 return;
2367 };
2368 tokio::spawn(async move {
2369 if let Err(error) = cache.refresh().await {
2370 warn!(error = ?error, "roots list_changed re-fetch failed");
2371 }
2372 });
2373 }
2374
2375 fn tools_list(&self) -> serde_json::Value {
2376 let tools: Vec<serde_json::Value> = self
2377 .invoker
2378 .catalogue()
2379 .iter()
2380 .map(tool_def_to_mcp_descriptor)
2381 .collect();
2382 serde_json::json!({ "tools": tools })
2383 }
2384
2385 async fn tools_call(
2386 &self,
2387 params: Option<&serde_json::Value>,
2388 ) -> Result<serde_json::Value, ToolError> {
2389 let params = params.ok_or_else(|| ToolError::InvalidArgs("missing params".into()))?;
2390 let name = params
2391 .get("name")
2392 .and_then(|n| n.as_str())
2393 .ok_or_else(|| ToolError::InvalidArgs("missing tool name".into()))?;
2394 let args = params
2395 .get("arguments")
2396 .cloned()
2397 .unwrap_or(serde_json::Value::Null);
2398 let ctx = (self.tool_ctx_factory)().with_cancel(self.parent_cancel.child_token());
2399 let out = self.invoker.invoke(name, args, ctx).await?;
2400 Ok(serde_json::json!({
2401 "content": [
2402 { "type": "text", "text": out.to_string() }
2403 ]
2404 }))
2405 }
2406}
2407
/// Adapter that presents a single `Agent` as a one-tool `ToolInvoker`.
struct AgentAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
    A::Output: serde::Serialize + Send + 'static,
{
    /// The wrapped agent; `invoke` calls its `run` method.
    agent: Arc<A>,
    /// Tool name advertised in the catalogue and matched on `invoke`.
    name: String,
    /// JSON schema advertised for the tool's input.
    input_schema: serde_json::Value,
    /// Factory called once per invocation to produce the agent's context.
    ctx_factory: AgentContextFactory,
    #[cfg(feature = "governor")]
    /// Optional governor bundle; when present, `invoke` wraps the agent
    /// context with it before running the agent.
    governor: Option<crate::governor::GovernorBundle>,
}
2432
2433#[async_trait]
2434impl<A> ToolInvoker for AgentAsToolInvoker<A>
2435where
2436 A: Agent + 'static,
2437 A::Input: serde::de::DeserializeOwned + Send + 'static,
2438 A::Output: serde::Serialize + Send + 'static,
2439{
2440 fn catalogue(&self) -> Vec<ToolDef> {
2441 vec![ToolDef::new(
2442 self.name.clone(),
2443 format!("klieo agent: {}", self.name),
2444 self.input_schema.clone(),
2445 )]
2446 }
2447
2448 async fn invoke(
2449 &self,
2450 name: &str,
2451 args: serde_json::Value,
2452 tool_ctx: ToolCtx,
2453 ) -> Result<serde_json::Value, ToolError> {
2454 if name != self.name {
2455 return Err(ToolError::UnknownTool(name.into()));
2456 }
2457 let input: A::Input = serde_json::from_value(args).map_err(|e| {
2464 warn!(agent = %self.name, error = %e, "decode of MCP tools/call args failed");
2465 ToolError::InvalidArgs("arguments do not match inputSchema".into())
2466 })?;
2467 let mut ctx = (self.ctx_factory)();
2468 ctx.cancel = tool_ctx.cancel.child_token();
2473 ctx.progress = tool_ctx.progress.clone();
2474 if let Some(principal) = tool_ctx.caller_principal.as_ref() {
2479 ctx = ctx.with_tenant_label(klieo_core::principal_hash(principal.as_str()));
2480 }
2481 if let Some(anchor) = tool_ctx.parent_anchor.as_ref() {
2486 ctx = ctx.with_parent_anchor(anchor.as_str().to_string());
2487 }
2488 #[cfg(feature = "governor")]
2492 if let Some(bundle) = self.governor.as_ref() {
2493 ctx = crate::governor::wrap_ctx_with_governor(ctx, bundle);
2494 }
2495 let output = self.agent.run(ctx, input).await.map_err(|e| {
2496 warn!(agent = %self.name, error = %e, "exposed agent execution failed");
2497 ToolError::Permanent("agent execution failed".into())
2498 })?;
2499 serde_json::to_value(output).map_err(|e| {
2500 warn!(agent = %self.name, error = %e, "encode of agent output failed");
2501 ToolError::Permanent("agent output not serialisable".into())
2502 })
2503 }
2504}
2505
/// Combines several `ToolInvoker`s behind one invoker, routing each call to
/// the inner invoker that advertised the tool.
struct MergedInvoker {
    /// The wrapped invokers, in the order they were supplied.
    inner: Vec<Arc<dyn ToolInvoker>>,
    /// Tool name mapped to the index in `inner` of the invoker that owns it.
    routes: std::collections::HashMap<String, usize>,
    /// Concatenation of every inner catalogue, built once in `new`.
    merged_catalogue: Vec<ToolDef>,
}
2525
2526impl MergedInvoker {
2527 fn new(inner: Vec<Arc<dyn ToolInvoker>>) -> Result<Self, McpBuildError> {
2528 let mut routes: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
2529 let mut merged_catalogue: Vec<ToolDef> = Vec::new();
2530 for (index, invoker) in inner.iter().enumerate() {
2531 for tool in invoker.catalogue() {
2532 if routes.insert(tool.name.clone(), index).is_some() {
2533 return Err(McpBuildError::DuplicateTool(tool.name));
2534 }
2535 merged_catalogue.push(tool);
2536 }
2537 }
2538 Ok(Self {
2539 inner,
2540 routes,
2541 merged_catalogue,
2542 })
2543 }
2544}
2545
2546#[async_trait]
2547impl ToolInvoker for MergedInvoker {
2548 fn catalogue(&self) -> Vec<ToolDef> {
2549 self.merged_catalogue.clone()
2550 }
2551
2552 async fn invoke(
2553 &self,
2554 name: &str,
2555 args: serde_json::Value,
2556 ctx: ToolCtx,
2557 ) -> Result<serde_json::Value, ToolError> {
2558 match self.routes.get(name) {
2559 Some(&index) => self.inner[index].invoke(name, args, ctx).await,
2560 None => Err(ToolError::UnknownTool(name.into())),
2561 }
2562 }
2563
2564 fn is_tool_idempotent(&self, name: &str) -> bool {
2570 match self.routes.get(name) {
2571 Some(&index) => self.inner[index].is_tool_idempotent(name),
2572 None => false,
2573 }
2574 }
2575
2576 fn tool_redacts_audit(&self, name: &str) -> bool {
2581 match self.routes.get(name) {
2582 Some(&index) => self.inner[index].tool_redacts_audit(name),
2583 None => false,
2584 }
2585 }
2586}
2587
2588pub(crate) fn tool_error_to_envelope(
2597 id: Option<serde_json::Value>,
2598 e: ToolError,
2599) -> serde_json::Value {
2600 let stable_msg = match &e {
2601 ToolError::UnknownTool(name) => {
2602 warn!(rpc_id = ?id, tool = %name, "tools/call: unknown tool");
2603 format!("unknown tool: {name}")
2604 }
2605 ToolError::InvalidArgs(reason) => {
2606 warn!(rpc_id = ?id, reason = %reason, "tools/call: invalid args");
2607 reason.clone()
2608 }
2609 _ => {
2610 warn!(rpc_id = ?id, error = %e, "tools/call failed");
2611 "tool invocation failed".into()
2612 }
2613 };
2614 rpc_error(id, JSONRPC_SERVER_ERROR, &stable_msg)
2615}
2616
2617fn tool_def_to_mcp_descriptor(def: &ToolDef) -> serde_json::Value {
2618 serde_json::json!({
2619 "name": def.name,
2620 "description": def.description,
2621 "inputSchema": def.json_schema,
2622 })
2623}
2624
/// Initialize result payload that includes the `sampling` capability.
fn initialize_result_with_sampling() -> serde_json::Value {
    initialize_result_inner(true)
}
2628
/// Initialize result payload that omits the `sampling` capability.
fn initialize_result_without_sampling() -> serde_json::Value {
    initialize_result_inner(false)
}
2632
2633fn initialize_result_inner(with_sampling: bool) -> serde_json::Value {
2639 let mut capabilities = serde_json::json!({ "tools": {} });
2640 if with_sampling {
2641 capabilities["sampling"] = serde_json::json!({});
2642 }
2643 serde_json::json!({
2644 "protocolVersion": MCP_PROTOCOL_VERSION,
2645 "capabilities": capabilities,
2646 "serverInfo": { "name": "klieo-mcp-server", "version": env!("CARGO_PKG_VERSION") }
2647 })
2648}
2649
/// Builds a JSON-RPC 2.0 success envelope carrying `result` for the request
/// `id`. A `None` id serialises as JSON `null`.
pub(crate) fn rpc_ok(id: Option<serde_json::Value>, result: serde_json::Value) -> serde_json::Value {
    serde_json::json!({ "jsonrpc": "2.0", "id": id, "result": result })
}
2653
/// Shape of an inbound stdio frame, as decided by `classify_inbound`.
#[derive(Debug)]
enum InboundKind {
    /// Frame has a `method` and an `id`.
    Request,
    /// Frame has a `method` but no `id`.
    Notification,
    /// Frame has no `method`, an integer `id`, and a `result` or `error`
    /// member; the payload is that id.
    OutboundResponse(i64),
    /// Frame matches none of the shapes above.
    Unparseable,
}
2673
2674fn classify_inbound(value: &serde_json::Value) -> InboundKind {
2677 let has_method = value.get("method").is_some();
2678 let id = value.get("id");
2679 if has_method {
2680 return if id.is_some() {
2681 InboundKind::Request
2682 } else {
2683 InboundKind::Notification
2684 };
2685 }
2686 let has_payload = value.get("result").is_some() || value.get("error").is_some();
2687 match (id.and_then(serde_json::Value::as_i64), has_payload) {
2688 (Some(id), true) => InboundKind::OutboundResponse(id),
2689 _ => InboundKind::Unparseable,
2690 }
2691}
2692
2693async fn write_frame(
2698 writer: &outbound::SharedWriter,
2699 envelope: &serde_json::Value,
2700) -> Result<(), McpServerError> {
2701 let bytes = serde_json::to_vec(envelope)?;
2702 let mut guard = writer.lock().await;
2703 guard.write_all(&bytes).await?;
2704 guard.write_all(b"\n").await?;
2705 guard.flush().await?;
2706 Ok(())
2707}
2708
2709pub(crate) fn rpc_error(
2710 id: Option<serde_json::Value>,
2711 code: i64,
2712 message: &str,
2713) -> serde_json::Value {
2714 serde_json::json!({
2715 "jsonrpc": "2.0",
2716 "id": id,
2717 "error": { "code": code, "message": message }
2718 })
2719}
2720
/// Public, doc-hidden wrapper around `noop_ctx`.
#[doc(hidden)]
pub fn __test_noop_ctx() -> klieo_core::tool::ToolCtx {
    noop_ctx()
}
2728
2729fn noop_ctx() -> ToolCtx {
2730 let bus = klieo_bus_memory::MemoryBus::new();
2731 ToolCtx::new(bus.pubsub, bus.kv, bus.jobs)
2732}
2733
2734#[cfg(test)]
2735mod tests {
2736 use super::*;
2737 use async_trait::async_trait;
2738 use klieo_core::tool::Tool;
2739 use std::sync::OnceLock;
2740
2741 struct EmptyInvoker;
2742
2743 #[async_trait]
2744 impl ToolInvoker for EmptyInvoker {
2745 fn catalogue(&self) -> Vec<ToolDef> {
2746 Vec::new()
2747 }
2748 async fn invoke(
2749 &self,
2750 name: &str,
2751 _args: serde_json::Value,
2752 _ctx: ToolCtx,
2753 ) -> Result<serde_json::Value, ToolError> {
2754 Err(ToolError::UnknownTool(name.into()))
2755 }
2756 }
2757
2758 struct Echo;
2759
2760 #[async_trait]
2761 impl Tool for Echo {
2762 fn name(&self) -> &str {
2763 "echo"
2764 }
2765 fn description(&self) -> &str {
2766 "echoes back its args"
2767 }
2768 fn json_schema(&self) -> &serde_json::Value {
2769 static S: OnceLock<serde_json::Value> = OnceLock::new();
2770 S.get_or_init(|| serde_json::json!({"type": "object"}))
2771 }
2772 async fn invoke(
2773 &self,
2774 args: serde_json::Value,
2775 _ctx: ToolCtx,
2776 ) -> Result<serde_json::Value, ToolError> {
2777 Ok(args)
2778 }
2779 }
2780
2781 struct OneToolInvoker;
2782
2783 #[async_trait]
2784 impl ToolInvoker for OneToolInvoker {
2785 fn catalogue(&self) -> Vec<ToolDef> {
2786 vec![ToolDef::new(
2787 "echo",
2788 "echoes back its args",
2789 serde_json::json!({"type": "object"}),
2790 )]
2791 }
2792 async fn invoke(
2793 &self,
2794 name: &str,
2795 args: serde_json::Value,
2796 ctx: ToolCtx,
2797 ) -> Result<serde_json::Value, ToolError> {
2798 if name == "echo" {
2799 Echo.invoke(args, ctx).await
2800 } else {
2801 Err(ToolError::UnknownTool(name.into()))
2802 }
2803 }
2804 }
2805
2806 #[tokio::test]
2807 async fn initialize_returns_server_info() {
2808 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2809 let resp = server
2810 .handle_line(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}"#)
2811 .await;
2812 let info = resp["result"]["serverInfo"]["name"].as_str().unwrap();
2813 assert_eq!(info, "klieo-mcp-server");
2814 }
2815
2816 #[tokio::test]
2817 async fn tools_list_surfaces_invoker_catalogue() {
2818 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2819 let resp = server
2820 .handle_line(r#"{"jsonrpc":"2.0","id":2,"method":"tools/list"}"#)
2821 .await;
2822 let tools = resp["result"]["tools"].as_array().unwrap();
2823 assert_eq!(tools.len(), 1);
2824 assert_eq!(tools[0]["name"], "echo");
2825 }
2826
2827 #[tokio::test]
2828 async fn tools_call_dispatches_to_invoker() {
2829 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2830 let resp = server
2831 .handle_line(
2832 r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"echo","arguments":{"hello":"world"}}}"#,
2833 )
2834 .await;
2835 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
2836 assert!(text.contains("hello"));
2837 assert!(text.contains("world"));
2838 }
2839
2840 #[tokio::test]
2841 async fn unknown_method_returns_method_not_found() {
2842 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2843 let resp = server
2844 .handle_line(r#"{"jsonrpc":"2.0","id":4,"method":"nope"}"#)
2845 .await;
2846 assert_eq!(resp["error"]["code"], JSONRPC_METHOD_NOT_FOUND);
2847 }
2848
2849 #[tokio::test]
2850 async fn tools_call_without_params_returns_server_error() {
2851 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2852 let resp = server
2853 .handle_line(r#"{"jsonrpc":"2.0","id":5,"method":"tools/call"}"#)
2854 .await;
2855 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
2856 assert!(resp["error"]["message"]
2857 .as_str()
2858 .unwrap()
2859 .contains("missing params"));
2860 }
2861
2862 #[tokio::test]
2863 async fn tools_call_unknown_tool_surfaces_invoker_error() {
2864 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2865 let resp = server
2866 .handle_line(
2867 r#"{"jsonrpc":"2.0","id":6,"method":"tools/call","params":{"name":"does-not-exist","arguments":{}}}"#,
2868 )
2869 .await;
2870 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
2871 assert!(resp["error"]["message"]
2872 .as_str()
2873 .unwrap()
2874 .contains("does-not-exist"));
2875 }
2876
2877 #[tokio::test]
2878 async fn malformed_frame_returns_sanitised_parse_error() {
2879 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2880 let resp = server.handle_line("not json").await;
2881 assert_eq!(resp["error"]["code"], JSONRPC_PARSE_ERROR);
2882 let msg = resp["error"]["message"].as_str().unwrap();
2885 assert_eq!(msg, "malformed JSON-RPC frame");
2886 }
2887
2888 #[tokio::test]
2889 async fn handle_jsonrpc_dispatches_initialize() {
2890 let server = McpServer::builder()
2891 .add_tools(Arc::new(EmptyInvoker))
2892 .build()
2893 .unwrap();
2894 let req = serde_json::json!({
2895 "jsonrpc": "2.0",
2896 "id": 1,
2897 "method": "initialize",
2898 "params": {}
2899 });
2900 let resp = server.handle_jsonrpc(req, None).await;
2901 assert_eq!(resp["jsonrpc"], "2.0");
2902 assert_eq!(resp["id"], 1);
2903 assert!(resp["result"].is_object());
2904 }
2905
2906 #[tokio::test]
2907 async fn handle_jsonrpc_returns_method_not_found_for_unknown() {
2908 let server = McpServer::builder()
2909 .add_tools(Arc::new(EmptyInvoker))
2910 .build()
2911 .unwrap();
2912 let req = serde_json::json!({
2913 "jsonrpc": "2.0",
2914 "id": 7,
2915 "method": "no_such_method"
2916 });
2917 let resp = server.handle_jsonrpc(req, None).await;
2918 assert_eq!(resp["error"]["code"], JSONRPC_METHOD_NOT_FOUND);
2919 assert_eq!(resp["id"], 7);
2920 }
2921
2922 #[tokio::test]
2923 async fn classify_inbound_recognises_request_shape() {
2924 let frame = serde_json::json!({
2925 "jsonrpc": "2.0",
2926 "id": 1,
2927 "method": "tools/list"
2928 });
2929 assert!(matches!(classify_inbound(&frame), InboundKind::Request));
2930 }
2931
2932 #[tokio::test]
2933 async fn classify_inbound_recognises_notification_shape() {
2934 let frame = serde_json::json!({
2935 "jsonrpc": "2.0",
2936 "method": "notifications/initialized"
2937 });
2938 assert!(matches!(
2939 classify_inbound(&frame),
2940 InboundKind::Notification
2941 ));
2942 }
2943
2944 #[tokio::test]
2945 async fn classify_inbound_recognises_outbound_result_shape() {
2946 let frame = serde_json::json!({
2947 "jsonrpc": "2.0",
2948 "id": 42,
2949 "result": {"role": "assistant"}
2950 });
2951 match classify_inbound(&frame) {
2952 InboundKind::OutboundResponse(id) => assert_eq!(id, 42),
2953 other => panic!("expected OutboundResponse(42), got {other:?}"),
2954 }
2955 }
2956
2957 #[tokio::test]
2958 async fn classify_inbound_recognises_outbound_error_shape() {
2959 let frame = serde_json::json!({
2960 "jsonrpc": "2.0",
2961 "id": 7,
2962 "error": {"code": -32601, "message": "Method not found"}
2963 });
2964 match classify_inbound(&frame) {
2965 InboundKind::OutboundResponse(id) => assert_eq!(id, 7),
2966 other => panic!("expected OutboundResponse(7), got {other:?}"),
2967 }
2968 }
2969
2970 #[tokio::test]
2971 async fn classify_inbound_rejects_no_method_no_id() {
2972 let frame = serde_json::json!({"jsonrpc": "2.0"});
2973 assert!(matches!(classify_inbound(&frame), InboundKind::Unparseable));
2974 }
2975
2976 #[tokio::test]
2977 async fn classify_inbound_rejects_bare_id_without_payload() {
2978 let frame = serde_json::json!({"jsonrpc": "2.0", "id": 9});
2982 assert!(matches!(classify_inbound(&frame), InboundKind::Unparseable));
2983 }
2984
    /// Shared byte buffer that `BufferSink` appends to and tests read back.
    type CapturedBytes = std::sync::Arc<std::sync::Mutex<Vec<u8>>>;
2989
2990 fn duplex_writer() -> (outbound::SharedWriter, CapturedBytes) {
2994 let buffer: CapturedBytes = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
2995 let shared: outbound::SharedWriter = Arc::new(Mutex::new(BufferSink(buffer.clone())));
2996 (shared, buffer)
2997 }
2998
2999 struct BufferSink(CapturedBytes);
3003
3004 impl tokio::io::AsyncWrite for BufferSink {
3005 fn poll_write(
3006 self: std::pin::Pin<&mut Self>,
3007 _cx: &mut std::task::Context<'_>,
3008 buf: &[u8],
3009 ) -> std::task::Poll<std::io::Result<usize>> {
3010 self.0
3011 .lock()
3012 .expect("BufferSink mutex poisoned in test")
3013 .extend_from_slice(buf);
3014 std::task::Poll::Ready(Ok(buf.len()))
3015 }
3016
3017 fn poll_flush(
3018 self: std::pin::Pin<&mut Self>,
3019 _cx: &mut std::task::Context<'_>,
3020 ) -> std::task::Poll<std::io::Result<()>> {
3021 std::task::Poll::Ready(Ok(()))
3022 }
3023
3024 fn poll_shutdown(
3025 self: std::pin::Pin<&mut Self>,
3026 _cx: &mut std::task::Context<'_>,
3027 ) -> std::task::Poll<std::io::Result<()>> {
3028 std::task::Poll::Ready(Ok(()))
3029 }
3030 }
3031
3032 fn captured_bytes(buffer: &CapturedBytes) -> Vec<u8> {
3033 buffer
3034 .lock()
3035 .expect("captured-bytes mutex poisoned in test")
3036 .clone()
3037 }
3038
3039 #[tokio::test]
3040 async fn process_stdio_line_writes_response_for_request() {
3041 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3042 let (writer, buffer) = duplex_writer();
3043 let request = r#"{"jsonrpc":"2.0","id":11,"method":"tools/list"}"#;
3044 server
3045 .process_stdio_line(request, &writer)
3046 .await
3047 .expect("stdio dispatch must not fail");
3048 let bytes = captured_bytes(&buffer);
3049 assert!(bytes.ends_with(b"\n"), "frames are newline-delimited");
3050 let envelope: serde_json::Value =
3051 serde_json::from_slice(bytes.trim_ascii_end()).expect("written frame must be JSON");
3052 assert_eq!(envelope["id"], 11);
3053 assert!(envelope["result"]["tools"].is_array());
3054 }
3055
3056 #[tokio::test]
3057 async fn process_stdio_line_drops_outbound_response_when_table_absent() {
3058 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3063 assert!(
3064 server
3065 .stdio_session
3066 .get()
3067 .and_then(|s| s.outbound.get())
3068 .is_none(),
3069 "default-built server must not wire an outbound table"
3070 );
3071 let (writer, buffer) = duplex_writer();
3072 let stray = r#"{"jsonrpc":"2.0","id":99,"result":{"role":"assistant"}}"#;
3073 server
3074 .process_stdio_line(stray, &writer)
3075 .await
3076 .expect("stray response must not break the loop");
3077 assert!(
3078 captured_bytes(&buffer).is_empty(),
3079 "stray outbound responses must never produce wire output"
3080 );
3081 }
3082
3083 #[tokio::test]
3084 async fn process_stdio_line_drops_notification_without_writing() {
3085 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3089 let (writer, buffer) = duplex_writer();
3090 let notification = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
3091 server
3092 .process_stdio_line(notification, &writer)
3093 .await
3094 .expect("notification dispatch must not fail");
3095 assert!(
3096 captured_bytes(&buffer).is_empty(),
3097 "notifications must not produce wire output"
3098 );
3099 }
3100
3101 #[tokio::test]
3102 async fn process_stdio_line_drops_unparseable_frame() {
3103 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3107 let (writer, buffer) = duplex_writer();
3108 let unparseable = r#"{"jsonrpc":"2.0"}"#;
3109 server
3110 .process_stdio_line(unparseable, &writer)
3111 .await
3112 .expect("unparseable frame must not break the loop");
3113 assert!(
3114 captured_bytes(&buffer).is_empty(),
3115 "unparseable frames must not produce wire output"
3116 );
3117 }
3118
3119 #[tokio::test]
3120 async fn process_stdio_line_writes_parse_error_for_malformed_json() {
3121 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3125 let (writer, buffer) = duplex_writer();
3126 server
3127 .process_stdio_line("not json", &writer)
3128 .await
3129 .expect("parse-error path must not fail the loop");
3130 let bytes = captured_bytes(&buffer);
3131 let envelope: serde_json::Value =
3132 serde_json::from_slice(bytes.trim_ascii_end()).expect("parse-error envelope is JSON");
3133 assert_eq!(envelope["error"]["code"], JSONRPC_PARSE_ERROR);
3134 assert_eq!(envelope["error"]["message"], "malformed JSON-RPC frame");
3135 }
3136
3137 #[tokio::test]
3138 async fn initialize_arm_records_roots_capability_when_advertised() {
3139 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3143 let req = serde_json::json!({
3144 "jsonrpc": "2.0",
3145 "id": 400,
3146 "method": "initialize",
3147 "params": { "capabilities": { "roots": {} } }
3148 });
3149 server.handle_jsonrpc(req, None).await;
3150 assert!(
3151 server.client_caps.lock().await.roots_supported,
3152 "initialize must record advertised roots capability"
3153 );
3154 }
3155
3156 #[tokio::test]
3157 async fn initialize_arm_defaults_roots_unsupported_when_absent() {
3158 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3161 let req = serde_json::json!({
3162 "jsonrpc": "2.0",
3163 "id": 401,
3164 "method": "initialize",
3165 "params": { "capabilities": {} }
3166 });
3167 server.handle_jsonrpc(req, None).await;
3168 assert!(
3169 !server.client_caps.lock().await.roots_supported,
3170 "initialize must leave roots_supported=false when absent"
3171 );
3172 }
3173
3174 #[tokio::test]
3175 async fn initialize_result_includes_sampling_when_flag_set() {
3176 let payload = super::initialize_result_with_sampling();
3177 assert!(
3178 payload["capabilities"]["sampling"].is_object(),
3179 "initialize_result_with_sampling must surface capabilities.sampling; got: {payload}"
3180 );
3181 }
3182
3183 #[tokio::test]
3184 async fn initialize_result_omits_sampling_when_flag_unset() {
3185 let payload = super::initialize_result_without_sampling();
3186 assert!(
3187 payload["capabilities"].get("sampling").is_none(),
3188 "initialize_result_without_sampling must omit capabilities.sampling; got: {payload}"
3189 );
3190 }
3191
3192 #[tokio::test]
3193 async fn initialized_notification_returns_null_value() {
3194 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3200 let req = serde_json::json!({
3201 "jsonrpc": "2.0",
3202 "method": "notifications/initialized"
3203 });
3204 let resp = server.handle_jsonrpc(req, None).await;
3205 assert!(
3206 resp.is_null(),
3207 "notifications/initialized must yield a Null sentinel; got: {resp}"
3208 );
3209 }
3210
3211 #[tokio::test]
3212 async fn list_changed_notification_returns_null_value() {
3213 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3218 let req = serde_json::json!({
3219 "jsonrpc": "2.0",
3220 "method": "notifications/roots/list_changed"
3221 });
3222 let resp = server.handle_jsonrpc(req, None).await;
3223 assert!(
3224 resp.is_null(),
3225 "notifications/roots/list_changed must yield a Null sentinel; got: {resp}"
3226 );
3227 }
3228
3229 #[tokio::test]
3230 async fn list_changed_when_cache_absent_is_noop() {
3231 let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3236 assert!(
3237 server
3238 .stdio_session
3239 .get()
3240 .and_then(|s| s.roots_cache.get())
3241 .is_none(),
3242 "default-built server must not wire a roots cache"
3243 );
3244 let req = serde_json::json!({
3245 "jsonrpc": "2.0",
3246 "method": "notifications/roots/list_changed"
3247 });
3248 let resp = server.handle_jsonrpc(req, None).await;
3249 assert!(
3250 resp.is_null(),
3251 "cache-absent list_changed must still yield Null; got: {resp}"
3252 );
3253 }
3254
/// The context returned by `tool_ctx_with_progress` must observe cancellation
/// of the token that was passed in (checked by cancelling the original after
/// handing over a clone). Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn tool_ctx_with_progress_threads_cancel() {
    use std::sync::Arc;
    use tokio_util::sync::CancellationToken;

    let builder = McpServer::builder().add_tools(Arc::new(EmptyInvoker));
    let server = Arc::new(builder.build().unwrap());
    let (progress_tx, _progress_rx) =
        tokio::sync::broadcast::channel::<klieo_core::AgentEvent>(8);

    let token = CancellationToken::new();
    let handed_over = token.clone();
    let ctx = server.tool_ctx_with_progress(progress_tx, handed_over, None, None);

    // Cancel only after the context exists, so the check covers propagation.
    token.cancel();
    assert!(ctx.cancel.is_cancelled());
}
3272
3273 mod expose_agent_tests {
3274 use super::*;
3275 use async_trait::async_trait;
3276 use klieo_core::agent::{Agent, AgentContext};
3277 use klieo_core::error::Error as KlieoError;
3278 use klieo_core::llm::ToolDef;
3279 use klieo_core::test_utils::fake_context;
3280 use serde::{Deserialize, Serialize};
3281
3282 #[derive(Debug, Clone, Deserialize, Serialize)]
3283 struct GreetIn {
3284 who: String,
3285 }
3286
3287 #[derive(Debug, Clone, Serialize)]
3288 struct GreetOut {
3289 greeting: String,
3290 }
3291
3292 struct Greeter;
3293
3294 #[async_trait]
3295 impl Agent for Greeter {
3296 type Input = GreetIn;
3297 type Output = GreetOut;
3298 type Error = KlieoError;
3299
3300 fn name(&self) -> &str {
3301 "greeter"
3302 }
3303 fn system_prompt(&self) -> &str {
3304 ""
3305 }
3306 fn tools(&self) -> &[ToolDef] {
3307 &[]
3308 }
3309 async fn run(
3310 &self,
3311 _ctx: AgentContext,
3312 input: GreetIn,
3313 ) -> Result<GreetOut, KlieoError> {
3314 Ok(GreetOut {
3315 greeting: format!("hello {}", input.who),
3316 })
3317 }
3318 }
3319
3320 #[derive(Debug, Clone, Serialize)]
3324 struct CancelObserveOut {
3325 state: String,
3326 }
3327
3328 struct CancelObserver;
3329
3330 #[async_trait]
3331 impl Agent for CancelObserver {
3332 type Input = serde_json::Value;
3333 type Output = CancelObserveOut;
3334 type Error = KlieoError;
3335 fn name(&self) -> &str {
3336 "cancel-observer"
3337 }
3338 fn system_prompt(&self) -> &str {
3339 ""
3340 }
3341 fn tools(&self) -> &[ToolDef] {
3342 &[]
3343 }
3344 async fn run(
3345 &self,
3346 ctx: AgentContext,
3347 _input: serde_json::Value,
3348 ) -> Result<CancelObserveOut, KlieoError> {
3349 let state = if ctx.cancel.is_cancelled() {
3350 "cancelled".into()
3351 } else {
3352 "ran".into()
3353 };
3354 Ok(CancelObserveOut { state })
3355 }
3356 }
3357
3358 fn fresh_ctx() -> AgentContext {
3359 fake_context("greeter")
3360 }
3361
3362 fn one_object_schema() -> serde_json::Value {
3363 serde_json::json!({
3364 "type": "object",
3365 "properties": {"who": {"type": "string"}},
3366 "required": ["who"]
3367 })
3368 }
3369
3370 #[tokio::test]
3371 async fn expose_agent_with_schema_lists_agent_as_single_tool() {
3372 let server = McpServer::expose_agent_with_schema(
3373 Greeter,
3374 one_object_schema(),
3375 Arc::new(fresh_ctx),
3376 );
3377 let resp = server
3378 .handle_line(r#"{"jsonrpc":"2.0","id":1,"method":"tools/list"}"#)
3379 .await;
3380 let tools = resp["result"]["tools"].as_array().unwrap();
3381 assert_eq!(tools.len(), 1);
3382 assert_eq!(tools[0]["name"], "greeter");
3383 assert_eq!(tools[0]["inputSchema"]["type"], "object");
3384 }
3385
3386 #[tokio::test]
3387 async fn expose_agent_with_schema_dispatches_tools_call_through_agent() {
3388 let server = McpServer::expose_agent_with_schema(
3389 Greeter,
3390 one_object_schema(),
3391 Arc::new(fresh_ctx),
3392 );
3393 let resp = server
3394 .handle_line(
3395 r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"greeter","arguments":{"who":"world"}}}"#,
3396 )
3397 .await;
3398 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3399 assert!(
3400 text.contains(r#""greeting":"hello world""#),
3401 "tools/call must return serialised agent output; got: {text}"
3402 );
3403 }
3404
3405 #[tokio::test]
3406 async fn expose_agent_with_schema_rejects_unknown_tool_name() {
3407 let server = McpServer::expose_agent_with_schema(
3408 Greeter,
3409 one_object_schema(),
3410 Arc::new(fresh_ctx),
3411 );
3412 let resp = server
3413 .handle_line(
3414 r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"not-greeter","arguments":{}}}"#,
3415 )
3416 .await;
3417 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
3418 assert!(resp["error"]["message"]
3419 .as_str()
3420 .unwrap()
3421 .contains("not-greeter"));
3422 }
3423
3424 #[tokio::test]
3425 async fn expose_agent_with_schema_rejects_malformed_args() {
3426 let server = McpServer::expose_agent_with_schema(
3427 Greeter,
3428 one_object_schema(),
3429 Arc::new(fresh_ctx),
3430 );
3431 let resp = server
3433 .handle_line(
3434 r#"{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"greeter","arguments":{}}}"#,
3435 )
3436 .await;
3437 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
3438 let msg = resp["error"]["message"].as_str().unwrap();
3439 assert!(
3440 msg.contains("arguments do not match inputSchema"),
3441 "wire message must be the sanitised string; got: {msg}"
3442 );
3443 assert!(
3446 !msg.contains("GreetIn") && !msg.contains("missing field"),
3447 "internal decode detail must not leak: {msg}"
3448 );
3449 }
3450
3451 #[tokio::test]
3457 async fn expose_agent_sanitises_run_error_on_wire() {
3458 struct Failing;
3459 #[async_trait]
3460 impl Agent for Failing {
3461 type Input = serde_json::Value;
3462 type Output = serde_json::Value;
3463 type Error = KlieoError;
3464 fn name(&self) -> &str {
3465 "failing"
3466 }
3467 fn system_prompt(&self) -> &str {
3468 ""
3469 }
3470 fn tools(&self) -> &[ToolDef] {
3471 &[]
3472 }
3473 async fn run(
3474 &self,
3475 _ctx: AgentContext,
3476 _input: serde_json::Value,
3477 ) -> Result<serde_json::Value, KlieoError> {
3478 Err(KlieoError::BadResponse(
3479 "internal: token=secret-abc upstream=https://provider/url".into(),
3480 ))
3481 }
3482 }
3483 let server = McpServer::expose_agent_with_schema(
3484 Failing,
3485 serde_json::json!({}),
3486 Arc::new(fresh_ctx),
3487 );
3488 let resp = server
3489 .handle_line(
3490 r#"{"jsonrpc":"2.0","id":99,"method":"tools/call","params":{"name":"failing","arguments":{}}}"#,
3491 )
3492 .await;
3493 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
3494 let msg = resp["error"]["message"].as_str().unwrap();
3495 assert!(
3496 msg.contains("tool invocation failed"),
3497 "wire message must contain the sanitised stable string; got: {msg}"
3498 );
3499 assert!(
3502 !msg.contains("secret-abc") && !msg.contains("https://"),
3503 "internal error detail must not leak: {msg}"
3504 );
3505 }
3506
3507 #[tokio::test]
3513 async fn builder_propagates_parent_cancel_into_ctx() {
3514 let parent = CancellationToken::new();
3515 let server = McpServer::builder()
3516 .with_parent_cancel(parent.clone())
3517 .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
3518 .build()
3519 .unwrap();
3520
3521 let resp = server
3523 .handle_line(
3524 r#"{"jsonrpc":"2.0","id":200,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
3525 )
3526 .await;
3527 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3528 assert!(
3529 text.contains(r#""state":"ran""#),
3530 "live parent token must produce live ctx.cancel; got: {text}"
3531 );
3532
3533 parent.cancel();
3536 let resp = server
3537 .handle_line(
3538 r#"{"jsonrpc":"2.0","id":201,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
3539 )
3540 .await;
3541 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3542 assert!(
3543 text.contains(r#""state":"cancelled""#),
3544 "cancelled parent must propagate into ctx.cancel via child_token; got: {text}"
3545 );
3546 }
3547
/// Invoking `cancel-observer` with a tool context built from an
/// already-cancelled request token must make the agent report `cancelled`.
/// Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn tool_ctx_with_progress_cancel_cascades_into_agent_context() {
    let builder = McpServer::builder().add_agent_with_schema(
        CancelObserver,
        serde_json::json!({}),
        Arc::new(fresh_ctx),
    );
    let server = Arc::new(builder.build().unwrap());
    let (progress_tx, _progress_rx) =
        tokio::sync::broadcast::channel::<klieo_core::AgentEvent>(8);

    // The token is cancelled before the tool context is derived from it.
    let request_cancel = CancellationToken::new();
    request_cancel.cancel();
    let tool_ctx = server.tool_ctx_with_progress(progress_tx, request_cancel, None, None);

    let outcome = server
        .invoker
        .invoke("cancel-observer", serde_json::json!({}), tool_ctx)
        .await;
    let text = outcome.unwrap().to_string();
    let reported_cancelled = text.contains(r#""state":"cancelled""#);
    assert!(
        reported_cancelled,
        "cancelled request token must cascade into AgentContext.cancel; got: {text}"
    );
}
3581
3582 #[tokio::test]
3589 async fn shim_ctor_uses_default_uncancelled_parent_token() {
3590 let server = McpServer::expose_agent_with_schema(
3591 CancelObserver,
3592 serde_json::json!({}),
3593 Arc::new(fresh_ctx),
3594 );
3595 let resp = server
3596 .handle_line(
3597 r#"{"jsonrpc":"2.0","id":202,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
3598 )
3599 .await;
3600 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3601 assert!(
3602 text.contains(r#""state":"ran""#),
3603 "shim ctor must default to a never-cancelled parent token; got: {text}"
3604 );
3605 }
3606
3607 #[tokio::test]
3614 async fn agent_as_tool_invoker_installs_tenant_label_from_caller_principal() {
3615 use klieo_core::test_utils::{noop_bus, FakeLlmClient, FakeLlmStep};
3616 const PRINCIPAL: &str = "alice@example.com";
3617
3618 struct EchoLoopAgent;
3619
3620 #[async_trait]
3621 impl Agent for EchoLoopAgent {
3622 type Input = serde_json::Value;
3623 type Output = serde_json::Value;
3624 type Error = KlieoError;
3625 fn name(&self) -> &str {
3626 "echo-loop"
3627 }
3628 fn system_prompt(&self) -> &str {
3629 ""
3630 }
3631 fn tools(&self) -> &[ToolDef] {
3632 &[]
3633 }
3634 async fn run(
3635 &self,
3636 ctx: AgentContext,
3637 _input: serde_json::Value,
3638 ) -> Result<serde_json::Value, KlieoError> {
3639 let out = klieo_core::runtime::run_steps(
3640 &ctx,
3641 "",
3642 klieo_core::ids::ThreadId::new("echo-loop-thread"),
3643 klieo_core::runtime::RunOptions::default(),
3644 )
3645 .await?;
3646 Ok(serde_json::Value::String(out))
3647 }
3648 }
3649
3650 let mut ctx_seed = fake_context("echo-loop");
3651 ctx_seed.llm = Arc::new(
3652 FakeLlmClient::new("fake").with_steps(vec![FakeLlmStep::Text("done".into())]),
3653 );
3654 let episodic_for_probe = ctx_seed.episodic.clone();
3655 let short_term_for_probe = ctx_seed.short_term.clone();
3656 let run_id_for_probe = ctx_seed.run_id;
3657
3658 let slot = Arc::new(std::sync::Mutex::new(Some(ctx_seed)));
3659 let ctx_factory: AgentContextFactory = Arc::new(move || {
3660 slot.lock()
3661 .unwrap()
3662 .take()
3663 .expect("ctx_factory called more than once")
3664 });
3665 let server = McpServer::builder()
3666 .add_agent_with_schema(
3667 EchoLoopAgent,
3668 serde_json::json!({"type": "object"}),
3669 ctx_factory,
3670 )
3671 .build()
3672 .unwrap();
3673 let (pubsub, _, kv, jobs) = noop_bus();
3674 let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs)
3675 .with_caller_principal(PRINCIPAL.into());
3676 let _ = server
3677 .invoker
3678 .invoke(
3679 "echo-loop",
3680 serde_json::json!({}),
3681 tool_ctx,
3682 )
3683 .await
3684 .unwrap();
3685
3686 let expected = klieo_core::principal_hash(PRINCIPAL);
3687 let episodes = episodic_for_probe.replay(run_id_for_probe).await.unwrap();
3688 let labels: Vec<&str> = episodes
3689 .iter()
3690 .filter_map(|e| match e {
3691 klieo_core::Episode::RunAttributed { tenant_label } => {
3692 Some(tenant_label.as_str())
3693 }
3694 _ => None,
3695 })
3696 .collect();
3697 assert_eq!(
3698 labels,
3699 vec![expected.as_str()],
3700 "exactly one RunAttributed carrying principal_hash; got {episodes:?}",
3701 );
3702 for ep in &episodes {
3703 let payload = serde_json::to_string(ep).unwrap();
3704 assert!(
3705 !payload.contains(PRINCIPAL),
3706 "raw principal leaked into recorded episode: {payload}",
3707 );
3708 }
3709 let history = short_term_for_probe
3710 .load(klieo_core::ids::ThreadId::new("echo-loop-thread"), 8192)
3711 .await
3712 .unwrap_or_default();
3713 for msg in &history {
3714 assert!(
3715 !msg.content.contains(PRINCIPAL),
3716 "principal leaked into short-term memory: {}",
3717 msg.content
3718 );
3719 }
3720 }
3721
3722 #[tokio::test]
3729 async fn agent_as_tool_invoker_records_run_origin_from_parent_anchor() {
3730 use klieo_core::test_utils::{noop_bus, FakeLlmClient, FakeLlmStep};
3731 const PRINCIPAL: &str = "alice@example.com";
3732 const ANCHOR: &str = "sha256:deadbeefcafe0123";
3733
3734 struct EchoLoopAgent;
3735
3736 #[async_trait]
3737 impl Agent for EchoLoopAgent {
3738 type Input = serde_json::Value;
3739 type Output = serde_json::Value;
3740 type Error = KlieoError;
3741 fn name(&self) -> &str {
3742 "echo-origin"
3743 }
3744 fn system_prompt(&self) -> &str {
3745 ""
3746 }
3747 fn tools(&self) -> &[ToolDef] {
3748 &[]
3749 }
3750 async fn run(
3751 &self,
3752 ctx: AgentContext,
3753 _input: serde_json::Value,
3754 ) -> Result<serde_json::Value, KlieoError> {
3755 let out = klieo_core::runtime::run_steps(
3756 &ctx,
3757 "",
3758 klieo_core::ids::ThreadId::new("echo-origin-thread"),
3759 klieo_core::runtime::RunOptions::default(),
3760 )
3761 .await?;
3762 Ok(serde_json::Value::String(out))
3763 }
3764 }
3765
3766 let mut ctx_seed = fake_context("echo-origin");
3767 ctx_seed.llm = Arc::new(
3768 FakeLlmClient::new("fake").with_steps(vec![FakeLlmStep::Text("done".into())]),
3769 );
3770 let episodic_for_probe = ctx_seed.episodic.clone();
3771 let short_term_for_probe = ctx_seed.short_term.clone();
3772 let run_id_for_probe = ctx_seed.run_id;
3773
3774 let slot = Arc::new(std::sync::Mutex::new(Some(ctx_seed)));
3775 let ctx_factory: AgentContextFactory = Arc::new(move || {
3776 slot.lock()
3777 .unwrap()
3778 .take()
3779 .expect("ctx_factory called more than once")
3780 });
3781 let server = McpServer::builder()
3782 .add_agent_with_schema(
3783 EchoLoopAgent,
3784 serde_json::json!({"type": "object"}),
3785 ctx_factory,
3786 )
3787 .build()
3788 .unwrap();
3789 let (pubsub, _, kv, jobs) = noop_bus();
3790 let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs)
3791 .with_caller_principal(PRINCIPAL.into())
3792 .with_parent_anchor(ANCHOR.into());
3793 let _ = server
3794 .invoker
3795 .invoke("echo-origin", serde_json::json!({}), tool_ctx)
3796 .await
3797 .unwrap();
3798
3799 let episodes = episodic_for_probe.replay(run_id_for_probe).await.unwrap();
3800 let anchors: Vec<&str> = episodes
3801 .iter()
3802 .filter_map(|e| match e {
3803 klieo_core::Episode::RunOrigin { parent_anchor } => {
3804 Some(parent_anchor.as_str())
3805 }
3806 _ => None,
3807 })
3808 .collect();
3809 assert_eq!(
3810 anchors,
3811 vec![ANCHOR],
3812 "exactly one RunOrigin carrying the verbatim anchor; got {episodes:?}",
3813 );
3814 let attributed = episodes
3818 .iter()
3819 .filter(|e| matches!(e, klieo_core::Episode::RunAttributed { .. }))
3820 .count();
3821 assert_eq!(
3822 attributed, 1,
3823 "RunOrigin co-emitted with exactly one RunAttributed; got {episodes:?}",
3824 );
3825 let history = short_term_for_probe
3826 .load(klieo_core::ids::ThreadId::new("echo-origin-thread"), 8192)
3827 .await
3828 .unwrap_or_default();
3829 for msg in &history {
3830 assert!(
3831 !msg.content.contains(ANCHOR),
3832 "anchor leaked into short-term memory: {}",
3833 msg.content
3834 );
3835 }
3836 }
3837
3838 #[tokio::test]
3842 async fn builder_with_no_invokers_returns_no_invokers_error() {
3843 let result = McpServer::builder().build();
3844 assert!(matches!(result, Err(McpBuildError::NoInvokers)));
3845 }
3846
3847 #[tokio::test]
3852 async fn with_client_sampling_sets_capability_flag() {
3853 let server = McpServer::builder()
3854 .with_client_sampling()
3855 .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3856 .build()
3857 .unwrap();
3858 assert!(
3859 server.declare_sampling,
3860 "with_client_sampling() must set declare_sampling=true on the built server"
3861 );
3862 }
3863
3864 #[tokio::test]
3868 async fn default_builder_does_not_declare_sampling() {
3869 let server = McpServer::builder()
3870 .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3871 .build()
3872 .unwrap();
3873 assert!(
3874 !server.declare_sampling,
3875 "default builder must leave declare_sampling=false"
3876 );
3877 }
3878
/// A default-built server must have a 300-second `session_idle_timeout` and an
/// empty `sessions` map. Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn default_session_idle_timeout_is_5min() {
    let builder = McpServer::builder().add_agent_with_schema(
        Greeter,
        one_object_schema(),
        Arc::new(fresh_ctx),
    );
    let server = builder.build().unwrap();

    let five_minutes = std::time::Duration::from_secs(300);
    assert_eq!(
        server.session_idle_timeout, five_minutes,
        "default session idle timeout must be 5 minutes"
    );

    let no_sessions = server.sessions.read().await.is_empty();
    assert!(
        no_sessions,
        "HTTP server must hold zero sessions before any initialize POST"
    );
}
3901
/// `with_session_idle_timeout(42s)` must be reflected in the built server's
/// `session_idle_timeout`. Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn with_session_idle_timeout_overrides_default() {
    let custom_timeout = std::time::Duration::from_secs(42);
    let builder = McpServer::builder()
        .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
        .with_session_idle_timeout(custom_timeout);
    let server = builder.build().unwrap();

    assert_eq!(
        server.session_idle_timeout,
        std::time::Duration::from_secs(42),
        "with_session_idle_timeout must override the default"
    );
}
3919
/// `with_session_idle_timeout(Duration::ZERO)` must be stored unchanged on the
/// built server. Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn zero_duration_records_disabled_watchdog() {
    let disabled = std::time::Duration::ZERO;
    let builder = McpServer::builder()
        .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
        .with_session_idle_timeout(disabled);
    let server = builder.build().unwrap();

    assert_eq!(
        server.session_idle_timeout,
        std::time::Duration::ZERO,
        "Duration::ZERO must thread through to record disabled-watchdog intent"
    );
}
3937
/// Pins `DEFAULT_MAX_SESSIONS` at 1024. Only built with the `http` feature.
#[cfg(feature = "http")]
#[test]
fn default_max_sessions_is_1024() {
    let pinned = 1024;
    assert_eq!(DEFAULT_MAX_SESSIONS, pinned);
}
3947
/// `with_max_sessions(64)` must be reflected in the built server's
/// `max_sessions`. Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn with_max_sessions_overrides_default() {
    let builder = McpServer::builder()
        .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
        .with_max_sessions(64);
    let server = builder.build().unwrap();

    assert_eq!(
        server.max_sessions, 64,
        "with_max_sessions must override the default cap"
    );
}
3966
/// `with_max_sessions(0)` must panic with a message containing
/// `max_sessions must be > 0`. Only built with the `http` feature.
#[cfg(feature = "http")]
#[test]
#[should_panic(expected = "max_sessions must be > 0")]
fn with_max_sessions_panics_on_zero() {
    let builder = McpServer::builder();
    let _ = builder.with_max_sessions(0);
}
3977
/// Pins `DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR` at 16. Only built with
/// the `http` feature.
#[cfg(feature = "http")]
#[test]
fn default_divisor_is_sixteen() {
    let pinned = 16;
    assert_eq!(DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR, pinned);
}
3988
/// `default_max_sessions_per_principal` must map 1024 to 64 and 32 to 2.
/// Only built with the `http` feature.
#[cfg(feature = "http")]
#[test]
fn default_per_principal_derives_from_max_sessions() {
    // (max_sessions, expected per-principal cap), checked in this order.
    let cases = [(1024, 64), (32, 2)];
    for (max_sessions, expected_cap) in cases {
        assert_eq!(default_max_sessions_per_principal(max_sessions), expected_cap);
    }
}
4000
/// `default_max_sessions_per_principal` must return 1 for the inputs 0, 15
/// and 16. Only built with the `http` feature.
#[cfg(feature = "http")]
#[test]
fn default_per_principal_floors_at_one() {
    for max_sessions in [0, 15, 16] {
        assert_eq!(default_max_sessions_per_principal(max_sessions), 1);
    }
}
4013
/// `with_max_sessions_per_principal(8)` (alongside `with_max_sessions(1024)`)
/// must be reflected in the built server's `max_sessions_per_principal`.
/// Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn with_max_sessions_per_principal_overrides_default() {
    let builder = McpServer::builder()
        .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
        .with_max_sessions(1024)
        .with_max_sessions_per_principal(8);
    let server = builder.build().unwrap();

    assert_eq!(
        server.max_sessions_per_principal, 8,
        "with_max_sessions_per_principal must override the default sub-cap"
    );
}
4033
/// `with_max_sessions_per_principal(0)` must panic with a message containing
/// `max_sessions_per_principal must be > 0`. Only built with the `http`
/// feature.
#[cfg(feature = "http")]
#[test]
#[should_panic(expected = "max_sessions_per_principal must be > 0")]
fn with_max_sessions_per_principal_panics_on_zero() {
    let builder = McpServer::builder();
    let _ = builder.with_max_sessions_per_principal(0);
}
4045
/// Pins `DEFAULT_SSE_REPLAY_CAPACITY` at 256. Only built with the `http`
/// feature.
#[cfg(feature = "http")]
#[test]
fn default_sse_replay_capacity_is_256() {
    let pinned = 256;
    assert_eq!(DEFAULT_SSE_REPLAY_CAPACITY, pinned);
}
4056
/// `with_sse_replay_capacity(8)` must store 8 and report replay as enabled;
/// `with_sse_replay_capacity(0)` must store 0 and report replay as disabled.
/// Only built with the `http` feature.
#[cfg(feature = "http")]
#[tokio::test]
async fn with_sse_replay_capacity_overrides_default() {
    // Non-zero capacity: stored as given, replay on.
    let on_builder = McpServer::builder()
        .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
        .with_sse_replay_capacity(8);
    let server = on_builder.build().unwrap();
    assert_eq!(server.sse_replay_capacity, 8);
    assert!(server.sse_replay_enabled());

    // Zero capacity: stored as given, replay off.
    let off_builder = McpServer::builder()
        .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
        .with_sse_replay_capacity(0);
    let off = off_builder.build().unwrap();
    assert_eq!(off.sse_replay_capacity, 0);
    assert!(!off.sse_replay_enabled());
}
4082
4083 #[tokio::test]
4087 async fn builder_supports_multi_agent_dispatch() {
4088 let server = McpServer::builder()
4089 .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4090 .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
4091 .build()
4092 .unwrap();
4093
4094 let resp = server
4096 .handle_line(r#"{"jsonrpc":"2.0","id":300,"method":"tools/list"}"#)
4097 .await;
4098 let tools = resp["result"]["tools"].as_array().unwrap();
4099 let names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect();
4100 assert_eq!(tools.len(), 2);
4101 assert!(names.contains(&"greeter"));
4102 assert!(names.contains(&"cancel-observer"));
4103
4104 let resp = server
4106 .handle_line(
4107 r#"{"jsonrpc":"2.0","id":301,"method":"tools/call","params":{"name":"greeter","arguments":{"who":"multi"}}}"#,
4108 )
4109 .await;
4110 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4111 assert!(text.contains(r#""greeting":"hello multi""#));
4112
4113 let resp = server
4115 .handle_line(
4116 r#"{"jsonrpc":"2.0","id":302,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
4117 )
4118 .await;
4119 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4120 assert!(text.contains(r#""state":"ran""#));
4121 }
4122
4123 #[tokio::test]
4129 async fn builder_parent_cancel_propagates_into_every_agent() {
4130 let parent = CancellationToken::new();
4131 let server = McpServer::builder()
4132 .with_parent_cancel(parent.clone())
4133 .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
4134 .add_tools(Arc::new(super::OneToolInvoker))
4135 .build()
4136 .unwrap();
4137
4138 parent.cancel();
4139 let resp = server
4140 .handle_line(
4141 r#"{"jsonrpc":"2.0","id":303,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
4142 )
4143 .await;
4144 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4145 assert!(
4146 text.contains(r#""state":"cancelled""#),
4147 "builder-level parent_cancel must reach every add_agent_* invoker; got: {text}"
4148 );
4149 }
4150
4151 #[tokio::test]
4157 async fn builder_multi_agent_unknown_tool_returns_error() {
4158 let server = McpServer::builder()
4159 .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4160 .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
4161 .build()
4162 .unwrap();
4163
4164 let resp = server
4165 .handle_line(
4166 r#"{"jsonrpc":"2.0","id":304,"method":"tools/call","params":{"name":"no-such-agent","arguments":{}}}"#,
4167 )
4168 .await;
4169 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
4170 assert!(
4171 resp["error"]["message"]
4172 .as_str()
4173 .unwrap()
4174 .contains("no-such-agent"),
4175 "UnknownTool error must reference the requested name"
4176 );
4177 }
4178
4179 #[tokio::test]
4183 async fn builder_build_returns_duplicate_tool_error() {
4184 let result = McpServer::builder()
4185 .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4186 .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4187 .build();
4188 let Err(McpBuildError::DuplicateTool(ref name)) = result else {
4189 panic!("expected DuplicateTool error");
4190 };
4191 assert!(
4192 !name.is_empty(),
4193 "DuplicateTool must carry the colliding tool name"
4194 );
4195 assert_eq!(
4196 name, "greeter",
4197 "DuplicateTool must name the colliding tool"
4198 );
4199 }
4200
/// After merging one invoker with a redacting tool and one with a plain tool,
/// `tool_redacts_audit` must be `true` for the redacting tool and `false` for
/// both the plain tool and an unknown name.
#[test]
fn merged_invoker_forwards_tool_redacts_audit_per_owner() {
    use klieo_core::test_utils::FakeToolInvoker;

    let redacting = FakeToolInvoker::new().with_redacting_tool("claimant_lookup", "handles PII", Ok);
    let pii_owner: Arc<dyn ToolInvoker> = Arc::new(redacting);
    let plain = FakeToolInvoker::new().with_tool("echo", "plain", Ok);
    let plain_owner: Arc<dyn ToolInvoker> = Arc::new(plain);

    let owners = vec![pii_owner, plain_owner];
    let merged = MergedInvoker::new(owners)
        .expect("distinct tool names must merge without DuplicateTool");

    let pii_redacts = merged.tool_redacts_audit("claimant_lookup");
    assert!(
        pii_redacts,
        "a PII-flagged tool's redaction must survive the merge; \
         default-false here would record raw PII (fail-open)"
    );

    let echo_redacts = merged.tool_redacts_audit("echo");
    assert!(
        !echo_redacts,
        "an unflagged tool must not be reported as redacting"
    );

    let unknown_redacts = merged.tool_redacts_audit("no-such-tool");
    assert!(
        !unknown_redacts,
        "an unrouted name hits the None arm and must default to false"
    );
}
4234
/// Calling `build()` after `with_cancel_subscription()` must fail with
/// `McpBuildError::CancelRequiresArc`.
#[test]
fn mcp_builder_build_returns_cancel_requires_arc_error() {
    let builder = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_cancel_subscription();
    let result = builder.build();

    let is_cancel_requires_arc = matches!(result, Err(McpBuildError::CancelRequiresArc));
    assert!(
        is_cancel_requires_arc,
        "build() with cancel subscription must return CancelRequiresArc",
    );
}
4249
4250 #[tokio::test]
4256 async fn expose_agent_sanitises_encode_error_on_wire() {
4257 struct NonSerialisable;
4258
4259 impl serde::Serialize for NonSerialisable {
4260 fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
4261 where
4262 S: serde::Serializer,
4263 {
4264 Err(serde::ser::Error::custom(
4265 "internal: token=secret-encode-abc upstream=https://provider/encode",
4266 ))
4267 }
4268 }
4269
4270 struct EncodeFailing;
4271 #[async_trait]
4272 impl Agent for EncodeFailing {
4273 type Input = serde_json::Value;
4274 type Output = NonSerialisable;
4275 type Error = KlieoError;
4276 fn name(&self) -> &str {
4277 "encode-failing"
4278 }
4279 fn system_prompt(&self) -> &str {
4280 ""
4281 }
4282 fn tools(&self) -> &[ToolDef] {
4283 &[]
4284 }
4285 async fn run(
4286 &self,
4287 _ctx: AgentContext,
4288 _input: serde_json::Value,
4289 ) -> Result<NonSerialisable, KlieoError> {
4290 Ok(NonSerialisable)
4291 }
4292 }
4293
4294 let server = McpServer::expose_agent_with_schema(
4295 EncodeFailing,
4296 serde_json::json!({}),
4297 Arc::new(fresh_ctx),
4298 );
4299 let resp = server
4300 .handle_line(
4301 r#"{"jsonrpc":"2.0","id":100,"method":"tools/call","params":{"name":"encode-failing","arguments":{}}}"#,
4302 )
4303 .await;
4304 assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
4305 let msg = resp["error"]["message"].as_str().unwrap();
4306 assert!(
4307 msg.contains("tool invocation failed"),
4308 "wire message must contain the sanitised stable string; got: {msg}"
4309 );
4310 assert!(
4311 !msg.contains("secret-encode-abc") && !msg.contains("https://"),
4312 "internal encode-error detail must not leak: {msg}"
4313 );
4314 }
4315
4316 #[tokio::test]
4317 async fn tool_ctx_factory_invoked_per_request() {
4318 use std::sync::atomic::{AtomicUsize, Ordering};
4319 let counter = Arc::new(AtomicUsize::new(0));
4320 let c2 = counter.clone();
4321 let factory: ToolCtxFactory = Arc::new(move || {
4322 c2.fetch_add(1, Ordering::SeqCst);
4323 default_tool_ctx_factory()()
4324 });
4325
4326 let server = McpServer::builder()
4327 .with_tool_ctx_factory(factory)
4328 .add_tools(Arc::new(super::OneToolInvoker))
4329 .build()
4330 .unwrap();
4331
4332 let req = serde_json::json!({
4333 "jsonrpc": "2.0", "id": 1, "method": "tools/call",
4334 "params": { "name": "echo", "arguments": {"x": 1} }
4335 });
4336 server.handle_jsonrpc(req.clone(), None).await;
4337 server.handle_jsonrpc(req, None).await;
4338
4339 assert_eq!(counter.load(Ordering::SeqCst), 2);
4340 }
4341
4342 #[tokio::test]
4347 async fn agent_as_tool_invoker_propagates_progress_to_agent_context() {
4348 use klieo_core::AgentEvent;
4349 use std::sync::Mutex;
4350 use tokio::sync::broadcast;
4351
4352 struct CapturingAgent {
4353 captured: Arc<Mutex<Option<Option<broadcast::Sender<AgentEvent>>>>>,
4354 }
4355
4356 #[async_trait]
4357 impl Agent for CapturingAgent {
4358 type Input = serde_json::Value;
4359 type Output = serde_json::Value;
4360 type Error = KlieoError;
4361
4362 fn name(&self) -> &str {
4363 "capturing"
4364 }
4365 fn system_prompt(&self) -> &str {
4366 ""
4367 }
4368 fn tools(&self) -> &[ToolDef] {
4369 &[]
4370 }
4371 async fn run(
4372 &self,
4373 ctx: AgentContext,
4374 _input: serde_json::Value,
4375 ) -> Result<serde_json::Value, KlieoError> {
4376 *self.captured.lock().unwrap() = Some(ctx.progress.clone());
4377 Ok(serde_json::json!({}))
4378 }
4379 }
4380
4381 let captured = Arc::new(Mutex::new(None::<Option<broadcast::Sender<AgentEvent>>>));
4382 let agent = CapturingAgent {
4383 captured: captured.clone(),
4384 };
4385
4386 let (tx, _rx) = broadcast::channel::<AgentEvent>(16);
4387 let tx_for_factory = tx.clone();
4388 let factory: ToolCtxFactory = Arc::new(move || {
4389 let bus = klieo_bus_memory::MemoryBus::new();
4390 klieo_core::tool::ToolCtx::new(bus.pubsub, bus.kv, bus.jobs)
4391 .with_progress(tx_for_factory.clone())
4392 });
4393
4394 let server = McpServer::builder()
4395 .with_tool_ctx_factory(factory)
4396 .add_agent_with_schema(agent, serde_json::json!({}), Arc::new(fresh_ctx))
4397 .build()
4398 .unwrap();
4399
4400 let req = serde_json::json!({
4401 "jsonrpc": "2.0", "id": 1, "method": "tools/call",
4402 "params": { "name": "capturing", "arguments": {} }
4403 });
4404 let resp = server.handle_jsonrpc(req, None).await;
4405 assert!(
4406 resp["result"].is_object(),
4407 "tools/call must succeed; got: {resp}"
4408 );
4409
4410 let captured_progress = captured
4411 .lock()
4412 .unwrap()
4413 .clone()
4414 .expect("Agent::run was never invoked");
4415 assert!(
4416 captured_progress.is_some(),
4417 "AgentContext.progress was None despite ToolCtx.progress=Some"
4418 );
4419 }
4420
#[cfg(feature = "schemars")]
/// Tests for `McpServer::expose_agent`; compiled only with the `schemars` feature.
mod auto_derive {
    use super::*;
    use schemars::JsonSchema;

    /// Agent input that derives `JsonSchema`.
    #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
    struct DerivedIn {
        who: String,
    }

    /// Agent output carrying the formatted greeting.
    #[derive(Debug, Clone, Serialize)]
    struct DerivedOut {
        greeting: String,
    }

    /// Agent that greets the `who` named in its input.
    struct DerivedGreeter;

    #[async_trait]
    impl Agent for DerivedGreeter {
        type Input = DerivedIn;
        type Output = DerivedOut;
        type Error = KlieoError;

        fn name(&self) -> &str {
            "derived-greeter"
        }

        fn system_prompt(&self) -> &str {
            ""
        }

        fn tools(&self) -> &[ToolDef] {
            &[]
        }

        async fn run(
            &self,
            _ctx: AgentContext,
            input: DerivedIn,
        ) -> Result<DerivedOut, KlieoError> {
            let greeting = format!("hi {}", input.who);
            Ok(DerivedOut { greeting })
        }
    }

    /// `tools/list` on a server built with `expose_agent` must advertise an
    /// input schema that has a `who` property.
    #[tokio::test]
    async fn expose_agent_auto_derives_schema_via_schemars() {
        const LIST_REQUEST: &str = r#"{"jsonrpc":"2.0","id":1,"method":"tools/list"}"#;

        let server = McpServer::expose_agent(DerivedGreeter, Arc::new(fresh_ctx));
        let resp = server.handle_line(LIST_REQUEST).await;

        let schema = &resp["result"]["tools"][0]["inputSchema"];
        let who_property = &schema["properties"]["who"];
        assert!(
            who_property.is_object(),
            "derived schema must include the `who` field; got: {schema}"
        );
    }
}
4478 }
4479
4480
4481 #[cfg(not(feature = "governor"))]
4488 mod expose_workflow_tests {
4489 use super::*;
4490 use async_trait::async_trait;
4491 use chrono::Utc;
4492 use klieo_core::agent::{Agent, AgentContext};
4493 use klieo_core::error::Error as KlieoError;
4494 use klieo_core::llm::Message;
4495 use klieo_core::runtime::{ReviewPolicy, RunOptions};
4496 use klieo_core::test_utils::{fake_context, fake_kv, FakeLlmClient, FakeLlmStep};
4497 use klieo_core::ToolDef;
4498 use klieo_hitl::HitlConfig;
4499 use klieo_hitl_client::HitlClient;
4500 use secrecy::SecretString;
4501 use serde::{Deserialize, Serialize};
4502 use serde_json::json;
4503 use std::time::Duration;
4504 use wiremock::matchers::{method, path};
4505 use wiremock::{Mock, MockServer, ResponseTemplate};
4506
// Bucket name handed to `HitlConfig::new` and `RunOptions::with_checkpoint_bucket`.
const CHECKPOINT_BUCKET: &str = "klieo.run-checkpoints";
// Workspace id used both in the mocked HITL item bodies and in `HitlConfig::new`.
const WORKSPACE_ID: &str = "ws-test";
// Marker emitted by the scripted LLM; suspend tests assert it never appears in a response.
const PLANTED_SENTINEL: &str = "PLANT-SENTINEL-9F7C";
4510
4511 #[derive(Debug, Clone, Deserialize, Serialize)]
4512 struct WorkflowIn {
4513 #[allow(dead_code)]
4514 payload: String,
4515 }
4516
4517 #[derive(Debug, Clone, Serialize)]
4518 struct UnusedOut;
4519
4520 struct WorkflowAgent {
4521 name: &'static str,
4522 }
4523
4524 #[async_trait]
4525 impl Agent for WorkflowAgent {
4526 type Input = WorkflowIn;
4527 type Output = UnusedOut;
4528 type Error = KlieoError;
4529
4530 fn name(&self) -> &str {
4531 self.name
4532 }
4533 fn system_prompt(&self) -> &str {
4534 ""
4535 }
4536 fn tools(&self) -> &[ToolDef] {
4537 &[]
4538 }
4539 async fn run(
4540 &self,
4541 _ctx: AgentContext,
4542 _input: WorkflowIn,
4543 ) -> Result<UnusedOut, KlieoError> {
4544 Err(KlieoError::BadResponse(
4545 "workflow path must not call Agent::run".into(),
4546 ))
4547 }
4548 }
4549
4550 struct PauseOnce(std::sync::atomic::AtomicBool);
4551
4552 impl PauseOnce {
4553 fn new() -> Self {
4554 Self(std::sync::atomic::AtomicBool::new(false))
4555 }
4556 }
4557
4558 #[async_trait]
4559 impl ReviewPolicy for PauseOnce {
4560 async fn should_pause_for_approval(
4561 &self,
4562 _step: u32,
4563 _message: &Message,
4564 ) -> Result<Option<String>, KlieoError> {
4565 if self.0.swap(true, std::sync::atomic::Ordering::SeqCst) {
4566 Ok(None)
4567 } else {
4568 Ok(Some("policy reason that MUST NOT leak to peer".into()))
4569 }
4570 }
4571 }
4572
4573 fn workflow_ctx_with(steps: Vec<FakeLlmStep>) -> AgentContext {
4574 let mut ctx = fake_context("workflow-test");
4575 ctx.llm = Arc::new(FakeLlmClient::new("fake").with_steps(steps));
4576 ctx.kv = fake_kv();
4577 ctx
4578 }
4579
4580 fn item_json(id: &str, state: &str) -> serde_json::Value {
4581 json!({
4582 "id": id, "workspace_id": WORKSPACE_ID, "state": state, "version": 1,
4583 "escalation_count": 0,
4584 "decision_context": {"subject_ref":"x","run_id":"r","payload_hash_hex":"h"},
4585 "reviewer": null, "updated_at": "2026-06-18T00:00:00Z"
4586 })
4587 }
4588
4589 fn hitl_cfg(poll_timeout: Duration) -> HitlConfig {
4590 HitlConfig::new(
4591 WORKSPACE_ID,
4592 CHECKPOINT_BUCKET,
4593 Duration::from_millis(1),
4594 poll_timeout,
4595 )
4596 }
4597
4598 fn gated_run_options() -> RunOptions {
4599 RunOptions::default()
4600 .with_review_policy(Arc::new(PauseOnce::new()))
4601 .with_checkpoint_bucket(CHECKPOINT_BUCKET)
4602 }
4603
4604 fn plain_run_options() -> RunOptions {
4605 RunOptions::default()
4606 }
4607
4608 fn one_shot_ctx_factory(ctx: AgentContext) -> AgentContextFactory {
4609 let slot = Arc::new(std::sync::Mutex::new(Some(ctx)));
4610 Arc::new(move || {
4611 slot.lock()
4612 .unwrap()
4613 .take()
4614 .expect("ctx_factory called more than once")
4615 })
4616 }
4617
4618 fn input_schema() -> serde_json::Value {
4619 json!({
4620 "type": "object",
4621 "properties": {"payload": {"type": "string"}},
4622 "required": ["payload"]
4623 })
4624 }
4625
4626 #[tokio::test]
4630 async fn invoke_happy_path_returns_text_without_hitl_traffic() {
4631 let mock = MockServer::start().await;
4632 Mock::given(method("POST"))
4634 .and(path("/api/v1/hitl/items"))
4635 .respond_with(ResponseTemplate::new(500))
4636 .expect(0)
4637 .mount(&mock)
4638 .await;
4639
4640 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("workflow done".into())]);
4641 let client = Arc::new(HitlClient::new(
4642 mock.uri(),
4643 SecretString::from("tok".to_string()),
4644 ));
4645 let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4646
4647 let server = McpServer::expose_workflow_with_schema(
4648 WorkflowAgent { name: "wf-happy" },
4649 "you are a workflow",
4650 input_schema(),
4651 plain_run_options(),
4652 client,
4653 cfg,
4654 one_shot_ctx_factory(ctx),
4655 )
4656 .unwrap();
4657
4658 let resp = server
4659 .handle_line(
4660 r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"wf-happy","arguments":{"payload":"hi"}}}"#,
4661 )
4662 .await;
4663
4664 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4665 assert!(
4666 text.contains("workflow done"),
4667 "tools/call must return run_with_hitl text body; got: {text}"
4668 );
4669 }
4670
4671 #[tokio::test]
4677 async fn invoke_suspend_path_redacts_reason_and_drops_checkpoint() {
4678 let mock = MockServer::start().await;
4679 Mock::given(method("POST"))
4680 .and(path("/api/v1/hitl/items"))
4681 .respond_with(
4682 ResponseTemplate::new(201)
4683 .set_body_json(item_json("item-suspend", "awaiting")),
4684 )
4685 .mount(&mock)
4686 .await;
4687 Mock::given(method("GET"))
4688 .and(path("/api/v1/hitl/items/item-suspend"))
4689 .respond_with(
4690 ResponseTemplate::new(200)
4691 .set_body_json(item_json("item-suspend", "awaiting")),
4692 )
4693 .mount(&mock)
4694 .await;
4695
4696 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text(PLANTED_SENTINEL.into())]);
4697 let client = Arc::new(HitlClient::new(
4698 mock.uri(),
4699 SecretString::from("tok".to_string()),
4700 ));
4701 let cfg = Arc::new(hitl_cfg(Duration::from_millis(5)));
4702
4703 let server = McpServer::expose_workflow_with_schema(
4704 WorkflowAgent {
4705 name: "wf-suspend",
4706 },
4707 "you are a suspending workflow",
4708 input_schema(),
4709 gated_run_options(),
4710 client,
4711 cfg,
4712 one_shot_ctx_factory(ctx),
4713 )
4714 .unwrap();
4715
4716 let resp = server
4717 .handle_line(
4718 r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"wf-suspend","arguments":{"payload":"please-approve"}}}"#,
4719 )
4720 .await;
4721
4722 let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4723 assert!(
4724 text.contains(r#""status":"suspended""#),
4725 "suspend response must carry status=suspended; got: {text}"
4726 );
4727 assert!(
4728 text.contains("workflow suspended for human review"),
4729 "suspend response must carry the safe wire reason; got: {text}"
4730 );
4731 assert!(
4732 !text.contains("policy reason that MUST NOT leak"),
4733 "raw ReviewPolicy reason leaked to peer: {text}"
4734 );
4735 assert!(
4736 !text.contains(PLANTED_SENTINEL),
4737 "checkpoint/conversation bytes leaked to peer: {text}"
4738 );
4739 }
4740
4741 #[tokio::test]
4745 async fn invoke_hitl_submit_failure_maps_to_sanitised_tool_error() {
4746 let mock = MockServer::start().await;
4747 Mock::given(method("POST"))
4748 .and(path("/api/v1/hitl/items"))
4749 .respond_with(ResponseTemplate::new(403).set_body_string("forbidden: token=xyz"))
4750 .mount(&mock)
4751 .await;
4752
4753 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("never reached".into())]);
4754 let client = Arc::new(HitlClient::new(
4755 mock.uri(),
4756 SecretString::from("tok".to_string()),
4757 ));
4758 let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4759
4760 let server = McpServer::expose_workflow_with_schema(
4761 WorkflowAgent { name: "wf-err" },
4762 "",
4763 input_schema(),
4764 gated_run_options(),
4765 client,
4766 cfg,
4767 one_shot_ctx_factory(ctx),
4768 )
4769 .unwrap();
4770
4771 let resp = server
4772 .handle_line(
4773 r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"wf-err","arguments":{"payload":"x"}}}"#,
4774 )
4775 .await;
4776
4777 assert!(
4778 resp.get("error").is_some(),
4779 "submit failure must surface as JSON-RPC error; got: {resp}"
4780 );
4781 let msg = resp["error"]["message"].as_str().unwrap();
4782 assert!(
4783 msg.contains("tool invocation failed"),
4784 "wire message must be the sanitised stable string; got: {msg}"
4785 );
4786 assert!(
4787 !msg.contains("token=xyz") && !msg.contains("forbidden"),
4788 "internal HitlClientError detail leaked: {msg}"
4789 );
4790 }
4791
4792
4793 #[tokio::test]
4798 async fn caller_principal_does_not_enter_run_state() {
4799 use klieo_core::test_utils::noop_bus;
4800 const PRINCIPAL: &str = "alice-NEVER-IN-MEMORY@x";
4801
4802 let mock = MockServer::start().await;
4803 Mock::given(method("POST"))
4804 .and(path("/api/v1/hitl/items"))
4805 .respond_with(ResponseTemplate::new(500))
4806 .expect(0)
4807 .mount(&mock)
4808 .await;
4809
4810 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("done".into())]);
4811 let short_term_for_probe = ctx.short_term.clone();
4812 let episodic_for_probe = ctx.episodic.clone();
4813 let run_id_for_probe = ctx.run_id;
4814 let client = Arc::new(HitlClient::new(
4815 mock.uri(),
4816 SecretString::from("tok".to_string()),
4817 ));
4818 let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4819
4820 let server = Arc::new(
4821 McpServer::builder()
4822 .with_hitl(client, cfg)
4823 .add_workflow_with_schema(
4824 WorkflowAgent { name: "wf-no-leak" },
4825 "you are a workflow",
4826 input_schema(),
4827 plain_run_options(),
4828 one_shot_ctx_factory(ctx),
4829 )
4830 .build()
4831 .unwrap(),
4832 );
4833
4834 let (pubsub, _, kv, jobs) = noop_bus();
4835 let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs)
4836 .with_caller_principal(PRINCIPAL.into());
4837 let _result = server
4838 .invoker
4839 .invoke(
4840 "wf-no-leak",
4841 json!({"payload": "hi"}),
4842 tool_ctx,
4843 )
4844 .await
4845 .unwrap();
4846
4847 let history = short_term_for_probe
4848 .load(klieo_core::ids::ThreadId::new("wf-no-leak:any"), 8192)
4849 .await
4850 .unwrap_or_default();
4851 for msg in &history {
4852 assert!(
4853 !msg.content.contains(PRINCIPAL),
4854 "principal leaked into short-term memory: {}",
4855 msg.content
4856 );
4857 }
4858
4859 let episodes = episodic_for_probe
4862 .replay(run_id_for_probe)
4863 .await
4864 .expect("episodic replay must succeed");
4865 let expected_label = klieo_core::principal_hash(PRINCIPAL);
4866 let attributed_labels: Vec<&str> = episodes
4867 .iter()
4868 .filter_map(|e| match e {
4869 klieo_core::Episode::RunAttributed { tenant_label } => {
4870 Some(tenant_label.as_str())
4871 }
4872 _ => None,
4873 })
4874 .collect();
4875 assert_eq!(
4876 attributed_labels,
4877 vec![expected_label.as_str()],
4878 "exactly one RunAttributed carrying principal_hash; got {episodes:?}",
4879 );
4880 for ep in &episodes {
4881 let payload = serde_json::to_string(ep).expect("episode serialises");
4882 assert!(
4883 !payload.contains(PRINCIPAL),
4884 "raw principal leaked into recorded episode: {payload}",
4885 );
4886 }
4887 }
4888
4889 #[tokio::test]
4893 async fn no_principal_yields_no_run_attributed_episode() {
4894 use klieo_core::test_utils::noop_bus;
4895 let mock = MockServer::start().await;
4896 Mock::given(method("POST"))
4897 .and(path("/api/v1/hitl/items"))
4898 .respond_with(ResponseTemplate::new(500))
4899 .expect(0)
4900 .mount(&mock)
4901 .await;
4902 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("done".into())]);
4903 let episodic_for_probe = ctx.episodic.clone();
4904 let run_id_for_probe = ctx.run_id;
4905 let client = Arc::new(HitlClient::new(
4906 mock.uri(),
4907 SecretString::from("tok".to_string()),
4908 ));
4909 let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4910 let server = Arc::new(
4911 McpServer::builder()
4912 .with_hitl(client, cfg)
4913 .add_workflow_with_schema(
4914 WorkflowAgent { name: "wf-anon" },
4915 "you are a workflow",
4916 input_schema(),
4917 plain_run_options(),
4918 one_shot_ctx_factory(ctx),
4919 )
4920 .build()
4921 .unwrap(),
4922 );
4923 let (pubsub, _, kv, jobs) = noop_bus();
4924 let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs);
4925 let _ = server
4926 .invoker
4927 .invoke("wf-anon", json!({"payload": "hi"}), tool_ctx)
4928 .await
4929 .unwrap();
4930 let episodes = episodic_for_probe
4931 .replay(run_id_for_probe)
4932 .await
4933 .expect("episodic replay must succeed");
4934 let attributed_count = episodes
4935 .iter()
4936 .filter(|e| matches!(e, klieo_core::Episode::RunAttributed { .. }))
4937 .count();
4938 assert_eq!(
4939 attributed_count, 0,
4940 "RunAttributed must not appear without a caller_principal; got {episodes:?}",
4941 );
4942 }
4943
4944 async fn run_suspend_with(
4950 with_kv: bool,
4951 with_principal: bool,
4952 ) -> serde_json::Value {
4953 use klieo_core::test_utils::noop_bus;
4954 let mock = MockServer::start().await;
4955 Mock::given(method("POST"))
4956 .and(path("/api/v1/hitl/items"))
4957 .respond_with(
4958 ResponseTemplate::new(201)
4959 .set_body_json(item_json("item-suspend", "awaiting")),
4960 )
4961 .mount(&mock)
4962 .await;
4963 Mock::given(method("GET"))
4964 .and(path("/api/v1/hitl/items/item-suspend"))
4965 .respond_with(
4966 ResponseTemplate::new(200)
4967 .set_body_json(item_json("item-suspend", "awaiting")),
4968 )
4969 .mount(&mock)
4970 .await;
4971
4972 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text(PLANTED_SENTINEL.into())]);
4973 let client = Arc::new(HitlClient::new(
4974 mock.uri(),
4975 SecretString::from("tok".to_string()),
4976 ));
4977 let cfg = Arc::new(hitl_cfg(Duration::from_millis(5)));
4978
4979 let mut builder = McpServer::builder()
4980 .with_hitl(client, cfg)
4981 .add_workflow_with_schema(
4982 WorkflowAgent { name: "wf-suspend" },
4983 "",
4984 input_schema(),
4985 gated_run_options(),
4986 one_shot_ctx_factory(ctx),
4987 );
4988 if with_kv {
4989 builder = builder.with_checkpoint_kv(fake_kv());
4990 }
4991 let server = Arc::new(builder.build().unwrap());
4992
4993 let (pubsub, _, kv, jobs) = noop_bus();
4994 let mut tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs);
4995 if with_principal {
4996 tool_ctx = tool_ctx.with_caller_principal("alice@x".into());
4997 }
4998 server
4999 .invoker
5000 .invoke(
5001 "wf-suspend",
5002 json!({"payload": "please-approve"}),
5003 tool_ctx,
5004 )
5005 .await
5006 .unwrap()
5007 }
5008
5009 #[tokio::test]
5010 async fn suspend_with_kv_and_principal_issues_ticket() {
5011 let envelope = run_suspend_with(true, true).await;
5012 let body = envelope.to_string();
5013 assert_eq!(envelope["status"], "suspended");
5014 let ticket = envelope["ticket"].as_str().expect("ticket present");
5015 assert!(!ticket.is_empty(), "issued ticket must be non-empty");
5016 assert!(
5017 !body.contains(PLANTED_SENTINEL),
5018 "checkpoint bytes leaked: {body}"
5019 );
5020 assert!(
5021 !body.contains("policy reason that MUST NOT leak"),
5022 "raw policy reason leaked: {body}"
5023 );
5024 }
5025
5026 #[tokio::test]
5027 async fn suspend_without_kv_falls_back_to_no_ticket_envelope() {
5028 let envelope = run_suspend_with(false, true).await;
5029 assert_eq!(envelope["status"], "suspended");
5030 assert!(
5031 envelope.get("ticket").is_none(),
5032 "no checkpoint KV must yield slice-1 envelope (no ticket field)",
5033 );
5034 let body = envelope.to_string();
5035 assert!(!body.contains(PLANTED_SENTINEL));
5036 }
5037
5038 #[tokio::test]
5039 async fn suspend_without_principal_falls_back_to_no_ticket_envelope() {
5040 let envelope = run_suspend_with(true, false).await;
5041 assert_eq!(envelope["status"], "suspended");
5042 assert!(
5043 envelope.get("ticket").is_none(),
5044 "no caller principal must yield slice-1 envelope (no ticket field)",
5045 );
5046 let body = envelope.to_string();
5047 assert!(!body.contains(PLANTED_SENTINEL));
5048 }
5049
5050 #[tokio::test]
5056 async fn principal_b_cannot_consume_principal_a_ticket() {
5057 use crate::resume_ticket::{ResumeTicketRecord, ResumeTicketStore};
5058 let store = ResumeTicketStore::new(fake_kv());
5059 let token = ResumeTicketStore::mint_token();
5060 let cp_json = serde_json::json!({
5061 "run_id": klieo_core::ids::RunId::new(),
5062 "step_index": 1,
5063 "thread_id": "t-idor",
5064 "messages": [],
5065 "pending_tool_calls": null,
5066 "created_at": "2026-06-18T00:00:00Z",
5067 });
5068 let checkpoint = serde_json::from_value(cp_json).unwrap();
5069 let record = ResumeTicketRecord {
5070 principal: "alice@x".into(),
5071 workflow_name: "wf".into(),
5072 checkpoint,
5073 created_at: Utc::now(),
5074 };
5075 store.persist(&token, &record).await.unwrap();
5076
5077 let peeked = store.peek(&token).await.unwrap().expect("ticket present");
5079 let principal_b = "mallory@x";
5080 assert_ne!(
5081 peeked.principal, principal_b,
5082 "fixture must seed a distinct principal so the authz arm engages"
5083 );
5084 let consumed_by_alice = store.claim(&token).await.unwrap();
5087 assert!(
5088 consumed_by_alice.is_some(),
5089 "after a foreign-principal denial the rightful owner can still resume"
5090 );
5091 let after = store.claim(&token).await.unwrap();
5093 assert!(after.is_none(), "the now-consumed ticket cannot be reused");
5094 }
5095
5096 #[tokio::test]
5100 async fn concurrent_resume_runs_exactly_once() {
5101 use crate::resume_ticket::{ResumeTicketRecord, ResumeTicketStore};
5102 let store = Arc::new(ResumeTicketStore::new(fake_kv()));
5103 let token = ResumeTicketStore::mint_token();
5104 let cp_json = serde_json::json!({
5105 "run_id": klieo_core::ids::RunId::new(),
5106 "step_index": 1,
5107 "thread_id": "t-conc",
5108 "messages": [],
5109 "pending_tool_calls": null,
5110 "created_at": "2026-06-18T00:00:00Z",
5111 });
5112 let checkpoint = serde_json::from_value(cp_json).unwrap();
5113 let record = ResumeTicketRecord {
5114 principal: "alice@x".into(),
5115 workflow_name: "wf".into(),
5116 checkpoint,
5117 created_at: Utc::now(),
5118 };
5119 store.persist(&token, &record).await.unwrap();
5120 let racers: Vec<_> = (0..8)
5121 .map(|_| {
5122 let store = store.clone();
5123 let token = token.clone();
5124 tokio::spawn(async move { store.claim(&token).await })
5125 })
5126 .collect();
5127 let mut winners = 0usize;
5128 for handle in racers {
5129 if handle.await.unwrap().unwrap().is_some() {
5130 winners += 1;
5131 }
5132 }
5133 assert_eq!(
5134 winners, 1,
5135 "concurrent ticket consumption must run exactly once; got {winners}"
5136 );
5137 }
5138
5139 #[tokio::test]
5146 async fn approve_resume_drives_run_to_completion() {
5147 use klieo_core::checkpoint::ApprovalDecision;
5148 use std::sync::Mutex;
5149
5150 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("approved".into())]);
5151 let cp_json = serde_json::json!({
5152 "run_id": ctx.run_id,
5153 "step_index": 1,
5154 "thread_id": "t-resume-approve",
5155 "messages": [],
5156 "pending_tool_calls": null,
5157 "created_at": "2026-06-18T00:00:00Z",
5158 });
5159 let checkpoint: klieo_core::checkpoint::RunCheckpoint =
5160 serde_json::from_value(cp_json).unwrap();
5161
5162 let client = Arc::new(HitlClient::new(
5163 "http://unused".to_string(),
5164 SecretString::from("tok".to_string()),
5165 ));
5166 let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
5167 let ctx_holder = Arc::new(Mutex::new(Some(ctx)));
5168 let ctx_factory: AgentContextFactory = Arc::new(move || {
5169 ctx_holder
5170 .lock()
5171 .unwrap()
5172 .take()
5173 .expect("ctx_factory drained")
5174 });
5175
5176 let invoker = Arc::new(crate::workflow::WorkflowAsToolInvoker::<WorkflowAgent>::new(
5177 "wf-resume".into(),
5178 "".into(),
5179 input_schema(),
5180 ctx_factory,
5181 plain_run_options(),
5182 crate::workflow::HitlBundle { client, cfg },
5183 None,
5184 #[cfg(feature = "governor")]
5185 None,
5186 ));
5187
5188 let handle: Arc<dyn crate::workflow::WorkflowResumeHandle> = invoker;
5189 let result = handle
5190 .resume(checkpoint, ApprovalDecision::Approved, "hashed-tenant".into())
5191 .await
5192 .unwrap();
5193 assert_eq!(result, serde_json::Value::String("approved".into()));
5194 }
5195
5196 #[tokio::test]
5200 async fn reject_resume_feeds_reason_back_to_model() {
5201 use klieo_core::checkpoint::ApprovalDecision;
5202 use klieo_core::llm::Role;
5203 use std::sync::Mutex;
5204
5205 let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("acknowledged".into())]);
5206 let short_term_for_probe = ctx.short_term.clone();
5207 let cp_json = serde_json::json!({
5208 "run_id": ctx.run_id,
5209 "step_index": 1,
5210 "thread_id": "t-resume-reject",
5211 "messages": [],
5212 "pending_tool_calls": null,
5213 "created_at": "2026-06-18T00:00:00Z",
5214 });
5215 let checkpoint: klieo_core::checkpoint::RunCheckpoint =
5216 serde_json::from_value(cp_json).unwrap();
5217
5218 let client = Arc::new(HitlClient::new(
5219 "http://unused".to_string(),
5220 SecretString::from("tok".to_string()),
5221 ));
5222 let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
5223 let ctx_holder = Arc::new(Mutex::new(Some(ctx)));
5224 let ctx_factory: AgentContextFactory = Arc::new(move || {
5225 ctx_holder
5226 .lock()
5227 .unwrap()
5228 .take()
5229 .expect("ctx_factory drained")
5230 });
5231
5232 let invoker = Arc::new(crate::workflow::WorkflowAsToolInvoker::<WorkflowAgent>::new(
5233 "wf-reject".into(),
5234 "".into(),
5235 input_schema(),
5236 ctx_factory,
5237 plain_run_options(),
5238 crate::workflow::HitlBundle { client, cfg },
5239 None,
5240 #[cfg(feature = "governor")]
5241 None,
5242 ));
5243
5244 let handle: Arc<dyn crate::workflow::WorkflowResumeHandle> = invoker;
5245 let _ = handle
5246 .resume(
5247 checkpoint,
5248 ApprovalDecision::Rejected {
5249 reason: "BAD-IDEA-XYZ".into(),
5250 },
5251 "hashed-tenant".into(),
5252 )
5253 .await
5254 .unwrap();
5255
5256 let history = short_term_for_probe
5257 .load(
5258 klieo_core::ids::ThreadId::new("t-resume-reject"),
5259 8192,
5260 )
5261 .await
5262 .unwrap();
5263 let rejection_seen = history
5264 .iter()
5265 .any(|m| m.role == Role::Tool && m.content.contains("BAD-IDEA-XYZ"));
5266 assert!(
5267 rejection_seen,
5268 "the model must see the operator's rejection reason on resume"
5269 );
5270 }
5271
#[test]
/// Registering a workflow without calling `with_hitl` must make `build` fail
/// with `McpBuildError::WorkflowWithoutHitl`.
fn builder_rejects_workflow_without_hitl() {
    let ctx_factory: AgentContextFactory = Arc::new(|| fake_context("guard-test"));

    let builder = McpServer::builder().add_workflow_with_schema(
        WorkflowAgent { name: "wf-guard" },
        "",
        input_schema(),
        plain_run_options(),
        ctx_factory,
    );
    let err = builder
        .build()
        .err()
        .expect("workflow without with_hitl must fail build");

    assert!(
        matches!(err, McpBuildError::WorkflowWithoutHitl),
        "expected WorkflowWithoutHitl, got: {err:?}"
    );
}
5293 }
5294
#[test]
/// Pins the `Display` text of the two resume-buffer error variants.
fn resume_errors_render_messages() {
    let expired = McpServerError::ResumeBufferExpired { since_id: 7 };
    let missing = McpServerError::ResumeBufferNotFound("tok".into());

    assert_eq!(expired.to_string(), "resume window expired (since_id=7)");
    assert_eq!(
        missing.to_string(),
        "no buffered stream for progressToken: tok"
    );
}
5302
#[test]
/// Converting `ServerOutboundError::Serialisation` must yield
/// `McpServerError::OutboundSerialisation` with an error source attached.
fn from_server_outbound_serialisation_maps_to_outbound_serialisation() {
    use klieo_core::ServerOutboundError;
    use std::error::Error;

    // Parsing invalid JSON is just a convenient way to obtain a `serde_json::Error`.
    let parse_failure = serde_json::from_str::<serde_json::Value>("{invalid}").unwrap_err();
    let outbound_err = ServerOutboundError::Serialisation(parse_failure);
    let mcp_err = McpServerError::from(outbound_err);

    assert!(
        matches!(mcp_err, McpServerError::OutboundSerialisation(_)),
        "ServerOutboundError::Serialisation must map to McpServerError::OutboundSerialisation; got {mcp_err:?}"
    );
    assert!(
        mcp_err.source().is_some(),
        "McpServerError::OutboundSerialisation must expose source via #[source]"
    );
}
5318
5319 struct NamedAuthn;
5320
5321 #[async_trait]
5322 impl klieo_auth_common::Authenticator for NamedAuthn {
5323 async fn authenticate(
5324 &self,
5325 _headers: &dyn klieo_auth_common::Headers,
5326 _payload: &[u8],
5327 ) -> Result<klieo_auth_common::Identity, klieo_auth_common::AuthError> {
5328 Ok(klieo_auth_common::Identity::new("alice"))
5329 }
5330 }
5331
#[test]
/// Under `RegulatedMultiTenant`, a build with an authenticator but no tenant
/// binding must fail with `ProfileViolation::MissingTenantKv`.
fn regulated_without_tenant_kv_fails_closed() {
    let builder = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_authenticator(Arc::new(NamedAuthn))
        .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant);

    let err = builder.build().err().expect("must fail closed");

    assert!(matches!(
        err,
        McpBuildError::RegulatedProfile(klieo_core::ProfileViolation::MissingTenantKv)
    ));
}
5346
#[test]
/// Under `RegulatedMultiTenant`, a build with a tenant binding but no
/// authenticator must fail with `ProfileViolation::AnonymousAuth`.
fn regulated_without_authenticator_fails_closed() {
    let tenant_kv = klieo_bus_memory::MemoryBus::new().kv;
    let builder = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_tenant_binding(tenant_kv)
        .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant);

    let err = builder.build().err().expect("must fail closed");

    assert!(matches!(
        err,
        McpBuildError::RegulatedProfile(klieo_core::ProfileViolation::AnonymousAuth)
    ));
}
5362
5363 struct AnonAuthn;
5364
5365 #[async_trait]
5366 impl klieo_auth_common::Authenticator for AnonAuthn {
5367 async fn authenticate(
5368 &self,
5369 _headers: &dyn klieo_auth_common::Headers,
5370 _payload: &[u8],
5371 ) -> Result<klieo_auth_common::Identity, klieo_auth_common::AuthError> {
5372 Ok(klieo_auth_common::Identity::anonymous())
5373 }
5374
5375 fn allows_anonymous(&self) -> bool {
5376 true
5377 }
5378 }
5379
#[test]
/// Under `RegulatedMultiTenant`, an authenticator that allows anonymous
/// callers must make the build fail with `ProfileViolation::AnonymousAuth`.
fn regulated_with_anonymous_authenticator_fails_closed() {
    let tenant_kv = klieo_bus_memory::MemoryBus::new().kv;
    let builder = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_tenant_binding(tenant_kv)
        .with_authenticator(Arc::new(AnonAuthn))
        .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant);

    let err = builder.build().err().expect("must fail closed");

    assert!(matches!(
        err,
        McpBuildError::RegulatedProfile(klieo_core::ProfileViolation::AnonymousAuth)
    ));
}
5396
#[test]
/// A `RegulatedMultiTenant` build configured via `with_tenant_binding` must
/// end up with an ownership registry that reports `is_strict() == true`.
fn regulated_forces_strict_over_lenient_binding() {
    let tenant_kv = klieo_bus_memory::MemoryBus::new().kv;
    let server = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_tenant_binding(tenant_kv)
        .with_authenticator(Arc::new(NamedAuthn))
        .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant)
        .build()
        .expect("regulated build with named auth + kv must succeed");

    let strictness = server
        .ownership_registry
        .as_ref()
        .map(|registry| registry.is_strict());
    assert_eq!(strictness, Some(true));
}
5412
#[test]
/// Without a deployment profile, the same builder calls must yield an
/// ownership registry that reports `is_strict() == false`.
fn unprofiled_keeps_lenient_binding() {
    let tenant_kv = klieo_bus_memory::MemoryBus::new().kv;
    let server = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_tenant_binding(tenant_kv)
        .with_authenticator(Arc::new(NamedAuthn))
        .build()
        .expect("unprofiled build ok");

    let strictness = server
        .ownership_registry
        .as_ref()
        .map(|registry| registry.is_strict());
    assert_eq!(strictness, Some(false));
}
5427}
5428
#[cfg(test)]
#[cfg(feature = "http")]
/// Guards the `JSONRPC_*` error-code constants against accidental collisions.
mod jsonrpc_const_tests {
    use super::*;

    /// Every listed constant must be an `i64` (enforced by the array type)
    /// and no two of them may share a value.
    #[test]
    fn jsonrpc_constants_are_i64_and_unique_at_runtime() {
        let codes: [i64; 9] = [
            JSONRPC_PARSE_ERROR,
            JSONRPC_METHOD_NOT_FOUND,
            JSONRPC_INVALID_PARAMS,
            JSONRPC_SERVER_ERROR,
            JSONRPC_UNAUTHENTICATED,
            JSONRPC_RESUME_BUFFER_EXPIRED,
            JSONRPC_RESUME_BUFFER_NOT_FOUND,
            JSONRPC_LEADER_DIED,
            JSONRPC_SESSION_CONFLICT,
        ];

        // `insert` returns false for a value already in the set, so the filter
        // keeps exactly the repeated codes.
        let mut seen = std::collections::HashSet::new();
        let duplicates: Vec<i64> = codes
            .iter()
            .copied()
            .filter(|code| !seen.insert(*code))
            .collect();
        assert!(
            duplicates.is_empty(),
            "JSONRPC_* codes must be unique; found duplicates: {duplicates:?}"
        );

        // Sanity check of the detection itself: re-inserting a known code must
        // be reported as already present.
        let mut local = seen.clone();
        let newly_inserted = local.insert(JSONRPC_PARSE_ERROR);
        assert!(!newly_inserted, "duplicate detection logic broken");
    }
}