Skip to main content

sophia_sparql_client/
lib.rs

1//! A client implementation of the [SPARQL1.1 protocol] based on [Sophia].
2//! It implements [`sophia_api::sparql::SparqlDataset`].
3//!
4//! Example:
5//! ```
6//! use sophia_api::sparql::{SparqlDataset, SparqlResult};
7//! use sophia_api::term::Term;
8//! use sophia_sparql_client::SparqlClient;
9//!
10//! # fn bla() -> Result<(), Box<dyn std::error::Error>> {
11//! let cli = SparqlClient::new("https://query.wikidata.org/bigdata/namespace/wdq/sparql");
12//! let query = r#"
13//!     #All Dr. Who performers
14//!     SELECT ?doctor ?doctorLabel ?ordinal ?performer ?performerLabel
15//!     WHERE {
16//!       ?doctor p:P31 ?type. ?type ps:P31 wd:Q47543030 .
17//!       OPTIONAL { ?type pq:P1545 ?ordinal } OPTIONAL { ?doctor wdt:P1545 ?ordinal }
18//!       OPTIONAL { ?doctor p:P175 / ps:P175 ?performer }
19//!       SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en" }
20//!     }
21//!     ORDER BY ASC(xsd:integer(?ordinal) )
22//! "#;
23//! if let SparqlResult::Bindings(bindings) = cli.query(query)? {
24//!     for b in bindings {
25//!         let b = b?;
26//!         let doctor_label = b[1].as_ref().and_then(|t| t.lexical_form()).unwrap();
27//!         let performer_label = b[4].as_ref().and_then(|t| t.lexical_form()).unwrap_or("NULL".into());
28//!         println!("{}\t{}", doctor_label, performer_label);
29//!     }
30//! }
31//! # Ok(()) }
32//! ```
33//!
34//! [SPARQL1.1 protocol]: https://www.w3.org/TR/sparql11-protocol/
35//! [Sophia]: https://docs.rs/sophia/
36#![deny(missing_docs)]
37
38use reqwest::{Client, Error as ReqwestError};
39use sophia_api::prelude::*;
40use sophia_api::sparql::{
41    IntoQuery, Query as SparqlQuery, SparqlBindings, SparqlDataset, SparqlResult,
42};
43use sophia_api::term::SimpleTerm;
44use sophia_turtle::parser::{nt, turtle};
45use sophia_xml::parser as rdfxml;
46use std::borrow::Borrow;
47use std::io::Cursor;
48
49mod results;
50pub use results::BindingsDocument as Bindings;
51pub use results::ResultsDocument;
52
53type StaticTerm = SimpleTerm<'static>;
54
55/// A [SPARQL 1.1] client implementing [`SparqlDataset`].
56///
57/// [SPARQL 1.1]: https://www.w3.org/TR/sparql11-protocol/
58pub struct SparqlClient {
59    endpoint: Box<str>,
60    client: Client,
61    accept: Option<String>,
62}
63
64impl SparqlClient {
65    /// The default [Accept HTTP header](https://tools.ietf.org/html/rfc7231.html#section-5.3.2) used by clients.
66    pub const DEFAULT_ACCEPT: &'static str = "application/sparql-results+json,application/sparql-results+xml;q=0.8,text/turtle,application/n-triples;q=0.9,application/rdf+xml;q=0.8";
67
68    /// Create a [`SparqlClient`] on the given SPARQL-endpoint URL.
69    #[must_use]
70    pub fn new(endpoint: &str) -> Self {
71        Self {
72            endpoint: Box::from(endpoint),
73            client: Client::new(),
74            accept: None,
75        }
76    }
77
78    /// Replace the underlying [`reqwest::Client`] of this client.
79    #[must_use]
80    pub fn with_client(mut self, client: Client) -> Self {
81        self.client = client;
82        self
83    }
84
85    /// Replace the [Accept HTTP header](https://tools.ietf.org/html/rfc7231.html#section-5.3.2) used by this client.
86    ///
87    /// This might be useful if the endpoint implements content-negotiation incorrectly.
88    ///
89    /// See also [`DEFAULT_ACCEPT`](Self::DEFAULT_ACCEPT)
90    #[must_use]
91    pub fn with_accept<T: Into<String>>(mut self, accept: T) -> Self {
92        self.accept = Some(accept.into());
93        self
94    }
95
96    /// The [Accept HTTP header](https://tools.ietf.org/html/rfc7231.html#section-5.3.2) used by this client.
97    #[must_use]
98    pub fn accept(&self) -> &str {
99        self.accept.as_deref().unwrap_or(Self::DEFAULT_ACCEPT)
100    }
101
102    #[allow(clippy::unnecessary_wraps)]
103    fn wrap_triple_source<T: TripleSource + 'static>(
104        triples: T,
105    ) -> Result<SparqlResult<Self>, Error>
106    where
107        Error: From<T::Error>,
108    {
109        let it: Box<dyn Iterator<Item = Result<[StaticTerm; 3], Error>>> = Box::new(
110            triples
111                .map_triples(|t| [t.s().into_term(), t.p().into_term(), t.o().into_term()])
112                .into_iter()
113                .map(|r| r.map_err(Error::from)),
114        );
115        Ok(SparqlResult::Triples(it))
116    }
117
118    /// Async counterpart of [`SparqlDataset::query`].
119    pub async fn async_query<Q>(&self, query: Q) -> Result<SparqlResult<Self>, Error>
120    where
121        Q: IntoQuery<Query>,
122    {
123        let query = query.into_query()?;
124        let resp = self
125            .client
126            .post(&self.endpoint[..])
127            .header("Accept", self.accept())
128            .header("Content-type", "application/sparql-query")
129            .header("User-Agent", "Sophia SPARQL Client")
130            .body(query.borrow().0.to_string())
131            .send()
132            .await?;
133        let ctype = resp
134            .headers()
135            .get("content-type")
136            .map_or("application/octet-stream", |h| h.to_str().unwrap())
137            .split(';')
138            .next()
139            .unwrap();
140        match ctype {
141            "application/sparql-results+json" => match resp.json::<ResultsDocument>().await? {
142                ResultsDocument::Boolean { boolean, .. } => Ok(SparqlResult::Boolean(boolean)),
143                ResultsDocument::Bindings { doc } => Ok(SparqlResult::Bindings(doc)),
144            },
145            "application/sparql-results+xml" => {
146                let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
147                match ResultsDocument::from_xml(body)? {
148                    ResultsDocument::Boolean { boolean, .. } => Ok(SparqlResult::Boolean(boolean)),
149                    ResultsDocument::Bindings { doc } => Ok(SparqlResult::Bindings(doc)),
150                }
151            }
152            "text/turtle" => {
153                let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
154                Self::wrap_triple_source(turtle::parse_bufread(body))
155            }
156            "application/n-triples" => {
157                let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
158                Self::wrap_triple_source(nt::parse_bufread(body))
159            }
160            "application/rdf+xml" => {
161                let body: Cursor<Vec<u8>> = Cursor::new(resp.bytes().await?.into());
162                Self::wrap_triple_source(rdfxml::parse_bufread(body))
163            }
164            _ => Err(Error::Unsupported(format!(
165                "unsupported content-type: {0}",
166                &ctype,
167            ))),
168        }
169    }
170}
171
172impl SparqlDataset for SparqlClient {
173    type BindingsTerm = StaticTerm;
174    type BindingsResult = Bindings;
175    type TriplesResult = Box<dyn Iterator<Item = Result<[StaticTerm; 3], Error>>>;
176    type SparqlError = Error;
177    type Query = Query;
178
179    fn query<Q>(&self, query: Q) -> Result<SparqlResult<Self>, Error>
180    where
181        Q: IntoQuery<Query>,
182    {
183        let rt = tokio::runtime::Builder::new_current_thread()
184            .enable_all()
185            .build()
186            .expect("Could not build tokio runtime");
187        rt.block_on(self.async_query(query))
188    }
189}
190
191impl SparqlBindings<SparqlClient> for Bindings {
192    fn variables(&self) -> Vec<&str> {
193        self.head
194            .vars
195            .iter()
196            .map(AsRef::as_ref)
197            .collect::<Vec<&str>>()
198    }
199}
200
201impl Iterator for Bindings {
202    type Item = Result<Vec<Option<StaticTerm>>, Error>;
203
204    fn next(&mut self) -> Option<Self::Item> {
205        if self.results.bindings.is_empty() {
206            None
207        } else {
208            Some(self.pop_binding())
209        }
210    }
211}
212
213#[derive(Debug, thiserror::Error)]
214/// Error type produced by [`SparqlClient`].
215pub enum Error {
216    #[error("i/o error: {0}")]
217    /// An [`std::io::Error`] occurred while communicating with the SPARQL endpoint.
218    Io(
219        #[source]
220        #[from]
221        std::io::Error,
222    ),
223
224    #[error("http error: {0}")]
225    /// A [`ReqwestError`] occurred while communicating with the SPARQL endpoint.
226    Http(#[source] Box<ReqwestError>),
227
228    #[error("{0}")]
229    /// An unsupported media-type was returned by the SPARQL endpoint.
230    Unsupported(String),
231
232    #[error("invalid iri: {0}")]
233    /// An invalid IRI was returned by the SPARQL endpoint.
234    Iri(
235        #[source]
236        #[from]
237        sophia_iri::InvalidIri,
238    ),
239
240    #[error("invalid bnode identifier: {0}")]
241    /// An invalid blank node identifier was returned by the SPARQL endpoint.
242    BNode(
243        #[source]
244        #[from]
245        sophia_api::term::bnode_id::InvalidBnodeId,
246    ),
247
248    #[error("invalid language tag: {0}")]
249    /// An invalid language tag was returned by the SPARQL endpoint.
250    LanguageTag(
251        #[source]
252        #[from]
253        sophia_api::term::language_tag::InvalidLanguageTag,
254    ),
255
256    #[error("invalid base direction: {0}")]
257    /// An invalid language tag was returned by the SPARQL endpoint.
258    BaseDirection(Box<str>),
259
260    #[error("turtle parsing error: {0}")]
261    /// Invalid Turtle syntax was returned by the SPARQL endpoint.
262    Turtle(
263        #[source]
264        #[from]
265        sophia_turtle::parser::Error,
266    ),
267
268    #[error("RDF/XML parsing error: {0}")]
269    /// Invalid RDF/XML syntax was returned by the SPARQL endpoint.
270    RdfXml(
271        #[source]
272        #[from]
273        sophia_xml::parser::Error,
274    ),
275
276    #[error("JSON results parsing error: {0}")]
277    /// Invalid XML syntax was returned by the SPARQL endpoint.
278    Json(
279        #[source]
280        #[from]
281        serde_json::Error,
282    ),
283
284    #[error("XML results parsing error: {0}")]
285    /// Invalid XML syntax was returned by the SPARQL endpoint.
286    Xml(
287        #[source]
288        #[from]
289        quick_xml::Error,
290    ),
291
292    #[error("XML encoding error: {0}")]
293    /// Invalid XML syntax was returned by the SPARQL endpoint.
294    XmlEncoding(
295        #[source]
296        #[from]
297        quick_xml::encoding::EncodingError,
298    ),
299
300    #[error("XML results structural error: {0}")]
301    /// Invalid XML Sparql results were returned by the SPARQL endpoint.
302    SparqlXml(String),
303}
304
305impl From<ReqwestError> for Error {
306    fn from(other: ReqwestError) -> Error {
307        Error::Http(Box::new(other))
308    }
309}
310
/// A SPARQL query prepared for a [`SparqlClient`].
///
/// NB: Actually, this type simply wraps the query as a `Box<str>`,
/// so [preparing](sophia_api::sparql::SparqlDataset::prepare_query)
/// it in advance has no benefit for this implementation.
#[derive(Clone, Debug)]
pub struct Query(Box<str>);
317
318impl SparqlQuery for Query {
319    type Error = Error;
320
321    fn parse(query_source: &str) -> Result<Self, Self::Error> {
322        Ok(Query(
323            <String as SparqlQuery>::parse(query_source).unwrap().into(),
324        ))
325    }
326
327    fn parse_with(query_source: &str, base: Iri<&str>) -> Result<Self, Self::Error> {
328        Ok(Query(
329            <String as SparqlQuery>::parse_with(query_source, base)
330                .unwrap()
331                .into(),
332        ))
333    }
334}
335
#[cfg(test)]
mod tests {
    //! Integration tests, ignored by default as they need a live SPARQL
    //! endpoint whose URL is given by the `SOPHIA_SPARQL_ENDPOINT`
    //! environment variable.

    use super::*;
    use SparqlResult::*;
    use sophia_api::term::{LanguageTag, TermKind};
    use sophia_isomorphism::isomorphic_graphs;

    /// Result type of the tests, accepting any kind of error.
    type TestResult<T = ()> = Result<T, Box<dyn std::error::Error>>;

    /// Turtle snippet (to be prefixed with a `:` prefix declaration)
    /// used by the CONSTRUCT tests involving several triples.
    const COMPLEX_TTL: &str = r#"
        :s :p1 :o1, :o2;
        :p2 :o1, :o3;
        :label "S".
    "#;

    /// Build a client for the endpoint named by `SOPHIA_SPARQL_ENDPOINT`.
    fn client() -> TestResult<SparqlClient> {
        match std::env::var("SOPHIA_SPARQL_ENDPOINT") {
            Ok(endpoint) => Ok(SparqlClient::new(&endpoint)),
            Err(_) => Err("Please set SOPHIA_SPARQL_ENDPOINT with an endpoint URL"
                .to_string()
                .into()),
        }
    }

    /// Run a CONSTRUCT query whose template is `ttl`, and check that the
    /// resulting graph is isomorphic to the one obtained by parsing `ttl`.
    fn test_construct(client: SparqlClient, ttl: &str) -> TestResult {
        let src = format!("@prefix : <tag:>. {}", ttl);
        let exp: Vec<[StaticTerm; 3]> = turtle::parse_str(&src).collect_triples()?;
        let q = format!("PREFIX : <tag:> CONSTRUCT {{ {} }} {{}}", ttl);

        let Triples(triples) = client.query(q.as_str())? else {
            panic!()
        };
        let got: Vec<[StaticTerm; 3]> = triples.collect_triples()?;
        assert!(isomorphic_graphs(&got, &exp)?);
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn select_simple() -> TestResult {
        let Bindings(b) = client()?.query("SELECT (42 as ?answer) {}")? else {
            panic!()
        };
        assert_eq!(b.variables(), vec!["answer".to_string()]);
        let rows: Vec<_> = b.collect();
        assert_eq!(rows.len(), 1);
        let first = rows[0].as_ref().unwrap();
        assert_eq!(first[0], Some(42.into_term()));
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn select_complex() -> TestResult {
        let query = r#"
            PREFIX : <tag:>
            SELECT ?x ?y ?z {}
            VALUES (?x ?y ?z) {
                (:a "simple" 42)
                (UNDEF "lang"@en UNDEF)
                (UNDEF UNDEF UNDEF)
            }
        "#;
        let Bindings(b) = client()?.query(query)? else {
            panic!()
        };
        assert_eq!(
            b.variables(),
            vec!["x".to_string(), "y".to_string(), "z".to_string()]
        );
        let rows: Vec<_> = b.collect();
        assert_eq!(rows.len(), 3);

        // First row: all three variables are bound.
        let first = rows[0].as_ref().unwrap();
        assert_eq!(first[0], Some(Iri::new_unchecked("tag:a").into_term()));
        assert_eq!(first[1], Some("simple".into_term()));
        assert_eq!(first[2], Some(42.into_term()));

        // Second row: only ?y is bound.
        let second = rows[1].as_ref().unwrap();
        assert_eq!(second[0], None);
        assert_eq!(
            second[1],
            Some(("lang" * LanguageTag::new_unchecked("en")).into_term())
        );
        assert_eq!(second[2], None);

        // Third row: nothing is bound.
        let third = rows[2].as_ref().unwrap();
        assert_eq!(third[0], None);
        assert_eq!(third[1], None);
        assert_eq!(third[2], None);
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn select_bnode() -> TestResult {
        let query = r#"
            PREFIX : <tag:>
            SELECT ?x {
                BIND(BNODE() as ?x)
            }
        "#;
        let Bindings(b) = client()?.query(query)? else {
            panic!()
        };
        assert_eq!(b.variables(), vec!["x".to_string(),]);
        let rows: Vec<_> = b.collect();
        assert_eq!(rows.len(), 1);
        let first = rows[0].as_ref().unwrap();
        assert_eq!(first[0].as_ref().unwrap().kind(), TermKind::BlankNode);
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn ask_true() -> TestResult {
        if !matches!(client()?.query("ASK {}")?, Boolean(true)) {
            panic!();
        }
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn ask_false() -> TestResult {
        let answer = client()?.query("PREFIX : <tag:> ASK {:abcdef :ghijkl :mnopqr}")?;
        if !matches!(answer, Boolean(false)) {
            panic!();
        }
        Ok(())
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_empty() -> TestResult {
        test_construct(client()?, "")
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_one_triple() -> TestResult {
        test_construct(client()?, "[] a 42.")
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_complex() -> TestResult {
        test_construct(client()?, COMPLEX_TTL)
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_ntriples() -> TestResult {
        let client = client()?.with_accept("application/n-triples");
        test_construct(client, COMPLEX_TTL)
    }

    #[test]
    #[ignore = "requires a running SPARQL endpoint"]
    fn construct_rdfxml() -> TestResult {
        let client = client()?.with_accept("application/rdf+xml");
        test_construct(client, COMPLEX_TTL)
    }
}