use std::{collections::HashMap, io::Cursor, sync::Arc};
use tokio::sync::{Mutex, RwLock};
use crate::service::{prompts, state, tasks::TaskStore, tools};
use rmcp::{
RoleServer,
handler::server::router::{prompt::PromptRouter, tool::ToolRouter},
model::{LoggingLevel, ResourceUpdatedNotificationParam},
service::RequestContext,
};
use rudof_lib::{RDFFormat, ReaderMode, Rudof, RudofConfig};
#[derive(Debug)]
pub enum ServiceCreationError {
ConfigError(String),
RudofError(String),
}
impl std::fmt::Display for ServiceCreationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ConfigError(e) => write!(f, "Failed to create Rudof configuration: {}", e),
Self::RudofError(e) => write!(f, "Failed to initialize Rudof: {}", e),
}
}
}
impl std::error::Error for ServiceCreationError {}
#[derive(Clone)]
pub struct RudofMcpService {
pub rudof: Arc<Mutex<Rudof>>,
pub tool_router: ToolRouter<RudofMcpService>,
pub prompt_router: PromptRouter<RudofMcpService>,
pub resource_subscriptions: Arc<RwLock<HashMap<String, Vec<String>>>>,
pub current_min_log_level: Arc<RwLock<Option<LoggingLevel>>>,
pub current_context: Arc<RwLock<Option<RequestContext<RoleServer>>>>,
pub task_store: TaskStore,
}
impl RudofMcpService {
pub fn new() -> Self {
Self::try_new().expect("Failed to create RudofMcpService")
}
pub fn try_new() -> Result<Self, ServiceCreationError> {
let rudof_config = RudofConfig::new().map_err(|e| ServiceCreationError::ConfigError(e.to_string()))?;
let mut rudof = Rudof::new(&rudof_config).map_err(|e| ServiceCreationError::RudofError(e.to_string()))?;
if let Some(persisted_state) = state::load_state()
&& let Some(rdf_ntriples) = &persisted_state.rdf_data_ntriples
&& !rdf_ntriples.is_empty()
{
tracing::info!(
"Restoring {} triples from persisted state",
persisted_state.triple_count.unwrap_or(0)
);
let mut cursor = Cursor::new(rdf_ntriples.as_bytes());
if let Err(e) = rudof.read_data(
&mut cursor,
"persisted_state",
Some(&RDFFormat::NTriples),
None,
Some(&ReaderMode::default()),
Some(false),
) {
tracing::warn!("Failed to restore persisted RDF data: {}", e);
} else {
tracing::info!("Successfully restored RDF data from persisted state");
}
}
Ok(Self {
rudof: Arc::new(Mutex::new(rudof)),
tool_router: tools::tool_router_public(),
prompt_router: prompts::prompt_router_public(),
resource_subscriptions: Arc::new(RwLock::new(HashMap::new())),
current_min_log_level: Arc::new(RwLock::new(None)),
current_context: Arc::new(RwLock::new(None)),
task_store: TaskStore::new(),
})
}
pub async fn persist_state(&self) -> Result<bool, state::StatePersistenceError> {
if !state::is_persistence_available() {
tracing::debug!("State persistence not available (no state directory)");
return Ok(false);
}
let rudof = self.rudof.lock().await;
let mut buffer = Vec::new();
rudof
.serialize_data(Some(&RDFFormat::NTriples), &mut buffer)
.map_err(|e| state::StatePersistenceError::RdfSerialization(e.to_string()))?;
let rdf_ntriples = String::from_utf8(buffer).map_err(|e| state::StatePersistenceError::Json(e.to_string()))?;
let triple_count = rdf_ntriples
.lines()
.filter(|line| {
let trimmed = line.trim();
!trimmed.is_empty() && !trimmed.starts_with('#')
})
.count();
let persisted_state = state::PersistedState::with_rdf_data(rdf_ntriples, triple_count);
state::save_state(&persisted_state)?;
tracing::info!("Persisted {} triples to state file", triple_count);
Ok(true)
}
pub async fn subscribe_resource(&self, uri: String, subscriber_id: String) {
tracing::debug!(
uri = %uri,
subscriber_id = %subscriber_id,
"Subscribing to resource"
);
let mut subs = self.resource_subscriptions.write().await;
subs.entry(uri).or_insert_with(Vec::new).push(subscriber_id);
}
pub async fn unsubscribe_resource(&self, uri: &str, subscriber_id: &str) {
tracing::debug!(
uri = %uri,
subscriber_id = %subscriber_id,
"Unsubscribing from resource"
);
let mut subs = self.resource_subscriptions.write().await;
if let Some(subscribers) = subs.get_mut(uri) {
subscribers.retain(|id| id != subscriber_id);
if subscribers.is_empty() {
subs.remove(uri);
}
}
}
pub async fn get_resource_subscribers(&self, uri: &str) -> Vec<String> {
tracing::debug!(uri = %uri, "Getting resource subscribers");
let subs = self.resource_subscriptions.read().await;
subs.get(uri).cloned().unwrap_or_default()
}
pub async fn notify_resource_updated(&self, uri: String) {
tracing::debug!(uri = %uri, "Notifying resource updated");
let subscribers = self.get_resource_subscribers(&uri).await;
if subscribers.is_empty() {
tracing::debug!(uri = %uri, "No subscribers for resource update");
return;
}
let context_guard = self.current_context.read().await;
if let Some(context) = context_guard.as_ref() {
if let Err(e) = context
.peer
.notify_resource_updated(ResourceUpdatedNotificationParam { uri: uri.clone() })
.await
{
tracing::error!(
uri = %uri,
error = ?e,
"Failed to send resource updated notification"
);
} else {
tracing::debug!(
uri = %uri,
subscriber_count = subscribers.len(),
"Resource updated notification sent via rmcp"
);
}
} else {
tracing::debug!(
uri = %uri,
subscriber_count = subscribers.len(),
"Resource updated (no active request context)"
);
}
}
#[allow(dead_code)]
pub async fn notify_resource_list_changed(&self) {
tracing::debug!("Notifying resource list changed");
let context_guard = self.current_context.read().await;
if let Some(context) = context_guard.as_ref() {
if let Err(e) = context.peer.notify_resource_list_changed().await {
tracing::error!(
error = ?e,
"Failed to send resource list changed notification"
);
} else {
tracing::debug!("Resource list changed notification sent via rmcp");
}
} else {
tracing::debug!("Resource list changed (no active request context)");
}
}
pub fn get_prompt_argument_completions(&self, prompt_name: &str, argument_name: &str) -> Vec<String> {
tracing::debug!(
prompt_name = %prompt_name,
argument_name = %argument_name,
"Getting prompt argument completions"
);
match (prompt_name, argument_name) {
(_, "format") | (_, "input_format") | (_, "output_format") | (_, "rdf_format") => {
vec![
"turtle".to_string(),
"ntriples".to_string(),
"rdfxml".to_string(),
"trig".to_string(),
"nquads".to_string(),
"n3".to_string(),
"jsonld".to_string(),
]
},
(_, "schema_format") | (_, "shex_format") => {
vec![
"shexc".to_string(),
"shexj".to_string(),
"turtle".to_string(),
"ntriples".to_string(),
"rdfxml".to_string(),
"jsonld".to_string(),
"trig".to_string(),
"n3".to_string(),
"nquads".to_string(),
]
},
(_, "shacl_format") | (_, "shapes_format") => {
vec![
"turtle".to_string(),
"jsonld".to_string(),
"rdfxml".to_string(),
"trig".to_string(),
"nquads".to_string(),
"json".to_string(),
]
},
(_, "result_format") => {
vec![
"details".to_string(),
"compact".to_string(),
"json".to_string(),
"turtle".to_string(),
"ntriples".to_string(),
"rdfxml".to_string(),
"trig".to_string(),
"n3".to_string(),
"nquads".to_string(),
]
},
(_, "verbose") | (_, "debug") | (_, "strict") => {
vec!["true".to_string(), "false".to_string()]
},
(_, "base") | (_, "base_iri") => {
vec![
"http://example.org/".to_string(),
"https://schema.org/".to_string(),
"http://www.w3.org/2001/XMLSchema#".to_string(),
]
},
(_, "shape") | (_, "shape_label") | (_, "start_shape") => {
vec![
":Person".to_string(),
":Thing".to_string(),
":Organization".to_string(),
"schema:Person".to_string(),
"foaf:Person".to_string(),
]
},
(_, "node") | (_, "focus_node") => {
vec![":node1".to_string(), "<http://example.org/resource>".to_string()]
},
(_, "mode") => {
vec!["both".to_string(), "outgoing".to_string(), "incoming".to_string()]
},
("analyze_rdf_data", "focus") | (_, "focus") => {
vec![
"all".to_string(),
"structure".to_string(),
"quality".to_string(),
"statistics".to_string(),
]
},
("validation_guide", "technology") | (_, "technology") => {
vec!["shex".to_string(), "shacl".to_string()]
},
("sparql_builder", "query_type") | (_, "query_type") => {
vec![
"select".to_string(),
"construct".to_string(),
"ask".to_string(),
"describe".to_string(),
]
},
_ => vec![],
}
}
pub fn get_resource_uri_completions(&self, uri: &str, argument_name: &str) -> Vec<String> {
tracing::debug!(
uri = %uri,
argument_name = %argument_name,
"Getting resource URI completions"
);
if uri.starts_with("rudof://") {
match argument_name {
"format" => {
vec![
"turtle".to_string(),
"ntriples".to_string(),
"rdfxml".to_string(),
"jsonld".to_string(),
"trig".to_string(),
"nquads".to_string(),
"n3".to_string(),
]
},
"endpoint" => {
vec![
"https://query.wikidata.org/sparql".to_string(),
"https://dbpedia.org/sparql".to_string(),
"http://localhost:3030/sparql".to_string(),
]
},
"mode" => {
vec!["both".to_string(), "outgoing".to_string(), "incoming".to_string()]
},
"result_format" => {
vec![
"internal".to_string(),
"json".to_string(),
"xml".to_string(),
"csv".to_string(),
"tsv".to_string(),
"turtle".to_string(),
"ntriples".to_string(),
"rdfxml".to_string(),
"trig".to_string(),
]
},
"shex_format" | "schema_format" => {
vec![
"shexc".to_string(),
"shexj".to_string(),
"turtle".to_string(),
"ntriples".to_string(),
"rdfxml".to_string(),
"jsonld".to_string(),
]
},
"shacl_format" | "shapes_format" => {
vec![
"turtle".to_string(),
"jsonld".to_string(),
"rdfxml".to_string(),
"trig".to_string(),
"nquads".to_string(),
"json".to_string(),
]
},
"reader_mode" => {
vec!["strict".to_string(), "lax".to_string()]
},
_ => vec![],
}
} else {
vec![]
}
}
}
impl Default for RudofMcpService {
fn default() -> Self {
Self::new()
}
}