syslog_server_mcp/transport/
stdio.rs1use crate::config::Config;
2use crate::error::Result;
3use crate::notifier::ListChangedNotifier;
4use crate::rest_client::RestClient;
5use crate::tools::{ToolInventory, ToolRegistry};
6use rmcp::{
7 ErrorData, ServerHandler,
8 model::{
9 CallToolRequestParams, CallToolResult, Content, Implementation, ListToolsResult,
10 PaginatedRequestParams, ServerCapabilities, ServerInfo, Tool as McpTool,
11 },
12 service::{NotificationContext, Peer, RequestContext, RoleServer},
13 transport::stdio,
14};
15use std::sync::Arc;
16
17pub use crate::probe::ServerPhase;
18
19#[derive(Clone)]
20pub struct McpServer {
21 rest: RestClient,
22 registry: Arc<ToolRegistry>,
23 notifier: Arc<ListChangedNotifier>,
24}
25
26impl McpServer {
27 pub fn new(
28 rest: RestClient,
29 registry: Arc<ToolRegistry>,
30 notifier: Arc<ListChangedNotifier>,
31 ) -> Self {
32 Self {
33 rest,
34 registry,
35 notifier,
36 }
37 }
38}
39
40pub struct RmcpPeerAdapter(pub Peer<RoleServer>);
41
42#[async_trait::async_trait]
43impl crate::notifier::PeerNotifier for RmcpPeerAdapter {
44 async fn notify_tool_list_changed(&self) -> std::result::Result<(), String> {
45 self.0
46 .notify_tool_list_changed()
47 .await
48 .map_err(|e| e.to_string())
49 }
50}
51
52impl ServerHandler for McpServer {
53 fn get_info(&self) -> ServerInfo {
54 ServerInfo::new(
55 ServerCapabilities::builder()
56 .enable_tools()
57 .enable_tool_list_changed()
58 .build(),
59 )
60 .with_server_info(Implementation::new(
61 "syslog-server-mcp",
62 env!("CARGO_PKG_VERSION"),
63 ))
64 .with_instructions(
65 "Read-only SysLog-Server investigation tools. \
66 See https://github.com/darylmcd/SysLog-Server-MCP",
67 )
68 }
69
70 async fn list_tools(
71 &self,
72 _request: Option<PaginatedRequestParams>,
73 _context: RequestContext<RoleServer>,
74 ) -> std::result::Result<ListToolsResult, ErrorData> {
75 let tools: Vec<McpTool> = self
76 .registry
77 .list()
78 .iter()
79 .map(|t| {
80 let schema = t.input_schema();
81 let schema_map: serde_json::Map<String, serde_json::Value> =
82 schema.as_object().cloned().unwrap_or_default();
83 McpTool::new(t.name(), t.description(), Arc::new(schema_map))
84 })
85 .collect();
86 Ok(ListToolsResult {
87 tools,
88 ..Default::default()
89 })
90 }
91
92 async fn call_tool(
93 &self,
94 request: CallToolRequestParams,
95 _context: RequestContext<RoleServer>,
96 ) -> std::result::Result<CallToolResult, ErrorData> {
97 let tool = self.registry.get(&request.name).ok_or_else(|| {
98 ErrorData::invalid_params(format!("unknown tool: {}", request.name), None)
99 })?;
100 let input = request
101 .arguments
102 .map(serde_json::Value::Object)
103 .unwrap_or(serde_json::Value::Null);
104 match tool.call(&self.rest, input).await {
105 Ok(out) => Ok(CallToolResult::success(vec![Content::text(
106 serde_json::to_string_pretty(&out).unwrap_or_default(),
107 )])),
108 Err(e) => Ok(CallToolResult::error(vec![Content::text(e.to_string())])),
109 }
110 }
111
112 async fn on_initialized(&self, context: NotificationContext<RoleServer>) {
113 self.notifier
114 .register(Arc::new(RmcpPeerAdapter(context.peer)))
115 .await;
116 }
117}
118
119pub fn build_inventory(phase: ServerPhase) -> ToolInventory {
121 let mut inventory = ToolInventory::new();
122 inventory.register(Arc::new(crate::tools::thin::QueryEventsTool));
123 inventory.register(Arc::new(crate::tools::thin::ListAlertRulesTool));
124 inventory.register(Arc::new(crate::tools::thin::ListSavedSearchesTool));
125 inventory.register(Arc::new(crate::tools::thin::GetAuditLogTool));
126 inventory.register(Arc::new(crate::tools::system::HealthSummaryTool));
127 inventory.register(Arc::new(crate::tools::system::ListSpoolSegmentsTool));
128
129 if phase == ServerPhase::Phase1 {
130 inventory.register(Arc::new(crate::tools::investigators::InvestigateEventTool));
131 inventory.register(Arc::new(crate::tools::investigators::InvestigateSourceTool));
132 inventory.register(Arc::new(crate::tools::investigators::InvestigateAlertTool));
133 }
134
135 inventory
136}
137
138pub async fn serve(config: Config) -> Result<()> {
139 use rmcp::ServiceExt;
140
141 let rest = RestClient::new(
142 config.base_url.clone(),
143 config.api_key.clone(),
144 config.insecure,
145 config.mcp_client.clone(),
146 )?;
147
148 let probe_result = crate::probe::run_probes(&rest, &config.base_url, config.insecure).await;
149 crate::probe::log_probe_results(&probe_result);
150
151 if config.probe_once {
152 return Ok(());
153 }
154
155 let initial_phase = detect_initial_phase(&rest, config.phase_override).await;
156 let registry = Arc::new(ToolRegistry::from_inventory(build_inventory(initial_phase)));
157 let notifier = Arc::new(ListChangedNotifier::new());
158 if config.phase_override.is_none() {
159 crate::probe::CapabilityProbe::spawn_loop(
160 rest.clone(),
161 Arc::clone(®istry),
162 Arc::clone(¬ifier),
163 config.capability_probe_interval,
164 initial_phase,
165 );
166 }
167
168 let server = McpServer::new(rest, registry, notifier);
169 let service = server
170 .serve(stdio())
171 .await
172 .map_err(|e| crate::error::Error::Mcp(e.to_string()))?;
173 service
174 .waiting()
175 .await
176 .map_err(|e| crate::error::Error::Mcp(e.to_string()))?;
177 Ok(())
178}
179
180pub async fn detect_initial_phase(
181 rest: &RestClient,
182 phase_override: Option<ServerPhase>,
183) -> ServerPhase {
184 if let Some(phase) = phase_override {
185 tracing::warn!(phase = ?phase, "phase override set; skipping capability probe");
186 return phase;
187 }
188
189 match crate::probe::CapabilityProbe::detect_phase(rest).await {
190 crate::probe::ProbeResult::Phase(phase) => {
191 tracing::info!(phase = ?phase, "detected server phase");
192 phase
193 }
194 crate::probe::ProbeResult::Unknown => {
195 tracing::warn!("initial capability probe returned unknown; assuming Phase0");
196 ServerPhase::Phase0
197 }
198 }
199}