use std::{future::Future, sync::Arc};
use tokio::sync::{Mutex, RwLock};
use crate::service::tools::helpers::{
NODE_INFO_MODE_LIST, RDF_FORMAT_LIST, READER_MODES_LIST, RESULT_FORMAT_LIST, SHACL_FORMAT_LIST, SHEX_FORMAT_LIST,
SPARQL_RESULT_FORMAT_LIST,
};
use crate::service::{logging::LogRateLimiter, prompts, state, tools};
use rmcp::{
RoleServer,
handler::server::router::{prompt::PromptRouter, tool::ToolRouter},
model::LoggingLevel,
service::RequestContext,
};
use rudof_lib::{
Rudof, RudofConfig,
formats::{DataFormat, InputSpec, ResultDataFormat},
};
tokio::task_local! {
static REQUEST_CONTEXT: RequestContext<RoleServer>;
}
#[derive(Debug)]
pub enum ServiceCreationError {
ConfigError(String),
RudofError(String),
}
impl std::fmt::Display for ServiceCreationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ConfigError(e) => write!(f, "Failed to create Rudof configuration: {}", e),
Self::RudofError(e) => write!(f, "Failed to initialize Rudof: {}", e),
}
}
}
impl std::error::Error for ServiceCreationError {}
#[derive(Clone)]
pub struct RudofMcpService {
pub rudof: Arc<Mutex<Rudof>>,
pub tool_router: ToolRouter<RudofMcpService>,
pub prompt_router: PromptRouter<RudofMcpService>,
pub current_min_log_level: Arc<RwLock<Option<LoggingLevel>>>,
pub(crate) log_rate_limiter: Arc<Mutex<LogRateLimiter>>,
}
impl RudofMcpService {
pub fn new() -> Self {
Self::try_new().expect("Failed to create RudofMcpService")
}
pub fn try_new() -> Result<Self, ServiceCreationError> {
let rudof_config = RudofConfig::new();
let mut rudof = Rudof::new(rudof_config);
if let Some(persisted_state) = state::load_state()
&& let Some(rdf_ntriples) = &persisted_state.rdf_data_ntriples
&& !rdf_ntriples.is_empty()
{
tracing::info!(
"Restoring {} triples from persisted state",
persisted_state.triple_count.unwrap_or(0)
);
let spec = InputSpec::Str(rdf_ntriples.clone());
let data_specs = vec![spec];
if let Err(e) = rudof
.load_data()
.with_data(&data_specs)
.with_data_format(&DataFormat::NTriples)
.execute()
{
tracing::warn!("Failed to restore persisted RDF data: {}", e);
} else {
tracing::info!("Successfully restored RDF data from persisted state");
}
}
Ok(Self {
rudof: Arc::new(Mutex::new(rudof)),
tool_router: tools::tool_router_public(),
prompt_router: prompts::prompt_router_public(),
current_min_log_level: Arc::new(RwLock::new(None)),
log_rate_limiter: Arc::new(Mutex::new(LogRateLimiter::default())),
})
}
pub async fn with_request_context<F, T>(context: RequestContext<RoleServer>, future: F) -> T
where
F: Future<Output = T>,
{
REQUEST_CONTEXT.scope(context, future).await
}
pub fn current_request_context() -> Option<RequestContext<RoleServer>> {
REQUEST_CONTEXT.try_with(Clone::clone).ok()
}
pub fn count_triples_in_ntriples(ntriples: &str) -> usize {
ntriples
.lines()
.filter(|line| {
let trimmed = line.trim();
!trimmed.is_empty() && !trimmed.starts_with('#')
})
.count()
}
pub async fn persist_state_with(
&self,
prebuilt_ntriples: Option<String>,
) -> Result<bool, state::StatePersistenceError> {
if !state::is_persistence_available() {
tracing::debug!("State persistence not available (no state directory)");
return Ok(false);
}
let rdf_ntriples = if let Some(ntriples) = prebuilt_ntriples {
ntriples
} else {
let mut rudof = self.rudof.lock().await;
let mut v = Vec::new();
rudof
.serialize_data(&mut v)
.with_result_data_format(&ResultDataFormat::NTriples)
.execute()
.map_err(|e| state::StatePersistenceError::RdfSerialization(e.to_string()))?;
String::from_utf8(v).map_err(|e| state::StatePersistenceError::Json(e.to_string()))?
};
let triple_count = Self::count_triples_in_ntriples(&rdf_ntriples);
let persisted_state = state::PersistedState::with_rdf_data(rdf_ntriples, triple_count);
state::save_state(&persisted_state)?;
tracing::info!("Persisted {} triples to state file", triple_count);
Ok(true)
}
pub async fn persist_state(&self) -> Result<bool, state::StatePersistenceError> {
self.persist_state_with(None).await
}
pub fn get_format_argument_completions(argument_name: &str) -> Vec<String> {
let list: &[&str] = match argument_name {
"format" | "input_format" | "output_format" | "rdf_format" => RDF_FORMAT_LIST,
"schema_format" | "shex_format" => SHEX_FORMAT_LIST,
"shacl_format" | "shapes_format" => SHACL_FORMAT_LIST,
"result_format" => RESULT_FORMAT_LIST,
"mode" => NODE_INFO_MODE_LIST,
_ => return vec![],
};
list.iter().map(|s| s.to_string()).collect()
}
pub fn get_prompt_argument_completions(&self, prompt_name: &str, argument_name: &str) -> Vec<String> {
tracing::debug!(
prompt_name = %prompt_name,
argument_name = %argument_name,
"Getting prompt argument completions"
);
let format_completions = Self::get_format_argument_completions(argument_name);
if !format_completions.is_empty() {
return format_completions;
}
match (prompt_name, argument_name) {
(_, "base") | (_, "base_iri") => {
vec![
"http://example.org/".to_string(),
"https://schema.org/".to_string(),
"http://www.w3.org/2001/XMLSchema#".to_string(),
]
},
("analyze_rdf_data", "focus") | (_, "focus") => {
vec![
"all".to_string(),
"structure".to_string(),
"quality".to_string(),
"statistics".to_string(),
]
},
("validation_guide", "technology") | (_, "technology") => {
vec!["shex".to_string(), "shacl".to_string()]
},
_ => vec![],
}
}
pub fn get_resource_uri_completions(&self, uri: &str, argument_name: &str) -> Vec<String> {
tracing::debug!(
uri = %uri,
argument_name = %argument_name,
"Getting resource URI completions"
);
if !uri.starts_with("rudof://") {
return vec![];
}
match argument_name {
"endpoint" => vec![
"https://query.wikidata.org/sparql".to_string(),
"https://dbpedia.org/sparql".to_string(),
],
"result_format" => SPARQL_RESULT_FORMAT_LIST.iter().map(|s| s.to_string()).collect(),
"reader_mode" => READER_MODES_LIST.iter().map(|s| s.to_string()).collect(),
_ => Self::get_format_argument_completions(argument_name),
}
}
}
impl Default for RudofMcpService {
fn default() -> Self {
Self::new()
}
}