lazy_sparql_result_reader/
lib.rs

1pub mod parser;
2pub mod sparql;
3
4use crate::parser::{Parser, PartialResult};
5use js_sys::Uint8Array;
6use wasm_bindgen::{JsCast, JsValue};
7use web_sys::{ReadableStream, ReadableStreamDefaultReader};
8
9#[derive(Debug)]
10pub enum SparqlResultReaderError {
11    CorruptStream,
12    Canceled,
13    JsonParseError(String),
14}
15
16#[cfg(feature = "call_from_rust")]
17pub async fn read<F: AsyncFn(PartialResult)>(
18    stream: ReadableStream,
19    batch_size: usize,
20    limit: Option<usize>,
21    offset: usize,
22    callback: F,
23) -> Result<(), SparqlResultReaderError> {
24    let reader: ReadableStreamDefaultReader = stream.get_reader().unchecked_into();
25    let mut parser = Parser::new(batch_size, limit, offset);
26
27    loop {
28        let chunk = wasm_bindgen_futures::JsFuture::from(reader.read())
29            .await
30            .map_err(|err| {
31                let reason = err.as_string();
32                if reason.is_some_and(|reason| reason == "Query was canceled") {
33                    SparqlResultReaderError::Canceled
34                } else {
35                    SparqlResultReaderError::CorruptStream
36                }
37            })?;
38        if js_sys::Reflect::get(&chunk, &JsValue::from_str("done"))
39            .map_err(|_| SparqlResultReaderError::CorruptStream)?
40            .as_bool()
41            .unwrap_or(false)
42        {
43            break;
44        }
45        let bytes = Uint8Array::new(
46            &js_sys::Reflect::get(&chunk, &JsValue::from_str("value"))
47                .map_err(|_| SparqlResultReaderError::CorruptStream)?,
48        )
49        .to_vec();
50        for byte in bytes {
51            if let Some(partial_result) = parser
52                .read_byte(byte)
53                .map_err(|err| SparqlResultReaderError::JsonParseError(err.to_string()))?
54            {
55                callback(partial_result).await;
56            }
57        }
58    }
59    if let Some(buffered_bindings) = parser.flush() {
60        callback(buffered_bindings).await;
61    }
62    Ok(())
63}
64
65#[cfg(not(feature = "call_from_rust"))]
66use js_sys::Function;
67#[cfg(not(feature = "call_from_rust"))]
68use wasm_bindgen::prelude::wasm_bindgen;
69#[cfg(not(feature = "call_from_rust"))]
70#[wasm_bindgen]
71pub async fn read(
72    stream: ReadableStream,
73    batch_size: usize,
74    limit: Option<usize>,
75    offset: usize,
76    callback: &Function,
77) -> Result<(), JsValue> {
78    let reader: ReadableStreamDefaultReader = stream.get_reader().unchecked_into();
79    let mut parser = Parser::new(batch_size, limit, offset);
80
81    loop {
82        let chunk = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
83        if js_sys::Reflect::get(&chunk, &JsValue::from_str("done"))?
84            .as_bool()
85            .unwrap_or(false)
86        {
87            break;
88        }
89        let bytes =
90            Uint8Array::new(&js_sys::Reflect::get(&chunk, &JsValue::from_str("value"))?).to_vec();
91        for bytes in bytes {
92            if let Some(partial_result) = parser
93                .read_byte(bytes)
94                .map_err(|err| JsValue::from_str(&format!("JSON parse error: {err}")))?
95            {
96                callback
97                    .call1(
98                        &JsValue::NULL,
99                        &serde_wasm_bindgen::to_value(&partial_result)
100                            .expect("Every ParsedChunk should be serialiable"),
101                    )
102                    .expect("The JS function should not throw an error");
103            }
104        }
105    }
106
107    if let Some(PartialResult::Bindings(bindings)) = parser.flush() {
108        callback.call1(&JsValue::NULL, &serde_wasm_bindgen::to_value(&bindings)?)?;
109    }
110    Ok(())
111}