perspective_js/
typed_array.rs1use std::io::Cursor;
14
15use arrow_array::Array as _;
16use arrow_array::cast::AsArray;
17use arrow_array::types::*;
18use arrow_ipc::reader::StreamReader;
19use arrow_schema::{DataType, TimeUnit};
20use js_sys::{Array, Function, JsString, Uint8Array};
21use perspective_client::ViewWindow;
22use ts_rs::TS;
23use wasm_bindgen::JsCast;
24use wasm_bindgen::prelude::*;
25
26#[wasm_bindgen]
27unsafe extern "C" {
28 #[wasm_bindgen(typescript_type = "TypedArrayWindow")]
29 #[derive(Clone)]
30 pub type JsTypedArrayWindow;
31}
32
33#[derive(Default, serde::Deserialize, TS)]
36pub struct TypedArrayWindow {
37 #[serde(flatten)]
38 pub view_window: ViewWindow,
39
40 #[serde(default)]
43 pub float32: bool,
44}
45
46impl From<TypedArrayWindow> for ViewWindow {
47 fn from(w: TypedArrayWindow) -> Self {
48 w.view_window
49 }
50}
51
52pub(crate) async fn decode_and_call(
64 arrow: &[u8],
65 float32: bool,
66 callback: &Function,
67) -> Result<(), JsValue> {
68 let cursor = Cursor::new(arrow);
69 let reader = StreamReader::try_new(cursor, None)
70 .map_err(|e| JsValue::from_str(&format!("Arrow decode error: {e}")))?;
71
72 let batch = reader
73 .into_iter()
74 .next()
75 .ok_or_else(|| JsValue::from_str("Arrow IPC contained no record batches"))?
76 .map_err(|e| JsValue::from_str(&format!("Arrow batch error: {e}")))?;
77
78 let schema = batch.schema();
79 let num_cols = batch.num_columns();
80
81 let js_names = Array::new_with_length(num_cols as u32);
82 let js_values = Array::new_with_length(num_cols as u32);
83 let js_validities = Array::new_with_length(num_cols as u32);
84 let js_dicts = Array::new_with_length(num_cols as u32);
85
86 let mut f32_storage: Vec<Box<[f32]>> = Vec::new();
91 let mut f64_storage: Vec<Box<[f64]>> = Vec::new();
92
93 for col_idx in 0..num_cols {
94 let field = schema.field(col_idx);
95 let col = batch.column(col_idx);
96 let validity = col.nulls().map(|nulls| nulls.validity());
97
98 js_names.set(col_idx as u32, JsString::from(field.name().as_str()).into());
99
100 match col.data_type() {
101 DataType::UInt32 => {
102 let vals = col.as_primitive::<UInt32Type>().values();
103 let arr = unsafe { js_sys::Uint32Array::view(vals.as_ref()) };
104 js_values.set(col_idx as u32, arr.into());
105 js_dicts.set(col_idx as u32, JsValue::NULL);
106 },
107 DataType::Int32 => {
108 let vals = col.as_primitive::<Int32Type>().values();
109 let arr = unsafe { js_sys::Int32Array::view(vals.as_ref()) };
110 js_values.set(col_idx as u32, arr.into());
111 js_dicts.set(col_idx as u32, JsValue::NULL);
112 },
113 DataType::Float32 => {
114 let vals = col.as_primitive::<Float32Type>().values();
115 let arr = unsafe { js_sys::Float32Array::view(vals.as_ref()) };
116 js_values.set(col_idx as u32, arr.into());
117 js_dicts.set(col_idx as u32, JsValue::NULL);
118 },
119 DataType::Float64 => {
120 if float32 {
121 let vals = col.as_primitive::<Float64Type>().values();
122 f32_storage.push(vals.iter().map(|&v| v as f32).collect());
123 } else {
124 let vals = col.as_primitive::<Float64Type>().values();
125 let arr = unsafe { js_sys::Float64Array::view(vals.as_ref()) };
126 js_values.set(col_idx as u32, arr.into());
127 }
128 js_dicts.set(col_idx as u32, JsValue::NULL);
129 },
130 DataType::Date32 => {
131 let typed = col.as_primitive::<Date32Type>();
136 f64_storage.push(
137 typed
138 .values()
139 .iter()
140 .map(|&v| v as f64 * 86_400_000.0)
141 .collect(),
142 );
143 js_dicts.set(col_idx as u32, JsValue::NULL);
144 },
145 DataType::Timestamp(TimeUnit::Millisecond, _) => {
146 let typed = col.as_primitive::<TimestampMillisecondType>();
147 f64_storage.push(typed.values().iter().map(|&v| v as f64).collect());
148 js_dicts.set(col_idx as u32, JsValue::NULL);
149 },
150 DataType::Int64 => {
151 let typed = col.as_primitive::<Int64Type>();
152 if float32 {
153 f32_storage.push(typed.values().iter().map(|&v| v as f32).collect());
154 } else {
155 f64_storage.push(typed.values().iter().map(|&v| v as f64).collect());
156 }
157 js_dicts.set(col_idx as u32, JsValue::NULL);
158 },
159 DataType::Dictionary(..) => {
160 let dict = col.as_dictionary::<Int32Type>();
161 let keys = dict.keys();
162 let arr = unsafe { js_sys::Int32Array::view(keys.values().as_ref()) };
163 js_values.set(col_idx as u32, arr.into());
164
165 let values = dict.values().as_string::<i32>();
166 let js_dict = Array::new_with_length(values.len() as u32);
167 for i in 0..values.len() {
168 js_dict.set(i as u32, JsValue::from_str(values.value(i)));
169 }
170 js_dicts.set(col_idx as u32, js_dict.into());
171 },
172 dt => {
173 return Err(JsValue::from_str(&format!(
174 "Unsupported column type for typed array: {dt}"
175 )));
176 },
177 }
178
179 let js_validity = validity.map(|v| unsafe { Uint8Array::view(v) });
182 js_validities.set(
183 col_idx as u32,
184 js_validity
185 .as_ref()
186 .map(JsValue::from)
187 .unwrap_or(JsValue::NULL),
188 );
189 }
190
191 let mut f32_idx = 0;
195 let mut f64_idx = 0;
196 for col_idx in 0..num_cols {
197 let col = batch.column(col_idx);
198 let uses_f32_storage = matches!(
199 (col.data_type(), float32),
200 (DataType::Float64, true) | (DataType::Int64, true),
201 );
202 let uses_f64_storage = matches!(
203 (col.data_type(), float32),
204 (DataType::Date32, _)
205 | (DataType::Timestamp(TimeUnit::Millisecond, _), _)
206 | (DataType::Int64, false),
207 );
208
209 if uses_f32_storage {
210 let arr = unsafe { js_sys::Float32Array::view(&f32_storage[f32_idx]) };
211 js_values.set(col_idx as u32, arr.into());
212 f32_idx += 1;
213 } else if uses_f64_storage {
214 let arr = unsafe { js_sys::Float64Array::view(&f64_storage[f64_idx]) };
215 js_values.set(col_idx as u32, arr.into());
216 f64_idx += 1;
217 }
218 }
219
220 let ret = callback.call4(
221 &JsValue::UNDEFINED,
222 &js_names.into(),
223 &js_values.into(),
224 &js_validities.into(),
225 &js_dicts.into(),
226 )?;
227
228 if ret.is_instance_of::<js_sys::Promise>() {
233 let promise: js_sys::Promise = ret.unchecked_into();
234 wasm_bindgen_futures::JsFuture::from(promise).await?;
235 }
236
237 drop(f32_storage);
240 drop(f64_storage);
241
242 Ok(())
243}