Skip to main content

oxirs_core/federation/
client.rs

1//! HTTP client for executing SPARQL queries against remote endpoints
2
3use crate::OxirsError;
4use reqwest::{Client, StatusCode};
5use std::time::Duration;
6use tracing::{debug, info, warn};
7
8/// Configuration for federation client
9#[derive(Debug, Clone)]
10pub struct FederationConfig {
11    /// Request timeout in seconds
12    pub timeout_secs: u64,
13    /// Maximum retries on failure
14    pub max_retries: u32,
15    /// User-Agent header
16    pub user_agent: String,
17    /// Accept header for SPARQL results
18    pub accept: String,
19}
20
21impl Default for FederationConfig {
22    fn default() -> Self {
23        Self {
24            timeout_secs: 30,
25            max_retries: 3,
26            user_agent: format!("OxiRS/{}", env!("CARGO_PKG_VERSION")),
27            accept: "application/sparql-results+json".to_string(),
28        }
29    }
30}
31
32/// HTTP client for federated SPARQL execution
33pub struct FederationClient {
34    client: Client,
35    config: FederationConfig,
36}
37
38impl FederationClient {
39    /// Create a new federation client with default configuration
40    pub fn new() -> Result<Self, OxirsError> {
41        Self::with_config(FederationConfig::default())
42    }
43
44    /// Create a new federation client with custom configuration
45    pub fn with_config(config: FederationConfig) -> Result<Self, OxirsError> {
46        let client = Client::builder()
47            .timeout(Duration::from_secs(config.timeout_secs))
48            .user_agent(&config.user_agent)
49            .build()
50            .map_err(|e| OxirsError::Federation(format!("Failed to create HTTP client: {}", e)))?;
51
52        Ok(Self { client, config })
53    }
54
55    /// Execute a SPARQL query against a remote endpoint
56    ///
57    /// # Arguments
58    /// * `endpoint` - The SPARQL endpoint URL
59    /// * `query` - The SPARQL query string
60    /// * `silent` - If true, suppress errors and return empty results
61    ///
62    /// # Returns
63    /// JSON response body as a string
64    pub async fn execute_query(
65        &self,
66        endpoint: &str,
67        query: &str,
68        silent: bool,
69    ) -> Result<String, OxirsError> {
70        debug!("Executing federated query to endpoint: {}", endpoint);
71        debug!("Query: {}", query);
72
73        let mut last_error = None;
74
75        for attempt in 1..=self.config.max_retries {
76            match self.execute_query_once(endpoint, query).await {
77                Ok(response) => {
78                    info!(
79                        "Successfully executed federated query (attempt {}/{})",
80                        attempt, self.config.max_retries
81                    );
82                    return Ok(response);
83                }
84                Err(e) => {
85                    warn!(
86                        "Federated query failed (attempt {}/{}): {}",
87                        attempt, self.config.max_retries, e
88                    );
89                    last_error = Some(e);
90
91                    if attempt < self.config.max_retries {
92                        // Exponential backoff
93                        let delay = Duration::from_millis(100 * 2u64.pow(attempt - 1));
94                        tokio::time::sleep(delay).await;
95                    }
96                }
97            }
98        }
99
100        if silent {
101            warn!("SERVICE SILENT: Returning empty results after all retries failed");
102            // Return empty SPARQL JSON results
103            Ok(r#"{"head":{"vars":[]},"results":{"bindings":[]}}"#.to_string())
104        } else {
105            Err(last_error.unwrap_or_else(|| {
106                OxirsError::Federation("Federated query failed with unknown error".to_string())
107            }))
108        }
109    }
110
111    /// Execute a single query attempt
112    async fn execute_query_once(&self, endpoint: &str, query: &str) -> Result<String, OxirsError> {
113        // SPARQL Protocol: POST with application/x-www-form-urlencoded
114        let response = self
115            .client
116            .post(endpoint)
117            .header("Accept", &self.config.accept)
118            .header("Content-Type", "application/x-www-form-urlencoded")
119            .body(format!("query={}", urlencoding::encode(query)))
120            .send()
121            .await
122            .map_err(|e| {
123                OxirsError::Federation(format!("Failed to send request to {}: {}", endpoint, e))
124            })?;
125
126        let status = response.status();
127        let body = response
128            .text()
129            .await
130            .map_err(|e| OxirsError::Federation(format!("Failed to read response body: {}", e)))?;
131
132        match status {
133            StatusCode::OK => Ok(body),
134            StatusCode::BAD_REQUEST => Err(OxirsError::Federation(format!(
135                "Bad request (400): {}",
136                body
137            ))),
138            StatusCode::NOT_FOUND => Err(OxirsError::Federation(format!(
139                "Endpoint not found (404): {}",
140                endpoint
141            ))),
142            StatusCode::INTERNAL_SERVER_ERROR => Err(OxirsError::Federation(format!(
143                "Server error (500): {}",
144                body
145            ))),
146            StatusCode::SERVICE_UNAVAILABLE => Err(OxirsError::Federation(format!(
147                "Service unavailable (503): {}",
148                endpoint
149            ))),
150            StatusCode::GATEWAY_TIMEOUT => Err(OxirsError::Federation(format!(
151                "Gateway timeout (504): {}",
152                endpoint
153            ))),
154            _ => Err(OxirsError::Federation(format!(
155                "Unexpected status code {}: {}",
156                status, body
157            ))),
158        }
159    }
160
161    /// Check if an endpoint is reachable (health check)
162    pub async fn check_endpoint(&self, endpoint: &str) -> bool {
163        debug!("Checking endpoint health: {}", endpoint);
164
165        // Simple ASK query to test connectivity
166        let test_query = "ASK { ?s ?p ?o } LIMIT 1";
167
168        match self.execute_query(endpoint, test_query, true).await {
169            Ok(_) => {
170                info!("Endpoint {} is healthy", endpoint);
171                true
172            }
173            Err(e) => {
174                warn!("Endpoint {} is unhealthy: {}", endpoint, e);
175                false
176            }
177        }
178    }
179
180    /// Get endpoint capabilities via SERVICE description
181    pub async fn get_service_description(&self, endpoint: &str) -> Result<String, OxirsError> {
182        debug!("Fetching service description for: {}", endpoint);
183
184        let describe_query = r#"
185            PREFIX sd: <http://www.w3.org/ns/sparql-service-description#>
186            DESCRIBE ?service
187            WHERE {
188                ?service a sd:Service
189            }
190        "#;
191
192        self.execute_query(endpoint, describe_query, false).await
193    }
194}
195
196impl Default for FederationClient {
197    fn default() -> Self {
198        Self::new().expect("Failed to create default federation client")
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205
206    #[tokio::test]
207    async fn test_client_creation() {
208        let client = FederationClient::new();
209        assert!(client.is_ok());
210    }
211
212    #[tokio::test]
213    async fn test_client_with_custom_config() {
214        let config = FederationConfig {
215            timeout_secs: 60,
216            max_retries: 5,
217            user_agent: "TestAgent/1.0".to_string(),
218            accept: "application/sparql-results+json".to_string(),
219        };
220
221        let client = FederationClient::with_config(config);
222        assert!(client.is_ok());
223    }
224
225    #[tokio::test]
226    async fn test_silent_mode_on_error() {
227        let client = FederationClient::new().expect("construction should succeed");
228
229        // Non-existent endpoint should return empty results in silent mode
230        let result = client
231            .execute_query(
232                "http://nonexistent.example.org/sparql",
233                "SELECT * WHERE { ?s ?p ?o }",
234                true,
235            )
236            .await;
237
238        assert!(result.is_ok());
239        let body = result.expect("should have value");
240        assert!(body.contains("bindings"));
241    }
242}