1#![deny(missing_docs)]
37
38use reqwest::{Client, Error as ReqwestError};
39use sophia_api::prelude::*;
40use sophia_api::sparql::{
41 IntoQuery, Query as SparqlQuery, SparqlBindings, SparqlDataset, SparqlResult,
42};
43use sophia_api::term::SimpleTerm;
44use sophia_turtle::parser::{nt, turtle};
45use sophia_xml::parser as rdfxml;
46use std::borrow::Borrow;
47use std::io::Cursor;
48
49mod results;
50pub use results::BindingsDocument as Bindings;
51pub use results::ResultsDocument;
52
/// Owned term type (no borrowed data) used for every term this client returns,
/// both in bindings and in triples.
type StaticTerm = SimpleTerm<'static>;
54
55pub struct SparqlClient {
59 endpoint: Box<str>,
60 client: Client,
61 accept: Option<String>,
62}
63
64impl SparqlClient {
65 pub const DEFAULT_ACCEPT: &'static str = "application/sparql-results+json,application/sparql-results+xml;q=0.8,text/turtle,application/n-triples;q=0.9,application/rdf+xml;q=0.8";
67
68 #[must_use]
70 pub fn new(endpoint: &str) -> Self {
71 Self {
72 endpoint: Box::from(endpoint),
73 client: Client::new(),
74 accept: None,
75 }
76 }
77
78 #[must_use]
80 pub fn with_client(mut self, client: Client) -> Self {
81 self.client = client;
82 self
83 }
84
85 #[must_use]
91 pub fn with_accept<T: Into<String>>(mut self, accept: T) -> Self {
92 self.accept = Some(accept.into());
93 self
94 }
95
96 #[must_use]
98 pub fn accept(&self) -> &str {
99 self.accept.as_deref().unwrap_or(Self::DEFAULT_ACCEPT)
100 }
101
102 #[allow(clippy::unnecessary_wraps)]
103 fn wrap_triple_source<T: TripleSource + 'static>(
104 triples: T,
105 ) -> Result<SparqlResult<Self>, Error>
106 where
107 Error: From<T::Error>,
108 {
109 let it: Box<dyn Iterator<Item = Result<[StaticTerm; 3], Error>>> = Box::new(
110 triples
111 .map_triples(|t| [t.s().into_term(), t.p().into_term(), t.o().into_term()])
112 .into_iter()
113 .map(|r| r.map_err(Error::from)),
114 );
115 Ok(SparqlResult::Triples(it))
116 }
117
118 pub async fn async_query<Q>(&self, query: Q) -> Result<SparqlResult<Self>, Error>
120 where
121 Q: IntoQuery<Query>,
122 {
123 let query = query.into_query()?;
124 let resp = self
125 .client
126 .post(&self.endpoint[..])
127 .header("Accept", self.accept())
128 .header("Content-type", "application/sparql-query")
129 .header("User-Agent", "Sophia SPARQL Client")
130 .body(query.borrow().0.to_string())
131 .send()
132 .await?;
133 let ctype = resp
134 .headers()
135 .get("content-type")
136 .map_or("application/octet-stream", |h| h.to_str().unwrap())
137 .split(';')
138 .next()
139 .unwrap();
140 match ctype {
141 "application/sparql-results+json" => match resp.json::<ResultsDocument>().await? {
142 ResultsDocument::Boolean { boolean, .. } => Ok(SparqlResult::Boolean(boolean)),
143 ResultsDocument::Bindings { doc } => Ok(SparqlResult::Bindings(doc)),
144 },
145 "application/sparql-results+xml" => {
146 let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
147 match ResultsDocument::from_xml(body)? {
148 ResultsDocument::Boolean { boolean, .. } => Ok(SparqlResult::Boolean(boolean)),
149 ResultsDocument::Bindings { doc } => Ok(SparqlResult::Bindings(doc)),
150 }
151 }
152 "text/turtle" => {
153 let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
154 Self::wrap_triple_source(turtle::parse_bufread(body))
155 }
156 "application/n-triples" => {
157 let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
158 Self::wrap_triple_source(nt::parse_bufread(body))
159 }
160 "application/rdf+xml" => {
161 let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
162 Self::wrap_triple_source(rdfxml::parse_bufread(body))
163 }
164 _ => Err(Error::Unsupported(format!(
165 "unsupported content-type: {0}",
166 &ctype,
167 ))),
168 }
169 }
170}
171
172impl SparqlDataset for SparqlClient {
173 type BindingsTerm = StaticTerm;
174 type BindingsResult = Bindings;
175 type TriplesResult = Box<dyn Iterator<Item = Result<[StaticTerm; 3], Error>>>;
176 type SparqlError = Error;
177 type Query = Query;
178
179 fn query<Q>(&self, query: Q) -> Result<SparqlResult<Self>, Error>
180 where
181 Q: IntoQuery<Query>,
182 {
183 let rt = tokio::runtime::Builder::new_current_thread()
184 .enable_all()
185 .build()
186 .expect("Could not build tokio runtime");
187 rt.block_on(self.async_query(query))
188 }
189}
190
191impl SparqlBindings<SparqlClient> for Bindings {
192 fn variables(&self) -> Vec<&str> {
193 self.head
194 .vars
195 .iter()
196 .map(AsRef::as_ref)
197 .collect::<Vec<&str>>()
198 }
199}
200
201impl Iterator for Bindings {
202 type Item = Result<Vec<Option<StaticTerm>>, Error>;
203
204 fn next(&mut self) -> Option<Self::Item> {
205 if self.results.bindings.is_empty() {
206 None
207 } else {
208 Some(self.pop_binding())
209 }
210 }
211}
212
/// Errors raised while sending a SPARQL query or processing the endpoint's response.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An I/O error.
    #[error("i/o error: {0}")]
    Io(
        #[source]
        #[from]
        std::io::Error,
    ),

    /// An error reported by the `reqwest` HTTP client (boxed to keep `Error` small).
    #[error("http error: {0}")]
    Http(#[source] Box<ReqwestError>),

    /// An unsupported situation, such as a response content-type this client
    /// cannot parse.
    #[error("{0}")]
    Unsupported(String),

    /// An invalid IRI.
    #[error("invalid iri: {0}")]
    Iri(
        #[source]
        #[from]
        sophia_iri::InvalidIri,
    ),

    /// An invalid blank node identifier.
    #[error("invalid bnode identifier: {0}")]
    BNode(
        #[source]
        #[from]
        sophia_api::term::bnode_id::InvalidBnodeId,
    ),

    /// An invalid language tag.
    #[error("invalid language tag: {0}")]
    LanguageTag(
        #[source]
        #[from]
        sophia_api::term::language_tag::InvalidLanguageTag,
    ),

    /// An invalid base direction; presumably holds the offending value.
    #[error("invalid base direction: {0}")]
    BaseDirection(Box<str>),

    /// An error from the `sophia_turtle` parsers.
    #[error("turtle parsing error: {0}")]
    Turtle(
        #[source]
        #[from]
        sophia_turtle::parser::Error,
    ),

    /// An error from the RDF/XML parser.
    #[error("RDF/XML parsing error: {0}")]
    RdfXml(
        #[source]
        #[from]
        sophia_xml::parser::Error,
    ),

    /// A JSON error while parsing SPARQL JSON results.
    #[error("JSON results parsing error: {0}")]
    Json(
        #[source]
        #[from]
        serde_json::Error,
    ),

    /// An XML error while parsing SPARQL XML results.
    #[error("XML results parsing error: {0}")]
    Xml(
        #[source]
        #[from]
        quick_xml::Error,
    ),

    /// A character-encoding error while reading XML.
    #[error("XML encoding error: {0}")]
    XmlEncoding(
        #[source]
        #[from]
        quick_xml::encoding::EncodingError,
    ),

    /// A SPARQL XML results document that does not have the expected structure.
    #[error("XML results structural error: {0}")]
    SparqlXml(String),
}
304
305impl From<ReqwestError> for Error {
306 fn from(other: ReqwestError) -> Error {
307 Error::Http(Box::new(other))
308 }
309}
310
/// A SPARQL query, stored as its text and sent as-is as the HTTP request body.
pub struct Query(Box<str>);
317
318impl SparqlQuery for Query {
319 type Error = Error;
320
321 fn parse(query_source: &str) -> Result<Self, Self::Error> {
322 Ok(Query(
323 <String as SparqlQuery>::parse(query_source).unwrap().into(),
324 ))
325 }
326
327 fn parse_with(query_source: &str, base: Iri<&str>) -> Result<Self, Self::Error> {
328 Ok(Query(
329 <String as SparqlQuery>::parse_with(query_source, base)
330 .unwrap()
331 .into(),
332 ))
333 }
334}
335
// Integration tests against a live SPARQL endpoint whose URL is read from the
// `SOPHIA_SPARQL_ENDPOINT` environment variable. Every test is `#[ignore]`d by
// default; run them with `cargo test -- --ignored`.
#[cfg(test)]
mod tests {
    use super::*;
    use SparqlResult::*;
    use sophia_api::term::{LanguageTag, TermKind};
    use sophia_isomorphism::isomorphic_graphs;

    type TestResult<T = ()> = Result<T, Box<dyn std::error::Error>>;

    /// Builds a client for the endpoint named by `SOPHIA_SPARQL_ENDPOINT`,
    /// failing the test with an explanatory message if the variable is unset.
    fn client() -> TestResult<SparqlClient> {
        let endpoint = std::env::var("SOPHIA_SPARQL_ENDPOINT")
            .map_err(|_| "Please set SOPHIA_SPARQL_ENDPOINT with an endpoint URL".to_string())?;
        Ok(SparqlClient::new(&endpoint))
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn select_simple() -> TestResult {
        match client()?.query("SELECT (42 as ?answer) {}")? {
            Bindings(b) => {
                assert_eq!(b.variables(), vec!["answer".to_string()]);
                let bindings = b.into_iter().collect::<Vec<_>>();
                assert_eq!(bindings.len(), 1);
                assert_eq!(bindings[0].as_ref().unwrap()[0], Some(42.into_term()));
            }
            _ => panic!(),
        };
        Ok(())
    }

    // Covers IRIs, simple and language-tagged literals, integers and unbound
    // (UNDEF) values, checking both solution order and per-variable positions.
    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn select_complex() -> TestResult {
        match client()?.query(
            r#"
                PREFIX : <tag:>
                SELECT ?x ?y ?z {}
                VALUES (?x ?y ?z) {
                    (:a "simple" 42)
                    (UNDEF "lang"@en UNDEF)
                    (UNDEF UNDEF UNDEF)
                }
            "#,
        )? {
            Bindings(b) => {
                assert_eq!(
                    b.variables(),
                    vec!["x".to_string(), "y".to_string(), "z".to_string()]
                );
                let bindings = b.into_iter().collect::<Vec<_>>();
                assert_eq!(bindings.len(), 3);
                assert_eq!(
                    bindings[0].as_ref().unwrap()[0],
                    Some(Iri::new_unchecked("tag:a").into_term())
                );
                assert_eq!(bindings[0].as_ref().unwrap()[1], Some("simple".into_term()));
                assert_eq!(bindings[0].as_ref().unwrap()[2], Some(42.into_term()));
                assert_eq!(bindings[1].as_ref().unwrap()[0], None);
                assert_eq!(
                    bindings[1].as_ref().unwrap()[1],
                    Some(("lang" * LanguageTag::new_unchecked("en")).into_term())
                );
                assert_eq!(bindings[1].as_ref().unwrap()[2], None);
                assert_eq!(bindings[2].as_ref().unwrap()[0], None);
                assert_eq!(bindings[2].as_ref().unwrap()[1], None);
                assert_eq!(bindings[2].as_ref().unwrap()[2], None);
            }
            _ => panic!(),
        };
        Ok(())
    }

    // Blank node labels are endpoint-specific, so only the term kind is checked.
    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn select_bnode() -> TestResult {
        match client()?.query(
            r#"
                PREFIX : <tag:>
                SELECT ?x {
                    BIND(BNODE() as ?x)
                }
            "#,
        )? {
            Bindings(b) => {
                assert_eq!(b.variables(), vec!["x".to_string(),]);
                let bindings = b.into_iter().collect::<Vec<_>>();
                assert_eq!(bindings.len(), 1);
                assert_eq!(
                    bindings[0].as_ref().unwrap()[0].as_ref().unwrap().kind(),
                    TermKind::BlankNode
                );
            }
            _ => panic!(),
        };
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn ask_true() -> TestResult {
        match client()?.query("ASK {}")? {
            Boolean(true) => (),
            _ => panic!(),
        };
        Ok(())
    }

    // Assumes the endpoint's dataset does not contain this made-up triple.
    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn ask_false() -> TestResult {
        match client()?.query("PREFIX : <tag:> ASK {:abcdef :ghijkl :mnopqr}")? {
            Boolean(false) => (),
            _ => panic!(),
        };
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_empty() -> TestResult {
        test_construct(client()?, "")
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_one_triple() -> TestResult {
        test_construct(client()?, "[] a 42.")
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_complex() -> TestResult {
        test_construct(client()?, COMPLEX_TTL)
    }

    // Forces the N-Triples response path through a narrower Accept header.
    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_ntriples() -> TestResult {
        test_construct(client()?.with_accept("application/n-triples"), COMPLEX_TTL)
    }

    // Forces the RDF/XML response path through a narrower Accept header.
    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_rdfxml() -> TestResult {
        test_construct(client()?.with_accept("application/rdf+xml"), COMPLEX_TTL)
    }

    /// Turtle snippet (prefix `:` left undeclared) used as both the CONSTRUCT
    /// template and the expected graph.
    const COMPLEX_TTL: &str = r#"
        :s :p1 :o1, :o2;
           :p2 :o1, :o3;
           :label "S".
    "#;

    /// Runs `CONSTRUCT { ttl } {}` against `client` and checks that the returned
    /// graph is isomorphic to `ttl` parsed locally; the `:` prefix is bound to
    /// `<tag:>` on both sides.
    fn test_construct(client: SparqlClient, ttl: &str) -> TestResult {
        let src = format!("@prefix : <tag:>. {}", ttl);
        let exp: Vec<[StaticTerm; 3]> = turtle::parse_str(&src).collect_triples()?;
        let q = format!("PREFIX : <tag:> CONSTRUCT {{ {} }} {{}}", ttl);

        match client.query(q.as_str())? {
            Triples(triples) => {
                let got: Vec<[StaticTerm; 3]> = triples.collect_triples()?;
                // Isomorphism (not equality) because blank node labels differ.
                assert!(isomorphic_graphs(&got, &exp)?);
            }
            _ => panic!(),
        };
        Ok(())
    }
}