lazy_sparql_result_reader/
lib.rs1pub mod parser;
2pub mod sparql;
3
4use crate::parser::{Parser, PartialResult};
5use js_sys::Uint8Array;
6use wasm_bindgen::{JsCast, JsValue};
7use web_sys::{ReadableStream, ReadableStreamDefaultReader};
8
/// Errors reported while reading a SPARQL result stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SparqlResultReaderError {
    /// Reading from the stream failed, or a chunk object could not be inspected.
    CorruptStream,
    /// The stream was rejected with the reason `"Query was canceled"`.
    Canceled,
    /// The parser rejected the input; carries the parser's error message.
    JsonParseError(String),
}

impl std::fmt::Display for SparqlResultReaderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CorruptStream => f.write_str("the result stream is corrupt"),
            Self::Canceled => f.write_str("the query was canceled"),
            Self::JsonParseError(message) => write!(f, "JSON parse error: {message}"),
        }
    }
}

// Lets callers box the error or propagate it with `?` into `Box<dyn Error>`.
impl std::error::Error for SparqlResultReaderError {}
15
/// Reads `stream` to completion, feeding every byte into a [`Parser`] and awaiting
/// `callback` for each [`PartialResult`] the parser produces.
///
/// `batch_size`, `limit` and `offset` are forwarded unchanged to [`Parser::new`].
/// Once the stream reports `done`, whatever the parser still returns from `flush`
/// is handed to `callback` as well.
///
/// # Errors
///
/// * [`SparqlResultReaderError::Canceled`] if a read is rejected with the string
///   `"Query was canceled"`.
/// * [`SparqlResultReaderError::CorruptStream`] if a read is rejected for any other
///   reason, or if the `done` / `value` property of a chunk cannot be accessed.
/// * [`SparqlResultReaderError::JsonParseError`] if the parser rejects a byte.
#[cfg(feature = "call_from_rust")]
pub async fn read<F: AsyncFn(PartialResult)>(
    stream: ReadableStream,
    batch_size: usize,
    limit: Option<usize>,
    offset: usize,
    callback: F,
) -> Result<(), SparqlResultReaderError> {
    let reader: ReadableStreamDefaultReader = stream.get_reader().unchecked_into();
    let mut parser = Parser::new(batch_size, limit, offset);

    // Property names of the object each `reader.read()` promise resolves to.
    let done_key = JsValue::from_str("done");
    let value_key = JsValue::from_str("value");

    loop {
        let read_result = match wasm_bindgen_futures::JsFuture::from(reader.read()).await {
            Ok(result) => result,
            Err(reason) => {
                // Only a rejection carrying exactly this string counts as a cancellation.
                return Err(match reason.as_string().as_deref() {
                    Some("Query was canceled") => SparqlResultReaderError::Canceled,
                    _ => SparqlResultReaderError::CorruptStream,
                });
            }
        };

        // A missing or non-boolean `done` is treated as "not done yet".
        let finished = js_sys::Reflect::get(&read_result, &done_key)
            .map_err(|_| SparqlResultReaderError::CorruptStream)?
            .as_bool()
            .unwrap_or(false);
        if finished {
            break;
        }

        let payload = js_sys::Reflect::get(&read_result, &value_key)
            .map_err(|_| SparqlResultReaderError::CorruptStream)?;
        for byte in Uint8Array::new(&payload).to_vec() {
            let parsed = parser
                .read_byte(byte)
                .map_err(|err| SparqlResultReaderError::JsonParseError(err.to_string()))?;
            if let Some(partial_result) = parsed {
                callback(partial_result).await;
            }
        }
    }

    // Deliver whatever the parser was still holding when the stream ended.
    if let Some(remaining) = parser.flush() {
        callback(remaining).await;
    }
    Ok(())
}
64
65#[cfg(not(feature = "call_from_rust"))]
66use js_sys::Function;
67#[cfg(not(feature = "call_from_rust"))]
68use wasm_bindgen::prelude::wasm_bindgen;
69#[cfg(not(feature = "call_from_rust"))]
70#[wasm_bindgen]
71pub async fn read(
72 stream: ReadableStream,
73 batch_size: usize,
74 limit: Option<usize>,
75 offset: usize,
76 callback: &Function,
77) -> Result<(), JsValue> {
78 let reader: ReadableStreamDefaultReader = stream.get_reader().unchecked_into();
79 let mut parser = Parser::new(batch_size, limit, offset);
80
81 loop {
82 let chunk = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
83 if js_sys::Reflect::get(&chunk, &JsValue::from_str("done"))?
84 .as_bool()
85 .unwrap_or(false)
86 {
87 break;
88 }
89 let bytes =
90 Uint8Array::new(&js_sys::Reflect::get(&chunk, &JsValue::from_str("value"))?).to_vec();
91 for bytes in bytes {
92 if let Some(partial_result) = parser
93 .read_byte(bytes)
94 .map_err(|err| JsValue::from_str(&format!("JSON parse error: {err}")))?
95 {
96 callback
97 .call1(
98 &JsValue::NULL,
99 &serde_wasm_bindgen::to_value(&partial_result)
100 .expect("Every ParsedChunk should be serialiable"),
101 )
102 .expect("The JS function should not throw an error");
103 }
104 }
105 }
106
107 if let Some(PartialResult::Bindings(bindings)) = parser.flush() {
108 callback.call1(&JsValue::NULL, &serde_wasm_bindgen::to_value(&bindings)?)?;
109 }
110 Ok(())
111}