oxirs_core/federation/
client.rs1use crate::OxirsError;
4use reqwest::{Client, StatusCode};
5use std::time::Duration;
6use tracing::{debug, info, warn};
7
/// Settings that control how a [`FederationClient`] talks to remote SPARQL endpoints.
///
/// All fields are plain values, so configurations can be cloned and compared freely.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FederationConfig {
    /// Request timeout, in seconds, applied to the underlying HTTP client.
    pub timeout_secs: u64,
    /// Upper bound on the number of attempts made per query (the initial try is counted).
    pub max_retries: u32,
    /// Value sent as the `User-Agent` of every request.
    pub user_agent: String,
    /// Value sent in the `Accept` header of every query request.
    pub accept: String,
}
20
21impl Default for FederationConfig {
22 fn default() -> Self {
23 Self {
24 timeout_secs: 30,
25 max_retries: 3,
26 user_agent: format!("OxiRS/{}", env!("CARGO_PKG_VERSION")),
27 accept: "application/sparql-results+json".to_string(),
28 }
29 }
30}
31
32pub struct FederationClient {
34 client: Client,
35 config: FederationConfig,
36}
37
38impl FederationClient {
39 pub fn new() -> Result<Self, OxirsError> {
41 Self::with_config(FederationConfig::default())
42 }
43
44 pub fn with_config(config: FederationConfig) -> Result<Self, OxirsError> {
46 let client = Client::builder()
47 .timeout(Duration::from_secs(config.timeout_secs))
48 .user_agent(&config.user_agent)
49 .build()
50 .map_err(|e| OxirsError::Federation(format!("Failed to create HTTP client: {}", e)))?;
51
52 Ok(Self { client, config })
53 }
54
55 pub async fn execute_query(
65 &self,
66 endpoint: &str,
67 query: &str,
68 silent: bool,
69 ) -> Result<String, OxirsError> {
70 debug!("Executing federated query to endpoint: {}", endpoint);
71 debug!("Query: {}", query);
72
73 let mut last_error = None;
74
75 for attempt in 1..=self.config.max_retries {
76 match self.execute_query_once(endpoint, query).await {
77 Ok(response) => {
78 info!(
79 "Successfully executed federated query (attempt {}/{})",
80 attempt, self.config.max_retries
81 );
82 return Ok(response);
83 }
84 Err(e) => {
85 warn!(
86 "Federated query failed (attempt {}/{}): {}",
87 attempt, self.config.max_retries, e
88 );
89 last_error = Some(e);
90
91 if attempt < self.config.max_retries {
92 let delay = Duration::from_millis(100 * 2u64.pow(attempt - 1));
94 tokio::time::sleep(delay).await;
95 }
96 }
97 }
98 }
99
100 if silent {
101 warn!("SERVICE SILENT: Returning empty results after all retries failed");
102 Ok(r#"{"head":{"vars":[]},"results":{"bindings":[]}}"#.to_string())
104 } else {
105 Err(last_error.unwrap_or_else(|| {
106 OxirsError::Federation("Federated query failed with unknown error".to_string())
107 }))
108 }
109 }
110
111 async fn execute_query_once(&self, endpoint: &str, query: &str) -> Result<String, OxirsError> {
113 let response = self
115 .client
116 .post(endpoint)
117 .header("Accept", &self.config.accept)
118 .header("Content-Type", "application/x-www-form-urlencoded")
119 .body(format!("query={}", urlencoding::encode(query)))
120 .send()
121 .await
122 .map_err(|e| {
123 OxirsError::Federation(format!("Failed to send request to {}: {}", endpoint, e))
124 })?;
125
126 let status = response.status();
127 let body = response
128 .text()
129 .await
130 .map_err(|e| OxirsError::Federation(format!("Failed to read response body: {}", e)))?;
131
132 match status {
133 StatusCode::OK => Ok(body),
134 StatusCode::BAD_REQUEST => Err(OxirsError::Federation(format!(
135 "Bad request (400): {}",
136 body
137 ))),
138 StatusCode::NOT_FOUND => Err(OxirsError::Federation(format!(
139 "Endpoint not found (404): {}",
140 endpoint
141 ))),
142 StatusCode::INTERNAL_SERVER_ERROR => Err(OxirsError::Federation(format!(
143 "Server error (500): {}",
144 body
145 ))),
146 StatusCode::SERVICE_UNAVAILABLE => Err(OxirsError::Federation(format!(
147 "Service unavailable (503): {}",
148 endpoint
149 ))),
150 StatusCode::GATEWAY_TIMEOUT => Err(OxirsError::Federation(format!(
151 "Gateway timeout (504): {}",
152 endpoint
153 ))),
154 _ => Err(OxirsError::Federation(format!(
155 "Unexpected status code {}: {}",
156 status, body
157 ))),
158 }
159 }
160
161 pub async fn check_endpoint(&self, endpoint: &str) -> bool {
163 debug!("Checking endpoint health: {}", endpoint);
164
165 let test_query = "ASK { ?s ?p ?o } LIMIT 1";
167
168 match self.execute_query(endpoint, test_query, true).await {
169 Ok(_) => {
170 info!("Endpoint {} is healthy", endpoint);
171 true
172 }
173 Err(e) => {
174 warn!("Endpoint {} is unhealthy: {}", endpoint, e);
175 false
176 }
177 }
178 }
179
180 pub async fn get_service_description(&self, endpoint: &str) -> Result<String, OxirsError> {
182 debug!("Fetching service description for: {}", endpoint);
183
184 let describe_query = r#"
185 PREFIX sd: <http://www.w3.org/ns/sparql-service-description#>
186 DESCRIBE ?service
187 WHERE {
188 ?service a sd:Service
189 }
190 "#;
191
192 self.execute_query(endpoint, describe_query, false).await
193 }
194}
195
196impl Default for FederationClient {
197 fn default() -> Self {
198 Self::new().expect("Failed to create default federation client")
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// A client built from the default configuration can be constructed.
    #[tokio::test]
    async fn test_client_creation() {
        assert!(FederationClient::new().is_ok());
    }

    /// A client built from a hand-written configuration can be constructed.
    #[tokio::test]
    async fn test_client_with_custom_config() {
        let custom = FederationConfig {
            timeout_secs: 60,
            max_retries: 5,
            user_agent: String::from("TestAgent/1.0"),
            accept: String::from("application/sparql-results+json"),
        };

        assert!(FederationClient::with_config(custom).is_ok());
    }

    /// In silent mode an unreachable endpoint yields an empty result set, not an error.
    #[tokio::test]
    async fn test_silent_mode_on_error() {
        let federation = FederationClient::new().expect("construction should succeed");
        let endpoint = "http://nonexistent.example.org/sparql";
        let query = "SELECT * WHERE { ?s ?p ?o }";

        let outcome = federation.execute_query(endpoint, query, true).await;

        assert!(outcome.is_ok());
        let payload = outcome.expect("should have value");
        assert!(payload.contains("bindings"));
    }
}
242}