Skip to main content

syslog_server_mcp/transport/
stdio.rs

1use crate::config::Config;
2use crate::error::Result;
3use crate::notifier::ListChangedNotifier;
4use crate::rest_client::RestClient;
5use crate::tools::{ToolInventory, ToolRegistry};
6use rmcp::{
7    ErrorData, ServerHandler,
8    model::{
9        CallToolRequestParams, CallToolResult, Content, Implementation, ListToolsResult,
10        PaginatedRequestParams, ServerCapabilities, ServerInfo, Tool as McpTool,
11    },
12    service::{NotificationContext, Peer, RequestContext, RoleServer},
13    transport::stdio,
14};
15use std::sync::Arc;
16
17pub use crate::probe::ServerPhase;
18
19#[derive(Clone)]
20pub struct McpServer {
21    rest: RestClient,
22    registry: Arc<ToolRegistry>,
23    notifier: Arc<ListChangedNotifier>,
24}
25
26impl McpServer {
27    pub fn new(
28        rest: RestClient,
29        registry: Arc<ToolRegistry>,
30        notifier: Arc<ListChangedNotifier>,
31    ) -> Self {
32        Self {
33            rest,
34            registry,
35            notifier,
36        }
37    }
38}
39
40pub struct RmcpPeerAdapter(pub Peer<RoleServer>);
41
42#[async_trait::async_trait]
43impl crate::notifier::PeerNotifier for RmcpPeerAdapter {
44    async fn notify_tool_list_changed(&self) -> std::result::Result<(), String> {
45        self.0
46            .notify_tool_list_changed()
47            .await
48            .map_err(|e| e.to_string())
49    }
50}
51
52impl ServerHandler for McpServer {
53    fn get_info(&self) -> ServerInfo {
54        ServerInfo::new(
55            ServerCapabilities::builder()
56                .enable_tools()
57                .enable_tool_list_changed()
58                .build(),
59        )
60        .with_server_info(Implementation::new(
61            "syslog-server-mcp",
62            env!("CARGO_PKG_VERSION"),
63        ))
64        .with_instructions(
65            "Read-only SysLog-Server investigation tools. \
66                 See https://github.com/darylmcd/SysLog-Server-MCP",
67        )
68    }
69
70    async fn list_tools(
71        &self,
72        _request: Option<PaginatedRequestParams>,
73        _context: RequestContext<RoleServer>,
74    ) -> std::result::Result<ListToolsResult, ErrorData> {
75        let tools: Vec<McpTool> = self
76            .registry
77            .list()
78            .iter()
79            .map(|t| {
80                let schema = t.input_schema();
81                let schema_map: serde_json::Map<String, serde_json::Value> =
82                    schema.as_object().cloned().unwrap_or_default();
83                McpTool::new(t.name(), t.description(), Arc::new(schema_map))
84            })
85            .collect();
86        Ok(ListToolsResult {
87            tools,
88            ..Default::default()
89        })
90    }
91
92    async fn call_tool(
93        &self,
94        request: CallToolRequestParams,
95        _context: RequestContext<RoleServer>,
96    ) -> std::result::Result<CallToolResult, ErrorData> {
97        let tool = self.registry.get(&request.name).ok_or_else(|| {
98            ErrorData::invalid_params(format!("unknown tool: {}", request.name), None)
99        })?;
100        let input = request
101            .arguments
102            .map(serde_json::Value::Object)
103            .unwrap_or(serde_json::Value::Null);
104        match tool.call(&self.rest, input).await {
105            Ok(out) => Ok(CallToolResult::success(vec![Content::text(
106                serde_json::to_string_pretty(&out).unwrap_or_default(),
107            )])),
108            Err(e) => Ok(CallToolResult::error(vec![Content::text(e.to_string())])),
109        }
110    }
111
112    async fn on_initialized(&self, context: NotificationContext<RoleServer>) {
113        self.notifier
114            .register(Arc::new(RmcpPeerAdapter(context.peer)))
115            .await;
116    }
117}
118
119/// Build the tool inventory appropriate for the detected server phase.
120pub fn build_inventory(phase: ServerPhase) -> ToolInventory {
121    let mut inventory = ToolInventory::new();
122    inventory.register(Arc::new(crate::tools::thin::QueryEventsTool));
123    inventory.register(Arc::new(crate::tools::thin::ListAlertRulesTool));
124    inventory.register(Arc::new(crate::tools::thin::ListSavedSearchesTool));
125    inventory.register(Arc::new(crate::tools::thin::GetAuditLogTool));
126    inventory.register(Arc::new(crate::tools::system::HealthSummaryTool));
127    inventory.register(Arc::new(crate::tools::system::ListSpoolSegmentsTool));
128
129    if phase == ServerPhase::Phase1 {
130        inventory.register(Arc::new(crate::tools::investigators::InvestigateEventTool));
131        inventory.register(Arc::new(crate::tools::investigators::InvestigateSourceTool));
132        inventory.register(Arc::new(crate::tools::investigators::InvestigateAlertTool));
133    }
134
135    inventory
136}
137
138pub async fn serve(config: Config) -> Result<()> {
139    use rmcp::ServiceExt;
140
141    let rest = RestClient::new(
142        config.base_url.clone(),
143        config.api_key.clone(),
144        config.insecure,
145        config.mcp_client.clone(),
146    )?;
147
148    let probe_result = crate::probe::run_probes(&rest, &config.base_url, config.insecure).await;
149    crate::probe::log_probe_results(&probe_result);
150
151    if config.probe_once {
152        return Ok(());
153    }
154
155    let initial_phase = detect_initial_phase(&rest, config.phase_override).await;
156    let registry = Arc::new(ToolRegistry::from_inventory(build_inventory(initial_phase)));
157    let notifier = Arc::new(ListChangedNotifier::new());
158    if config.phase_override.is_none() {
159        crate::probe::CapabilityProbe::spawn_loop(
160            rest.clone(),
161            Arc::clone(&registry),
162            Arc::clone(&notifier),
163            config.capability_probe_interval,
164            initial_phase,
165        );
166    }
167
168    let server = McpServer::new(rest, registry, notifier);
169    let service = server
170        .serve(stdio())
171        .await
172        .map_err(|e| crate::error::Error::Mcp(e.to_string()))?;
173    service
174        .waiting()
175        .await
176        .map_err(|e| crate::error::Error::Mcp(e.to_string()))?;
177    Ok(())
178}
179
180pub async fn detect_initial_phase(
181    rest: &RestClient,
182    phase_override: Option<ServerPhase>,
183) -> ServerPhase {
184    if let Some(phase) = phase_override {
185        tracing::warn!(phase = ?phase, "phase override set; skipping capability probe");
186        return phase;
187    }
188
189    match crate::probe::CapabilityProbe::detect_phase(rest).await {
190        crate::probe::ProbeResult::Phase(phase) => {
191            tracing::info!(phase = ?phase, "detected server phase");
192            phase
193        }
194        crate::probe::ProbeResult::Unknown => {
195            tracing::warn!("initial capability probe returned unknown; assuming Phase0");
196            ServerPhase::Phase0
197        }
198    }
199}