1use std::hash::{Hash, Hasher};
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54use std::time::{Duration, Instant};
55
56use dashmap::DashMap;
57use rustc_hash::FxHasher;
58use serde_json::Value;
59
60use std::sync::Arc;
61
62use crate::error::{McpError, Result};
63use crate::retry::{retry_mcp_call, McpRetryConfig};
64use crate::rmcp_adapter::RmcpClientAdapter;
65use crate::types::{ContentBlock, McpConfig, ResourceContent, ToolCallResult, ToolDefinition};
66use crate::validation::{ErrorEnhancer, McpValidator, ValidationConfig, ValidationErrorKind};
67use nika_event::{EventKind, EventLog};
68
69#[derive(Debug, Clone)]
75pub struct McpPingResult {
76 pub server: String,
78
79 pub latency: Duration,
81
82 pub tool_count: usize,
84
85 pub was_connected: bool,
87}
88
89#[derive(Debug, Clone)]
91pub enum McpPingError {
92 StartFailed { server: String, details: String },
94
95 Timeout { server: String, timeout: Duration },
97
98 ConnectionRefused { server: String },
100
101 ServerError { server: String, details: String },
103}
104
105impl std::fmt::Display for McpPingError {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 match self {
108 McpPingError::StartFailed { server, details } => {
109 write!(f, "MCP server '{}' failed to start: {}", server, details)
110 }
111 McpPingError::Timeout { server, timeout } => {
112 write!(f, "MCP server '{}' timed out after {:?}", server, timeout)
113 }
114 McpPingError::ConnectionRefused { server } => {
115 write!(f, "MCP server '{}' connection refused", server)
116 }
117 McpPingError::ServerError { server, details } => {
118 write!(f, "MCP server '{}' error: {}", server, details)
119 }
120 }
121 }
122}
123
124impl McpPingError {
125 pub fn suggestion(&self) -> &'static str {
127 match self {
128 McpPingError::StartFailed { .. } => {
129 "Check the MCP server command is correct and the executable exists"
130 }
131 McpPingError::Timeout { .. } => {
132 "The MCP server may be slow to start. Try increasing the timeout"
133 }
134 McpPingError::ConnectionRefused { .. } => {
135 "Ensure the MCP server is running and accessible"
136 }
137 McpPingError::ServerError { .. } => "Check the MCP server logs for more details",
138 }
139 }
140}
141
142#[derive(Debug, Clone)]
159pub struct CacheConfig {
160 pub ttl: Duration,
162
163 pub max_entries: usize,
165}
166
167impl Default for CacheConfig {
168 fn default() -> Self {
169 Self {
170 ttl: Duration::from_secs(300), max_entries: 1000,
172 }
173 }
174}
175
176#[derive(Debug, Clone)]
182struct CacheEntry {
183 result: Arc<ToolCallResult>,
185
186 created_at: Instant,
188}
189
190impl CacheEntry {
191 fn new(result: Arc<ToolCallResult>) -> Self {
192 Self {
193 result,
194 created_at: Instant::now(),
195 }
196 }
197
198 fn is_expired(&self, ttl: Duration) -> bool {
199 self.created_at.elapsed() > ttl
200 }
201}
202
203#[derive(Debug)]
207struct ResponseCache {
208 config: CacheConfig,
210
211 entries: DashMap<String, CacheEntry, rustc_hash::FxBuildHasher>,
213
214 hits: AtomicU64,
216
217 misses: AtomicU64,
219}
220
221impl ResponseCache {
222 fn new(config: CacheConfig) -> Self {
223 Self {
224 config,
225 entries: DashMap::default(),
226 hits: AtomicU64::new(0),
227 misses: AtomicU64::new(0),
228 }
229 }
230
231 fn cache_key(tool: &str, params: &Value) -> String {
236 let mut hasher = FxHasher::default();
237 let canonical = Self::canonicalize_value(params);
239 let params_str = match serde_json::to_string(&canonical) {
240 Ok(s) => s,
241 Err(e) => {
242 tracing::warn!(
243 tool = tool,
244 error = %e,
245 "JSON serialization failed for cache key, using Debug format"
246 );
247 format!("{:?}", params)
248 }
249 };
250 params_str.hash(&mut hasher);
251 format!("{}:{:016x}", tool, hasher.finish())
252 }
253
254 const MAX_CANONICALIZE_DEPTH: usize = 128;
256
257 fn canonicalize_value(value: &Value) -> Value {
262 Self::canonicalize_value_inner(value, 0)
263 }
264
265 fn canonicalize_value_inner(value: &Value, depth: usize) -> Value {
266 if depth >= Self::MAX_CANONICALIZE_DEPTH {
267 return value.clone();
268 }
269 match value {
270 Value::Object(map) => {
271 let mut sorted: serde_json::Map<String, Value> = serde_json::Map::new();
272 let mut keys: Vec<&String> = map.keys().collect();
273 keys.sort();
274 for key in keys {
275 sorted.insert(
276 key.clone(),
277 Self::canonicalize_value_inner(&map[key], depth + 1),
278 );
279 }
280 Value::Object(sorted)
281 }
282 Value::Array(arr) => Value::Array(
283 arr.iter()
284 .map(|v| Self::canonicalize_value_inner(v, depth + 1))
285 .collect(),
286 ),
287 other => other.clone(),
288 }
289 }
290
291 fn get(&self, tool: &str, params: &Value) -> Option<Arc<ToolCallResult>> {
296 let key = Self::cache_key(tool, params);
297
298 if let Some(entry) = self.entries.get(&key) {
299 if entry.is_expired(self.config.ttl) {
300 let ttl = self.config.ttl;
303 drop(entry);
304 self.entries.remove_if(&key, |_, e| e.is_expired(ttl));
305 self.misses.fetch_add(1, Ordering::Relaxed);
306 return None;
307 }
308
309 self.hits.fetch_add(1, Ordering::Relaxed);
310 return Some(Arc::clone(&entry.result));
311 }
312
313 self.misses.fetch_add(1, Ordering::Relaxed);
314 None
315 }
316
317 fn put(&self, tool: &str, params: &Value, result: ToolCallResult) {
321 if result.is_error {
323 return;
324 }
325
326 let key = Self::cache_key(tool, params);
327
328 if self.entries.len() >= self.config.max_entries {
330 self.evict_oldest();
331 }
332
333 self.entries.insert(key, CacheEntry::new(Arc::new(result)));
334 }
335
336 fn evict_oldest(&self) {
341 let to_remove = (self.config.max_entries / 10).max(1);
342 let mut entries: Vec<(String, Instant)> = self
343 .entries
344 .iter()
345 .map(|e| (e.key().clone(), e.created_at))
346 .collect();
347
348 if entries.len() <= to_remove {
349 for (key, _) in &entries {
351 self.entries.remove(key);
352 }
353 return;
354 }
355
356 entries.select_nth_unstable_by_key(to_remove - 1, |(_, created)| *created);
358
359 for (key, _) in entries.iter().take(to_remove) {
360 self.entries.remove(key);
361 }
362 }
363
364 fn clear(&self) {
366 self.entries.clear();
367 self.hits.store(0, Ordering::Relaxed);
368 self.misses.store(0, Ordering::Relaxed);
369 }
370
371 fn stats(&self) -> ResponseCacheStats {
373 ResponseCacheStats {
374 entries: self.entries.len(),
375 hits: self.hits.load(Ordering::Relaxed),
376 misses: self.misses.load(Ordering::Relaxed),
377 }
378 }
379}
380
381#[derive(Debug, Clone, Default)]
383pub struct ResponseCacheStats {
384 pub entries: usize,
386
387 pub hits: u64,
389
390 pub misses: u64,
392}
393
394impl ResponseCacheStats {
395 pub fn hit_rate(&self) -> f64 {
397 let total = self.hits + self.misses;
398 if total == 0 {
399 0.0
400 } else {
401 self.hits as f64 / total as f64
402 }
403 }
404}
405
406pub struct McpClient {
426 name: String,
428
429 connected: AtomicBool,
433
434 is_mock: bool,
436
437 adapter: Option<RmcpClientAdapter>,
439
440 validator: Option<McpValidator>,
442
443 cache: Option<ResponseCache>,
445
446 reconnecting: AtomicBool,
448}
449
450impl std::fmt::Debug for McpClient {
451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452 f.debug_struct("McpClient")
453 .field("name", &self.name)
454 .field("connected", &self.connected)
455 .field("is_mock", &self.is_mock)
456 .field("has_adapter", &self.adapter.is_some())
457 .field("has_validator", &self.validator.is_some())
458 .field("has_cache", &self.cache.is_some())
459 .finish()
460 }
461}
462
463impl McpClient {
464 pub fn new(config: McpConfig) -> Result<Self> {
484 if config.name.is_empty() {
486 return Err(McpError::ValidationError {
487 reason: "MCP server name cannot be empty".to_string(),
488 });
489 }
490
491 if config.command.is_empty() {
492 return Err(McpError::ValidationError {
493 reason: "MCP server command cannot be empty".to_string(),
494 });
495 }
496
497 let name = config.name.clone();
498 let adapter = RmcpClientAdapter::new(config);
499
500 Ok(Self {
501 name,
502 connected: AtomicBool::new(false),
503 is_mock: false,
504 adapter: Some(adapter),
505 validator: None,
506 cache: None,
507 reconnecting: AtomicBool::new(false),
508 })
509 }
510
511 pub fn with_validation(mut self, config: ValidationConfig) -> Self {
525 self.validator = Some(McpValidator::new(config));
526 self
527 }
528
529 pub fn with_cache(mut self, config: CacheConfig) -> Self {
549 self.cache = Some(ResponseCache::new(config));
550 self
551 }
552
553 pub fn cache_stats(&self) -> Option<ResponseCacheStats> {
557 self.cache.as_ref().map(|c| c.stats())
558 }
559
560 pub fn mock(name: &str) -> Self {
574 Self {
575 name: name.to_string(),
576 connected: AtomicBool::new(true), is_mock: true,
578 adapter: None,
579 validator: None,
580 cache: None,
581 reconnecting: AtomicBool::new(false),
582 }
583 }
584
585 pub fn name(&self) -> &str {
587 &self.name
588 }
589
590 pub fn is_connected(&self) -> bool {
595 if self.is_mock {
596 return self.connected.load(Ordering::SeqCst);
597 }
598 self.adapter
600 .as_ref()
601 .map(|a| a.is_connected_sync())
602 .unwrap_or(false)
603 }
604
605 pub async fn is_connected_async(&self) -> bool {
607 if self.is_mock {
608 return self.connected.load(Ordering::SeqCst);
609 }
610 if let Some(adapter) = &self.adapter {
611 adapter.is_connected().await
612 } else {
613 false
614 }
615 }
616
617 pub async fn ping(&self) -> std::result::Result<McpPingResult, McpPingError> {
632 let start = Instant::now();
633 let was_connected = self.is_connected_async().await;
634
635 if self.is_mock {
637 return Ok(McpPingResult {
638 server: self.name.clone(),
639 latency: start.elapsed(),
640 tool_count: self.mock_list_tools().len(),
641 was_connected: true,
642 });
643 }
644
645 if !was_connected {
647 if let Err(e) = self.connect().await {
648 let error_msg = e.to_string().to_lowercase();
649 if error_msg.contains("refused") || error_msg.contains("connection") {
650 return Err(McpPingError::ConnectionRefused {
651 server: self.name.clone(),
652 });
653 }
654 return Err(McpPingError::StartFailed {
655 server: self.name.clone(),
656 details: e.to_string(),
657 });
658 }
659 }
660
661 match tokio::time::timeout(Duration::from_secs(10), self.list_tools()).await {
663 Ok(Ok(tools)) => Ok(McpPingResult {
664 server: self.name.clone(),
665 latency: start.elapsed(),
666 tool_count: tools.len(),
667 was_connected,
668 }),
669 Ok(Err(e)) => Err(McpPingError::ServerError {
670 server: self.name.clone(),
671 details: e.to_string(),
672 }),
673 Err(_) => Err(McpPingError::Timeout {
674 server: self.name.clone(),
675 timeout: Duration::from_secs(10),
676 }),
677 }
678 }
679
680 pub fn is_configured(&self) -> bool {
689 self.is_mock || self.adapter.is_some()
690 }
691
692 pub async fn connect(&self) -> Result<()> {
706 if self.is_mock {
707 self.connected.store(true, Ordering::SeqCst);
708 if let Some(ref validator) = self.validator {
710 let tools = self.mock_list_tools();
711 validator
712 .cache()
713 .populate(&self.name, &tools)
714 .map_err(|e| McpError::McpSchemaError {
715 tool: "*".to_string(),
716 reason: format!("Failed to cache mock tool schemas: {}", e),
717 })?;
718 }
719 return Ok(());
720 }
721
722 let adapter = self
723 .adapter
724 .as_ref()
725 .ok_or_else(|| McpError::McpNotConnected {
726 name: self.name.clone(),
727 })?;
728
729 adapter.connect().await?;
730 self.connected.store(true, Ordering::SeqCst);
731
732 if let Some(ref validator) = self.validator {
734 let tools = adapter.list_tools().await?;
735 validator
736 .cache()
737 .populate(&self.name, &tools)
738 .map_err(|e| McpError::McpSchemaError {
739 tool: "*".to_string(),
740 reason: format!("Failed to cache tool schemas: {}", e),
741 })?;
742 tracing::debug!(
743 mcp_server = %self.name,
744 tools_cached = tools.len(),
745 "Cached tool schemas for validation"
746 );
747 }
748
749 Ok(())
750 }
751
752 pub async fn disconnect(&self) -> Result<()> {
759 if self.is_mock {
760 self.connected.store(false, Ordering::SeqCst);
761 return Ok(());
762 }
763
764 if let Some(adapter) = &self.adapter {
765 adapter.disconnect().await?;
766 }
767
768 if let Some(ref cache) = self.cache {
770 cache.clear();
771 }
772
773 if let Some(ref validator) = self.validator {
775 validator.cache().clear();
776 }
777
778 self.connected.store(false, Ordering::SeqCst);
779 Ok(())
780 }
781
782 pub async fn reconnect(&self) -> Result<()> {
799 if self.is_mock {
800 self.connected.store(true, Ordering::SeqCst);
801 return Ok(());
802 }
803
804 if self
806 .reconnecting
807 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
808 .is_err()
809 {
810 tracing::debug!(
811 mcp_server = %self.name,
812 "Reconnect already in progress, skipping"
813 );
814 return Ok(());
815 }
816
817 let result = self.reconnect_inner().await;
819 self.reconnecting.store(false, Ordering::SeqCst);
820 result
821 }
822
823 async fn reconnect_inner(&self) -> Result<()> {
825 self.disconnect().await?;
827
828 let adapter = self
830 .adapter
831 .as_ref()
832 .ok_or_else(|| McpError::McpNotConnected {
833 name: self.name.clone(),
834 })?;
835
836 adapter.reconnect().await?;
837 self.connected.store(true, Ordering::SeqCst);
838
839 if let Some(ref validator) = self.validator {
841 let tools = adapter.list_tools().await?;
842 validator
843 .cache()
844 .populate(&self.name, &tools)
845 .map_err(|e| McpError::McpSchemaError {
846 tool: "*".to_string(),
847 reason: format!("Failed to cache tool schemas after reconnect: {}", e),
848 })?;
849 tracing::debug!(
850 mcp_server = %self.name,
851 tools_cached = tools.len(),
852 "Re-populated tool schemas after reconnect"
853 );
854 }
855
856 Ok(())
857 }
858
859 pub fn is_connection_error(error: &McpError) -> bool {
864 let error_str = error.to_string().to_lowercase();
865 error_str.contains("broken pipe")
866 || error_str.contains("connection reset")
867 || error_str.contains("connection refused")
868 || error_str.contains("eof")
869 || error_str.contains("stdin not available")
870 || error_str.contains("stdout not available")
871 || error_str.contains("transport closed")
872 || error_str.contains("transport send")
873 }
874
875 fn enhance_error(&self, tool_name: &str, error: McpError) -> McpError {
877 if let Some(ref validator) = self.validator {
878 if validator.config().enhance_errors {
879 let enhancer = ErrorEnhancer::new(validator.cache());
880 return enhancer.enhance(&self.name, tool_name, error);
881 }
882 }
883 error
884 }
885
886 pub async fn call_tool(&self, name: &str, params: Value) -> Result<ToolCallResult> {
915 if let Some(ref validator) = self.validator {
917 if validator.config().pre_validate {
918 let result = validator.validate(&self.name, name, ¶ms);
919 if !result.is_valid {
920 let missing: Vec<String> = result
922 .errors
923 .iter()
924 .filter_map(|e| {
925 if let ValidationErrorKind::MissingRequired { field } = &e.kind {
926 Some(field.clone())
927 } else {
928 None
929 }
930 })
931 .collect();
932
933 let suggestions: Vec<String> = result
934 .errors
935 .iter()
936 .filter_map(|e| {
937 if let ValidationErrorKind::UnknownField { suggestions, .. } = &e.kind {
938 Some(suggestions.clone())
939 } else {
940 None
941 }
942 })
943 .flatten()
944 .collect();
945
946 let details = result
947 .errors
948 .iter()
949 .map(|e| e.message.clone())
950 .collect::<Vec<_>>()
951 .join("; ");
952
953 return Err(McpError::McpValidationFailed {
954 tool: name.to_string(),
955 details,
956 missing,
957 suggestions,
958 });
959 }
960 }
961 }
962
963 if let Some(ref cache) = self.cache {
965 if let Some(cached_result) = cache.get(name, ¶ms) {
966 tracing::debug!(
967 mcp_server = %self.name,
968 tool = %name,
969 "Cache hit for MCP tool call"
970 );
971 let mut result = (*cached_result).clone();
972 result.was_cached = true;
973 return Ok(result);
974 }
975 }
976
977 if self.is_mock {
978 if !self.connected.load(Ordering::SeqCst) {
979 return Err(McpError::McpNotConnected {
980 name: self.name.clone(),
981 });
982 }
983 let result = self.mock_tool_call(name, ¶ms);
984 if let Some(ref cache) = self.cache {
986 cache.put(name, ¶ms, result.clone());
987 }
988 return Ok(result);
989 }
990
991 let adapter = self
993 .adapter
994 .as_ref()
995 .ok_or_else(|| McpError::McpNotConnected {
996 name: self.name.clone(),
997 })?;
998
999 let result = retry_mcp_call(McpRetryConfig::default(), || {
1000 let params = params.clone();
1001 async move {
1002 match adapter.call_tool(name, params).await {
1003 Ok(result) => Ok(result),
1004 Err(e) => {
1005 let enhanced = self.enhance_error(name, e);
1006 if Self::is_connection_error(&enhanced) {
1008 tracing::warn!(
1009 mcp_server = %self.name,
1010 tool = %name,
1011 error = %enhanced,
1012 "Connection error, attempting reconnect"
1013 );
1014 if let Err(reconnect_err) = self.reconnect().await {
1015 tracing::error!(
1016 mcp_server = %self.name,
1017 error = %reconnect_err,
1018 "Failed to reconnect"
1019 );
1020 }
1021 }
1022 Err(enhanced)
1023 }
1024 }
1025 }
1026 })
1027 .await?;
1028
1029 if let Some(ref cache) = self.cache {
1031 cache.put(name, ¶ms, result.clone());
1032 tracing::debug!(
1033 mcp_server = %self.name,
1034 tool = %name,
1035 "Cached MCP tool response"
1036 );
1037 }
1038 Ok(result)
1039 }
1040
1041 pub async fn call_tool_with_retry_events(
1065 &self,
1066 name: &str,
1067 params: Value,
1068 task_id: &Arc<str>,
1069 event_log: &EventLog,
1070 ) -> Result<ToolCallResult> {
1071 if let Some(ref validator) = self.validator {
1073 if validator.config().pre_validate {
1074 let result = validator.validate(&self.name, name, ¶ms);
1075 if !result.is_valid {
1076 let missing: Vec<String> = result
1077 .errors
1078 .iter()
1079 .filter_map(|e| {
1080 if let ValidationErrorKind::MissingRequired { field } = &e.kind {
1081 Some(field.clone())
1082 } else {
1083 None
1084 }
1085 })
1086 .collect();
1087
1088 let suggestions: Vec<String> = result
1089 .errors
1090 .iter()
1091 .filter_map(|e| {
1092 if let ValidationErrorKind::UnknownField { suggestions, .. } = &e.kind {
1093 Some(suggestions.clone())
1094 } else {
1095 None
1096 }
1097 })
1098 .flatten()
1099 .collect();
1100
1101 let details = result
1102 .errors
1103 .iter()
1104 .map(|e| e.message.clone())
1105 .collect::<Vec<_>>()
1106 .join("; ");
1107
1108 return Err(McpError::McpValidationFailed {
1109 tool: name.to_string(),
1110 details,
1111 missing,
1112 suggestions,
1113 });
1114 }
1115 }
1116 }
1117
1118 if let Some(ref cache) = self.cache {
1120 if let Some(cached_result) = cache.get(name, ¶ms) {
1121 tracing::debug!(
1122 mcp_server = %self.name,
1123 tool = %name,
1124 "Cache hit for MCP tool call"
1125 );
1126 let mut result = (*cached_result).clone();
1127 result.was_cached = true;
1128 return Ok(result);
1129 }
1130 }
1131
1132 if self.is_mock {
1133 if !self.connected.load(Ordering::SeqCst) {
1134 return Err(McpError::McpNotConnected {
1135 name: self.name.clone(),
1136 });
1137 }
1138 let result = self.mock_tool_call(name, ¶ms);
1139 if let Some(ref cache) = self.cache {
1140 cache.put(name, ¶ms, result.clone());
1141 }
1142 return Ok(result);
1143 }
1144
1145 let adapter = self
1147 .adapter
1148 .as_ref()
1149 .ok_or_else(|| McpError::McpNotConnected {
1150 name: self.name.clone(),
1151 })?;
1152
1153 let config = McpRetryConfig::default();
1154 let max_attempts = config.max_retries + 1; let attempt_counter = std::sync::atomic::AtomicU32::new(0);
1156
1157 let result = retry_mcp_call(config, || {
1158 let params = params.clone();
1159 async {
1160 let attempt = attempt_counter.fetch_add(1, Ordering::SeqCst);
1161 match adapter.call_tool(name, params).await {
1162 Ok(result) => Ok(result),
1163 Err(e) => {
1164 let enhanced = self.enhance_error(name, e);
1165 if Self::is_connection_error(&enhanced) {
1166 event_log.emit(EventKind::McpRetry {
1168 task_id: Arc::clone(task_id),
1169 server_name: self.name.clone(),
1170 operation: name.to_string(),
1171 attempt: attempt + 1,
1172 max_attempts: max_attempts as u32,
1173 error: enhanced.to_string(),
1174 });
1175 tracing::warn!(
1176 mcp_server = %self.name,
1177 tool = %name,
1178 attempt = attempt + 1,
1179 error = %enhanced,
1180 "Connection error, attempting reconnect (McpRetry event emitted)"
1181 );
1182 if let Err(reconnect_err) = self.reconnect().await {
1183 tracing::error!(
1184 mcp_server = %self.name,
1185 error = %reconnect_err,
1186 "Failed to reconnect"
1187 );
1188 }
1189 }
1190 Err(enhanced)
1191 }
1192 }
1193 }
1194 })
1195 .await?;
1196
1197 if let Some(ref cache) = self.cache {
1199 cache.put(name, ¶ms, result.clone());
1200 tracing::debug!(
1201 mcp_server = %self.name,
1202 tool = %name,
1203 "Cached MCP tool response"
1204 );
1205 }
1206 Ok(result)
1207 }
1208
1209 pub async fn read_resource(&self, uri: &str) -> Result<ResourceContent> {
1226 if self.is_mock {
1227 if !self.connected.load(Ordering::SeqCst) {
1228 return Err(McpError::McpNotConnected {
1229 name: self.name.clone(),
1230 });
1231 }
1232 return Ok(self.mock_read_resource(uri));
1233 }
1234
1235 let adapter = self
1237 .adapter
1238 .as_ref()
1239 .ok_or_else(|| McpError::McpNotConnected {
1240 name: self.name.clone(),
1241 })?;
1242
1243 retry_mcp_call(McpRetryConfig::default(), || {
1244 async move {
1245 match adapter.read_resource(uri).await {
1246 Ok(result) => Ok(result),
1247 Err(e) => {
1248 if Self::is_connection_error(&e) {
1250 tracing::warn!(
1251 mcp_server = %self.name,
1252 uri = %uri,
1253 error = %e,
1254 "Connection error, attempting reconnect"
1255 );
1256 if let Err(reconnect_err) = self.reconnect().await {
1257 tracing::error!(
1258 mcp_server = %self.name,
1259 error = %reconnect_err,
1260 "Failed to reconnect"
1261 );
1262 }
1263 }
1264 Err(e)
1265 }
1266 }
1267 }
1268 })
1269 .await
1270 }
1271
1272 pub async fn list_tools(&self) -> Result<Vec<ToolDefinition>> {
1287 if self.is_mock {
1288 if !self.connected.load(Ordering::SeqCst) {
1289 return Err(McpError::McpNotConnected {
1290 name: self.name.clone(),
1291 });
1292 }
1293 return Ok(self.mock_list_tools());
1294 }
1295
1296 let adapter = self
1298 .adapter
1299 .as_ref()
1300 .ok_or_else(|| McpError::McpNotConnected {
1301 name: self.name.clone(),
1302 })?;
1303
1304 adapter.list_tools().await
1305 }
1306
1307 fn mock_tool_call(&self, name: &str, params: &Value) -> ToolCallResult {
1313 match name {
1314 "novanet_describe" => {
1315 let response = serde_json::json!({
1316 "nodes": 61,
1317 "arcs": 182,
1318 "labels": ["Entity", "EntityNative", "Page", "Block"],
1319 "relationships": ["HAS_NATIVE", "CONTAINS", "FLOWS_TO"]
1320 });
1321 ToolCallResult::success(vec![ContentBlock::text(response.to_string())])
1322 }
1323
1324 "novanet_context" => {
1325 let entity = params
1327 .get("focus_key")
1328 .or_else(|| params.get("entity"))
1329 .and_then(|v| v.as_str())
1330 .unwrap_or("unknown");
1331 let locale = params
1332 .get("locale")
1333 .and_then(|v| v.as_str())
1334 .unwrap_or("en-US");
1335
1336 let response = serde_json::json!({
1337 "entity": entity,
1338 "locale": locale,
1339 "context": {
1340 "title": format!("{} - Generated Title", entity),
1341 "description": format!("Auto-generated content for {} in {}", entity, locale),
1342 "keywords": ["generated", "mock", entity]
1343 }
1344 });
1345 ToolCallResult::success(vec![ContentBlock::text(response.to_string())])
1346 }
1347
1348 _ => {
1349 let response = serde_json::json!({
1351 "tool": name,
1352 "status": "success",
1353 "message": "Mock tool call completed"
1354 });
1355 ToolCallResult::success(vec![ContentBlock::text(response.to_string())])
1356 }
1357 }
1358 }
1359
1360 fn mock_read_resource(&self, uri: &str) -> ResourceContent {
1362 let text = if uri.starts_with("neo4j://entity/") {
1364 let entity = uri.strip_prefix("neo4j://entity/").unwrap_or("unknown");
1365 serde_json::json!({
1366 "id": entity,
1367 "type": "Entity",
1368 "properties": {
1369 "name": entity,
1370 "created": "2024-01-01T00:00:00Z"
1371 }
1372 })
1373 .to_string()
1374 } else if uri.starts_with("file://") {
1375 "Mock file content".to_string()
1376 } else {
1377 serde_json::json!({
1378 "uri": uri,
1379 "content": "Mock resource content"
1380 })
1381 .to_string()
1382 };
1383
1384 ResourceContent::new(uri)
1385 .with_mime_type("application/json")
1386 .with_text(text)
1387 }
1388
1389 pub fn get_tool_definitions(&self) -> Vec<ToolDefinition> {
1400 if self.is_mock {
1401 self.mock_list_tools()
1402 } else if let Some(ref adapter) = self.adapter {
1403 adapter.get_cached_tools()
1404 } else {
1405 Vec::new()
1406 }
1407 }
1408
1409 pub fn is_tool_cache_fresh(&self, ttl: std::time::Duration) -> bool {
1414 if self.is_mock {
1415 true
1416 } else if let Some(ref adapter) = self.adapter {
1417 adapter.is_tool_cache_fresh(ttl)
1418 } else {
1419 false
1420 }
1421 }
1422
1423 pub fn invalidate_tool_cache(&self) {
1427 if !self.is_mock {
1428 if let Some(ref adapter) = self.adapter {
1429 adapter.invalidate_tool_cache();
1430 }
1431 }
1432 }
1433
1434 fn mock_list_tools(&self) -> Vec<ToolDefinition> {
1436 vec![
1437 ToolDefinition::new("novanet_describe")
1438 .with_description("Bootstrap understanding of the graph"),
1439 ToolDefinition::new("novanet_search")
1440 .with_description("Find nodes via 5 modes: fulltext, property, hybrid, walk, triggers"),
1441 ToolDefinition::new("novanet_context")
1442 .with_description("Unified context assembly for LLM content generation")
1443 .with_input_schema(serde_json::json!({
1444 "type": "object",
1445 "properties": {
1446 "mode": {"type": "string", "description": "Context mode (page, block, knowledge, assemble)"},
1447 "focus_key": {"type": "string", "description": "Focus node key"},
1448 "locale": {"type": "string", "description": "Target locale (e.g., fr-FR)"}
1449 },
1450 "required": ["mode", "locale"]
1451 })),
1452 ]
1453 }
1454}
1455
1456#[cfg(test)]
1459mod tests {
1460 use super::*;
1461
1462 #[tokio::test]
1467 async fn test_multiple_sequential_calls() {
1468 let client = McpClient::mock("test");
1470
1471 for i in 0..10 {
1472 let result = client
1473 .call_tool("test_tool", serde_json::json!({"iteration": i}))
1474 .await;
1475 assert!(
1476 result.is_ok(),
1477 "Call {} should succeed: {:?}",
1478 i,
1479 result.err()
1480 );
1481 }
1482 }
1483
1484 #[tokio::test]
1485 async fn test_concurrent_calls() {
1486 let client = std::sync::Arc::new(McpClient::mock("test"));
1488
1489 let handles: Vec<_> = (0..20)
1490 .map(|i| {
1491 let client = std::sync::Arc::clone(&client);
1492 tokio::spawn(async move {
1493 client
1494 .call_tool("test_tool", serde_json::json!({"iteration": i}))
1495 .await
1496 })
1497 })
1498 .collect();
1499
1500 for (i, handle) in handles.into_iter().enumerate() {
1501 let result = handle.await.expect("Task should not panic");
1502 assert!(result.is_ok(), "Concurrent call {} should succeed", i);
1503 }
1504 }
1505
1506 #[test]
1511 fn test_client_name_accessor() {
1512 let config = McpConfig::new("test-server", "echo");
1513 let client = McpClient::new(config).unwrap();
1514 assert_eq!(client.name(), "test-server");
1515 }
1516
1517 #[test]
1518 fn test_mock_client_is_pre_connected() {
1519 let client = McpClient::mock("test");
1520 assert!(client.is_connected());
1521 assert!(client.is_mock);
1522 }
1523
1524 #[test]
1525 fn test_real_client_starts_disconnected() {
1526 let config = McpConfig::new("test", "echo");
1527 let client = McpClient::new(config).unwrap();
1528 assert!(!client.is_connected());
1529 assert!(!client.is_mock);
1530 }
1531
1532 #[tokio::test]
1533 async fn test_mock_tool_call_returns_success() {
1534 let client = McpClient::mock("test");
1535 let result = client
1536 .call_tool("unknown_tool", serde_json::json!({}))
1537 .await;
1538 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1539 assert!(!result.unwrap().is_error);
1540 }
1541
1542 #[tokio::test]
1547 async fn test_mock_read_resource_entity() {
1548 let client = McpClient::mock("test");
1549 let result = client.read_resource("neo4j://entity/qr-code").await;
1550 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1551
1552 let resource = result.unwrap();
1553 assert_eq!(resource.uri, "neo4j://entity/qr-code");
1554 assert!(resource.text.is_some());
1555 }
1556
1557 #[tokio::test]
1558 async fn test_mock_read_resource_file() {
1559 let client = McpClient::mock("test");
1560 let result = client.read_resource("file:///tmp/test.txt").await;
1561 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1562
1563 let resource = result.unwrap();
1564 assert_eq!(resource.uri, "file:///tmp/test.txt");
1565 }
1566
1567 #[test]
1572 fn test_mock_client_drop_is_noop() {
1573 let client = McpClient::mock("test");
1575 assert!(client.is_mock);
1576 drop(client);
1578 }
1579
1580 #[test]
1581 fn test_real_client_drop_without_process() {
1582 let config = McpConfig::new("test", "echo");
1584 let client = McpClient::new(config).unwrap();
1585 assert!(!client.is_mock);
1586 drop(client);
1588 }
1589
1590 #[test]
1595 fn test_with_validation_enables_validator() {
1596 let config = McpConfig::new("test", "echo");
1597 let client = McpClient::new(config)
1598 .unwrap()
1599 .with_validation(ValidationConfig::default());
1600
1601 assert!(client.validator.is_some());
1603 }
1604
1605 #[tokio::test]
1606 async fn test_mock_connect_populates_schema_cache_when_validation_enabled() {
1607 let client = McpClient::mock("novanet").with_validation(ValidationConfig::default());
1608
1609 client.connect().await.unwrap();
1611
1612 let validator = client.validator.as_ref().unwrap();
1614 let stats = validator.cache().stats();
1615 assert!(stats.tool_count > 0, "Should have cached tools");
1616 }
1617
1618 #[tokio::test]
1619 async fn test_call_tool_validates_missing_required_field() {
1620 let client = McpClient::mock("novanet").with_validation(ValidationConfig::default());
1621 client.connect().await.unwrap();
1622
1623 let result = client
1625 .call_tool(
1626 "novanet_context",
1627 serde_json::json!({
1628 "focus_key": "qr-code"
1629 }),
1631 )
1632 .await;
1633
1634 assert!(result.is_err());
1635 let err = result.unwrap_err();
1636 assert!(matches!(err, McpError::McpValidationFailed { .. }));
1637
1638 if let McpError::McpValidationFailed {
1639 missing, details, ..
1640 } = err
1641 {
1642 assert!(missing.contains(&"mode".to_string()));
1643 assert!(details.contains("mode"));
1644 }
1645 }
1646
1647 #[tokio::test]
1648 async fn test_call_tool_passes_validation_with_valid_params() {
1649 let client = McpClient::mock("novanet").with_validation(ValidationConfig::default());
1650 client.connect().await.unwrap();
1651
1652 let result = client
1654 .call_tool(
1655 "novanet_context",
1656 serde_json::json!({
1657 "mode": "page",
1658 "focus_key": "qr-code",
1659 "locale": "fr-FR"
1660 }),
1661 )
1662 .await;
1663
1664 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1665 }
1666
1667 #[tokio::test]
1668 async fn test_call_tool_skips_validation_when_disabled() {
1669 let config = ValidationConfig {
1670 pre_validate: false, ..Default::default()
1672 };
1673 let client = McpClient::mock("novanet").with_validation(config);
1674 client.connect().await.unwrap();
1675
1676 let result = client
1678 .call_tool(
1679 "novanet_context",
1680 serde_json::json!({
1681 "focus_key": "qr-code"
1682 }),
1684 )
1685 .await;
1686
1687 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1689 }
1690
1691 #[tokio::test]
1692 async fn test_call_tool_without_validation_works() {
1693 let client = McpClient::mock("novanet");
1695
1696 let result = client
1698 .call_tool(
1699 "novanet_context",
1700 serde_json::json!({
1701 }),
1703 )
1704 .await;
1705
1706 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1707 }
1708
1709 #[tokio::test]
1710 async fn test_validation_for_unknown_tool_passes() {
1711 let client = McpClient::mock("novanet").with_validation(ValidationConfig::default());
1712 client.connect().await.unwrap();
1713
1714 let result = client
1716 .call_tool(
1717 "unknown_tool",
1718 serde_json::json!({
1719 "anything": "goes"
1720 }),
1721 )
1722 .await;
1723
1724 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1725 }
1726
1727 #[test]
1732 fn test_with_cache_enables_caching() {
1733 let config = McpConfig::new("test", "echo");
1734 let client = McpClient::new(config)
1735 .unwrap()
1736 .with_cache(CacheConfig::default());
1737
1738 assert!(client.cache.is_some());
1740 }
1741
1742 #[test]
1743 fn test_cache_stats_returns_none_when_disabled() {
1744 let client = McpClient::mock("test");
1745 assert!(client.cache_stats().is_none());
1746 }
1747
1748 #[test]
1749 fn test_cache_stats_returns_some_when_enabled() {
1750 let client = McpClient::mock("test").with_cache(CacheConfig::default());
1751 let stats = client.cache_stats();
1752 assert!(stats.is_some());
1753 let stats = stats.unwrap();
1754 assert_eq!(stats.entries, 0);
1755 assert_eq!(stats.hits, 0);
1756 assert_eq!(stats.misses, 0);
1757 }
1758
1759 #[tokio::test]
1760 async fn test_cache_hit_returns_cached_result() {
1761 let client = McpClient::mock("test").with_cache(CacheConfig::default());
1762
1763 let params = serde_json::json!({"entity": "qr-code"});
1764
1765 let result1 = client.call_tool("novanet_context", params.clone()).await;
1767 assert!(result1.is_ok(), "Should succeed: {:?}", result1.err());
1768
1769 let stats = client.cache_stats().unwrap();
1770 assert_eq!(stats.misses, 1);
1771 assert_eq!(stats.hits, 0);
1772 assert_eq!(stats.entries, 1);
1773
1774 let result2 = client.call_tool("novanet_context", params.clone()).await;
1776 assert!(result2.is_ok(), "Should succeed: {:?}", result2.err());
1777
1778 let stats = client.cache_stats().unwrap();
1779 assert_eq!(stats.misses, 1);
1780 assert_eq!(stats.hits, 1);
1781
1782 let r1 = result1.unwrap();
1784 let r2 = result2.unwrap();
1785 assert_eq!(r1.content.len(), r2.content.len());
1786 }
1787
1788 #[tokio::test]
1789 async fn test_cache_different_params_miss() {
1790 let client = McpClient::mock("test").with_cache(CacheConfig::default());
1791
1792 let params_a = serde_json::json!({"focus_key": "qr-code"});
1794 client.call_tool("novanet_context", params_a).await.unwrap();
1795
1796 let params_b = serde_json::json!({"focus_key": "barcode"});
1798 client.call_tool("novanet_context", params_b).await.unwrap();
1799
1800 let stats = client.cache_stats().unwrap();
1801 assert_eq!(stats.misses, 2);
1802 assert_eq!(stats.hits, 0);
1803 assert_eq!(stats.entries, 2);
1804 }
1805
1806 #[tokio::test]
1807 async fn test_cache_different_tools_miss() {
1808 let client = McpClient::mock("test").with_cache(CacheConfig::default());
1809
1810 let params = serde_json::json!({});
1811
1812 client
1814 .call_tool("novanet_describe", params.clone())
1815 .await
1816 .unwrap();
1817
1818 client
1820 .call_tool("novanet_search", params.clone())
1821 .await
1822 .unwrap();
1823
1824 let stats = client.cache_stats().unwrap();
1825 assert_eq!(stats.misses, 2);
1826 assert_eq!(stats.hits, 0);
1827 }
1828
1829 #[tokio::test]
1830 async fn test_cache_ttl_expiration() {
1831 use std::time::Duration;
1832
1833 let client = McpClient::mock("test").with_cache(CacheConfig {
1835 ttl: Duration::from_millis(50),
1836 max_entries: 100,
1837 });
1838
1839 let params = serde_json::json!({"test": true});
1840
1841 client.call_tool("test_tool", params.clone()).await.unwrap();
1843 assert_eq!(client.cache_stats().unwrap().entries, 1);
1844
1845 tokio::time::sleep(Duration::from_millis(60)).await;
1847
1848 client.call_tool("test_tool", params.clone()).await.unwrap();
1850
1851 let stats = client.cache_stats().unwrap();
1852 assert_eq!(stats.misses, 2); assert_eq!(stats.hits, 0);
1854 }
1855
1856 #[test]
1857 fn test_cache_hit_rate_calculation() {
1858 let stats = super::ResponseCacheStats {
1859 entries: 10,
1860 hits: 80,
1861 misses: 20,
1862 };
1863 assert!((stats.hit_rate() - 0.8).abs() < 0.001);
1864 }
1865
1866 #[test]
1867 fn test_cache_hit_rate_zero_total() {
1868 let stats = super::ResponseCacheStats {
1869 entries: 0,
1870 hits: 0,
1871 misses: 0,
1872 };
1873 assert_eq!(stats.hit_rate(), 0.0);
1874 }
1875
1876 #[test]
1877 fn test_cache_key_deterministic() {
1878 let params = serde_json::json!({"entity": "qr-code", "locale": "fr-FR"});
1879
1880 let key1 = super::ResponseCache::cache_key("tool", ¶ms);
1881 let key2 = super::ResponseCache::cache_key("tool", ¶ms);
1882
1883 assert_eq!(key1, key2);
1884 }
1885
1886 #[test]
1887 fn test_cache_key_different_for_different_params() {
1888 let params1 = serde_json::json!({"entity": "qr-code"});
1889 let params2 = serde_json::json!({"entity": "barcode"});
1890
1891 let key1 = super::ResponseCache::cache_key("tool", ¶ms1);
1892 let key2 = super::ResponseCache::cache_key("tool", ¶ms2);
1893
1894 assert_ne!(key1, key2);
1895 }
1896
1897 #[test]
1898 fn test_cache_key_different_for_different_tools() {
1899 let params = serde_json::json!({"test": true});
1900
1901 let key1 = super::ResponseCache::cache_key("tool_a", ¶ms);
1902 let key2 = super::ResponseCache::cache_key("tool_b", ¶ms);
1903
1904 assert_ne!(key1, key2);
1905 }
1906
1907 #[tokio::test]
1912 async fn test_ping_mock_client_succeeds() {
1913 let client = McpClient::mock("test");
1914
1915 let result = client.ping().await;
1916 assert!(result.is_ok(), "Should succeed: {:?}", result.err());
1917
1918 let ping = result.unwrap();
1919 assert_eq!(ping.server, "test");
1920 assert!(ping.was_connected);
1921 assert!(ping.tool_count > 0);
1922 assert!(ping.latency.as_millis() < 100);
1924 }
1925
1926 #[test]
1927 fn test_mcp_ping_error_types() {
1928 let start_failed = super::McpPingError::StartFailed {
1929 server: "novanet".to_string(),
1930 details: "command not found".to_string(),
1931 };
1932 assert!(start_failed.to_string().contains("failed to start"));
1933 assert!(!start_failed.suggestion().is_empty());
1934
1935 let timeout = super::McpPingError::Timeout {
1936 server: "slow-server".to_string(),
1937 timeout: std::time::Duration::from_secs(10),
1938 };
1939 assert!(timeout.to_string().contains("timed out"));
1940
1941 let refused = super::McpPingError::ConnectionRefused {
1942 server: "offline".to_string(),
1943 };
1944 assert!(refused.to_string().contains("refused"));
1945
1946 let server_err = super::McpPingError::ServerError {
1947 server: "broken".to_string(),
1948 details: "internal error".to_string(),
1949 };
1950 assert!(server_err.to_string().contains("error"));
1951 }
1952
1953 #[tokio::test]
1954 async fn test_ping_result_has_valid_fields() {
1955 let client = McpClient::mock("novanet");
1956
1957 let result = client.ping().await.unwrap();
1958
1959 assert_eq!(result.server, "novanet");
1961 assert!(result.tool_count >= 3); assert!(result.was_connected); }
1964
1965 #[test]
1966 fn test_is_configured_returns_true_for_mock() {
1967 let client = McpClient::mock("test");
1968 assert!(client.is_configured());
1969 }
1970
1971 #[test]
1972 fn test_is_configured_returns_true_for_real_client() {
1973 let config = McpConfig::new("test", "echo");
1974 let client = McpClient::new(config).unwrap();
1975 assert!(client.is_configured());
1976 }
1977
1978 #[tokio::test]
1983 async fn test_call_tool_with_retry_events_mock_success() {
1984 use nika_event::EventLog;
1985
1986 let client = McpClient::mock("novanet");
1987 let event_log = EventLog::new();
1988 let task_id: Arc<str> = Arc::from("test_retry_events");
1989
1990 let result = client
1992 .call_tool_with_retry_events(
1993 "novanet_context",
1994 serde_json::json!({"focus_key": "qr-code"}),
1995 &task_id,
1996 &event_log,
1997 )
1998 .await;
1999
2000 assert!(
2001 result.is_ok(),
2002 "Mock call should succeed: {:?}",
2003 result.err()
2004 );
2005
2006 let events = event_log.filter_task("test_retry_events");
2008 let retry_events: Vec<_> = events
2009 .iter()
2010 .filter(|e| matches!(e.kind, EventKind::McpRetry { .. }))
2011 .collect();
2012 assert!(
2013 retry_events.is_empty(),
2014 "No retry events for successful calls"
2015 );
2016 }
2017
2018 #[tokio::test]
2019 async fn test_call_tool_with_retry_events_uses_cache() {
2020 use nika_event::EventLog;
2021 use std::time::Duration;
2022
2023 let client = McpClient::mock("novanet").with_cache(CacheConfig {
2025 ttl: Duration::from_secs(60),
2026 max_entries: 100,
2027 });
2028 let event_log = EventLog::new();
2029 let task_id: Arc<str> = Arc::from("test_cache_hit");
2030
2031 let params = serde_json::json!({"focus_key": "qr-code"});
2032
2033 let result1 = client
2035 .call_tool_with_retry_events("novanet_context", params.clone(), &task_id, &event_log)
2036 .await
2037 .unwrap();
2038 assert!(!result1.was_cached);
2039
2040 let result2 = client
2042 .call_tool_with_retry_events("novanet_context", params.clone(), &task_id, &event_log)
2043 .await
2044 .unwrap();
2045 assert!(result2.was_cached);
2046 }
2047
2048 #[tokio::test]
2049 async fn test_call_tool_with_retry_events_not_connected_fails() {
2050 use nika_event::EventLog;
2051
2052 let config = McpConfig::new("test", "nonexistent_command");
2054 let client = McpClient::new(config).unwrap();
2055 let event_log = EventLog::new();
2056 let task_id: Arc<str> = Arc::from("test_not_connected");
2057
2058 let result = client
2059 .call_tool_with_retry_events("some_tool", serde_json::json!({}), &task_id, &event_log)
2060 .await;
2061
2062 assert!(result.is_err());
2063 match result.unwrap_err() {
2064 McpError::McpNotConnected { .. } => {} err => panic!("Expected McpNotConnected, got: {err:?}"),
2066 }
2067 }
2068
2069 #[tokio::test]
2074 async fn test_disconnect_clears_response_cache() {
2075 let client = McpClient::mock("test_cache");
2076
2077 assert!(client.is_connected());
2079 client.disconnect().await.unwrap();
2080 assert!(!client.is_connected());
2081 }
2082
2083 #[tokio::test]
2084 async fn test_disconnect_clears_response_cache_with_entries() {
2085 let cache_config = CacheConfig {
2086 ttl: std::time::Duration::from_secs(300),
2087 max_entries: 100,
2088 };
2089 let client = McpClient::mock("test_cache_entries").with_cache(cache_config);
2090
2091 let _ = client
2093 .call_tool("novanet_describe", serde_json::json!({}))
2094 .await;
2095
2096 let stats = client.cache_stats();
2098 assert!(stats.is_some());
2099
2100 client.disconnect().await.unwrap();
2102 assert!(!client.is_connected());
2103 }
2104
2105 #[tokio::test]
2106 async fn test_disconnect_invalidates_tool_cache_via_adapter() {
2107 let config = McpConfig::new("test_adapter_cache", "echo");
2110 let client = McpClient::new(config).unwrap();
2111
2112 client.disconnect().await.unwrap();
2114 assert!(!client.is_connected());
2115 }
2116
2117 #[test]
2125 fn wave2_cache_key_canonical_json_ordering() {
2126 use serde_json::json;
2127
2128 let mut map_a = serde_json::Map::new();
2132 map_a.insert("alpha".to_string(), json!("first"));
2133 map_a.insert("beta".to_string(), json!("second"));
2134 map_a.insert("gamma".to_string(), json!("third"));
2135
2136 let mut map_b = serde_json::Map::new();
2137 map_b.insert("gamma".to_string(), json!("third"));
2138 map_b.insert("alpha".to_string(), json!("first"));
2139 map_b.insert("beta".to_string(), json!("second"));
2140
2141 let value_a = Value::Object(map_a);
2142 let value_b = Value::Object(map_b);
2143
2144 let json_a = serde_json::to_string(&value_a).unwrap();
2147 let json_b = serde_json::to_string(&value_b).unwrap();
2148
2149 let key_a = ResponseCache::cache_key("test_tool", &value_a);
2151 let key_b = ResponseCache::cache_key("test_tool", &value_b);
2152
2153 assert_eq!(
2157 key_a, key_b,
2158 "Canonical cache keys should match regardless of key insertion order. \
2159 json_a='{}', json_b='{}'",
2160 json_a, json_b
2161 );
2162 }
2163
2164 #[test]
2172 fn wave2_evict_oldest_collects_all_entries() {
2173 use std::time::Duration;
2174
2175 let cache = ResponseCache::new(CacheConfig {
2177 ttl: Duration::from_secs(300),
2178 max_entries: 5,
2179 });
2180
2181 for i in 0..6 {
2183 let params = serde_json::json!({"i": i});
2184 cache.put(
2185 &format!("tool_{}", i),
2186 ¶ms,
2187 ToolCallResult::success(vec![ContentBlock::text(format!("result_{}", i))]),
2188 );
2189 }
2190
2191 let stats = cache.stats();
2193 assert!(stats.entries <= 6, "Cache should have at most 6 entries");
2194
2195 let to_remove = 5 / 10; let actual_remove = to_remove.max(1); assert_eq!(
2209 actual_remove, 1,
2210 "Eviction removes max(max_entries/10, 1) entries. \
2211 BUG: This requires iterating ALL entries + sorting to find the oldest one. \
2212 An LRU cache would do this in O(1)."
2213 );
2214 }
2215
2216 #[test]
2221 fn test_is_connection_error_broken_pipe() {
2222 let err = McpError::McpToolError {
2223 tool: "test".into(),
2224 reason: "Broken pipe".into(),
2225 error_code: None,
2226 };
2227 assert!(McpClient::is_connection_error(&err));
2228 }
2229
2230 #[test]
2231 fn test_is_connection_error_connection_reset() {
2232 let err = McpError::McpToolError {
2233 tool: "test".into(),
2234 reason: "Connection reset by peer".into(),
2235 error_code: None,
2236 };
2237 assert!(McpClient::is_connection_error(&err));
2238 }
2239
2240 #[test]
2241 fn test_is_connection_error_connection_refused() {
2242 let err = McpError::McpToolError {
2243 tool: "test".into(),
2244 reason: "Connection refused".into(),
2245 error_code: None,
2246 };
2247 assert!(McpClient::is_connection_error(&err));
2248 }
2249
2250 #[test]
2251 fn test_is_connection_error_eof() {
2252 let err = McpError::McpToolError {
2253 tool: "test".into(),
2254 reason: "unexpected EOF".into(),
2255 error_code: None,
2256 };
2257 assert!(McpClient::is_connection_error(&err));
2258 }
2259
2260 #[test]
2261 fn test_is_connection_error_stdin_not_available() {
2262 let err = McpError::McpToolError {
2263 tool: "test".into(),
2264 reason: "stdin not available".into(),
2265 error_code: None,
2266 };
2267 assert!(McpClient::is_connection_error(&err));
2268 }
2269
2270 #[test]
2271 fn test_is_connection_error_stdout_not_available() {
2272 let err = McpError::McpToolError {
2273 tool: "test".into(),
2274 reason: "stdout not available".into(),
2275 error_code: None,
2276 };
2277 assert!(McpClient::is_connection_error(&err));
2278 }
2279
2280 #[test]
2281 fn test_is_connection_error_transport_closed() {
2282 let err = McpError::McpToolError {
2283 tool: "test".into(),
2284 reason: "Transport closed unexpectedly".into(),
2285 error_code: None,
2286 };
2287 assert!(McpClient::is_connection_error(&err));
2288 }
2289
2290 #[test]
2291 fn test_is_connection_error_transport_send() {
2292 let err = McpError::McpToolError {
2293 tool: "test".into(),
2294 reason: "Transport send failed".into(),
2295 error_code: None,
2296 };
2297 assert!(McpClient::is_connection_error(&err));
2298 }
2299
2300 #[test]
2301 fn test_is_connection_error_non_connection_error() {
2302 let err = McpError::McpToolError {
2303 tool: "test".into(),
2304 reason: "invalid parameter 'mode'".into(),
2305 error_code: None,
2306 };
2307 assert!(!McpClient::is_connection_error(&err));
2308 }
2309
2310 #[test]
2311 fn test_is_connection_error_not_connected() {
2312 let err = McpError::McpNotConnected {
2313 name: "novanet".into(),
2314 };
2315 assert!(!McpClient::is_connection_error(&err));
2317 }
2318}