Skip to main content

perspective_js/
typed_array.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::io::Cursor;
14
15use arrow_array::Array as _;
16use arrow_array::cast::AsArray;
17use arrow_array::types::*;
18use arrow_ipc::reader::StreamReader;
19use arrow_schema::{DataType, TimeUnit};
20use js_sys::{Array, Function, JsString, Uint8Array};
21use perspective_client::ViewWindow;
22use ts_rs::TS;
23use wasm_bindgen::JsCast;
24use wasm_bindgen::prelude::*;
25
#[wasm_bindgen]
unsafe extern "C" {
    /// Opaque JS value exposed in the generated TypeScript bindings with
    /// the type name `TypedArrayWindow`.
    ///
    /// NOTE(review): presumably the JS-side options object that callers
    /// deserialize into the Rust [`TypedArrayWindow`] struct — confirm
    /// against the callers of `with_typed_arrays`.
    #[wasm_bindgen(typescript_type = "TypedArrayWindow")]
    #[derive(Clone)]
    pub type JsTypedArrayWindow;
}
32
33/// Options for `with_typed_arrays`, extending `ViewWindow` with
34/// typed-array-specific options.
35#[derive(Default, serde::Deserialize, TS)]
36pub struct TypedArrayWindow {
37    #[serde(flatten)]
38    pub view_window: ViewWindow,
39
40    /// When `true`, Float64/Date32/Timestamp columns are output as
41    /// `Float32Array` instead of `Float64Array`.
42    #[serde(default)]
43    pub float32: bool,
44}
45
46impl From<TypedArrayWindow> for ViewWindow {
47    fn from(w: TypedArrayWindow) -> Self {
48        w.view_window
49    }
50}
51
52/// Decode an Arrow IPC batch and call `callback` once with all columns.
53///
54/// Callback signature:
55/// ```js
56/// callback(names: string[], values: TypedArray[], validities: (Uint8Array|null)[], dictionaries: (string[]|null)[]) => void | Promise<void>
57/// ```
58///
59/// If the callback returns a `Promise`, it is awaited before the Arrow
60/// batch (and therefore the zero-copy typed-array views into it) is
61/// dropped. A synchronous callback returning `undefined` is supported
62/// with no promise-handling overhead.
63pub(crate) async fn decode_and_call(
64    arrow: &[u8],
65    float32: bool,
66    callback: &Function,
67) -> Result<(), JsValue> {
68    let cursor = Cursor::new(arrow);
69    let reader = StreamReader::try_new(cursor, None)
70        .map_err(|e| JsValue::from_str(&format!("Arrow decode error: {e}")))?;
71
72    let batch = reader
73        .into_iter()
74        .next()
75        .ok_or_else(|| JsValue::from_str("Arrow IPC contained no record batches"))?
76        .map_err(|e| JsValue::from_str(&format!("Arrow batch error: {e}")))?;
77
78    let schema = batch.schema();
79    let num_cols = batch.num_columns();
80
81    let js_names = Array::new_with_length(num_cols as u32);
82    let js_values = Array::new_with_length(num_cols as u32);
83    let js_validities = Array::new_with_length(num_cols as u32);
84    let js_dicts = Array::new_with_length(num_cols as u32);
85
86    // Storage for allocated conversion buffers. These MUST outlive the
87    // callback because `js_sys::*Array::view()` creates zero-copy views
88    // into their heap memory. Using `Box<[T]>` (rather than `Vec<T>`)
89    // yields a stable pointer that won't move when the outer Vec grows.
90    let mut f32_storage: Vec<Box<[f32]>> = Vec::new();
91    let mut f64_storage: Vec<Box<[f64]>> = Vec::new();
92
93    for col_idx in 0..num_cols {
94        let field = schema.field(col_idx);
95        let col = batch.column(col_idx);
96        let validity = col.nulls().map(|nulls| nulls.validity());
97
98        js_names.set(col_idx as u32, JsString::from(field.name().as_str()).into());
99
100        match col.data_type() {
101            DataType::UInt32 => {
102                let vals = col.as_primitive::<UInt32Type>().values();
103                let arr = unsafe { js_sys::Uint32Array::view(vals.as_ref()) };
104                js_values.set(col_idx as u32, arr.into());
105                js_dicts.set(col_idx as u32, JsValue::NULL);
106            },
107            DataType::Int32 => {
108                let vals = col.as_primitive::<Int32Type>().values();
109                let arr = unsafe { js_sys::Int32Array::view(vals.as_ref()) };
110                js_values.set(col_idx as u32, arr.into());
111                js_dicts.set(col_idx as u32, JsValue::NULL);
112            },
113            DataType::Float32 => {
114                let vals = col.as_primitive::<Float32Type>().values();
115                let arr = unsafe { js_sys::Float32Array::view(vals.as_ref()) };
116                js_values.set(col_idx as u32, arr.into());
117                js_dicts.set(col_idx as u32, JsValue::NULL);
118            },
119            DataType::Float64 => {
120                if float32 {
121                    let vals = col.as_primitive::<Float64Type>().values();
122                    f32_storage.push(vals.iter().map(|&v| v as f32).collect());
123                } else {
124                    let vals = col.as_primitive::<Float64Type>().values();
125                    let arr = unsafe { js_sys::Float64Array::view(vals.as_ref()) };
126                    js_values.set(col_idx as u32, arr.into());
127                }
128                js_dicts.set(col_idx as u32, JsValue::NULL);
129            },
130            DataType::Date32 => {
131                // Datetime values are always emitted as Float64 — narrowing
132                // epoch-ms to f32 collapses ~256 ms of resolution at modern
133                // timestamps, so the `float32` flag is intentionally ignored
134                // for date/timestamp columns.
135                let typed = col.as_primitive::<Date32Type>();
136                f64_storage.push(
137                    typed
138                        .values()
139                        .iter()
140                        .map(|&v| v as f64 * 86_400_000.0)
141                        .collect(),
142                );
143                js_dicts.set(col_idx as u32, JsValue::NULL);
144            },
145            DataType::Timestamp(TimeUnit::Millisecond, _) => {
146                let typed = col.as_primitive::<TimestampMillisecondType>();
147                f64_storage.push(typed.values().iter().map(|&v| v as f64).collect());
148                js_dicts.set(col_idx as u32, JsValue::NULL);
149            },
150            DataType::Int64 => {
151                let typed = col.as_primitive::<Int64Type>();
152                if float32 {
153                    f32_storage.push(typed.values().iter().map(|&v| v as f32).collect());
154                } else {
155                    f64_storage.push(typed.values().iter().map(|&v| v as f64).collect());
156                }
157                js_dicts.set(col_idx as u32, JsValue::NULL);
158            },
159            DataType::Dictionary(..) => {
160                let dict = col.as_dictionary::<Int32Type>();
161                let keys = dict.keys();
162                let arr = unsafe { js_sys::Int32Array::view(keys.values().as_ref()) };
163                js_values.set(col_idx as u32, arr.into());
164
165                let values = dict.values().as_string::<i32>();
166                let js_dict = Array::new_with_length(values.len() as u32);
167                for i in 0..values.len() {
168                    js_dict.set(i as u32, JsValue::from_str(values.value(i)));
169                }
170                js_dicts.set(col_idx as u32, js_dict.into());
171            },
172            dt => {
173                return Err(JsValue::from_str(&format!(
174                    "Unsupported column type for typed array: {dt}"
175                )));
176            },
177        }
178
179        // SAFETY: Validity bitmap is owned by `batch` which outlives the
180        // callback — safe to view zero-copy.
181        let js_validity = validity.map(|v| unsafe { Uint8Array::view(v) });
182        js_validities.set(
183            col_idx as u32,
184            js_validity
185                .as_ref()
186                .map(JsValue::from)
187                .unwrap_or(JsValue::NULL),
188        );
189    }
190
191    // Second pass: fill in value views for columns backed by f32_storage /
192    // f64_storage. The Box<[T]> buffers are heap-allocated and stable; their
193    // data pointers remain valid even as the outer Vec grows.
194    let mut f32_idx = 0;
195    let mut f64_idx = 0;
196    for col_idx in 0..num_cols {
197        let col = batch.column(col_idx);
198        let uses_f32_storage = matches!(
199            (col.data_type(), float32),
200            (DataType::Float64, true) | (DataType::Int64, true),
201        );
202        let uses_f64_storage = matches!(
203            (col.data_type(), float32),
204            (DataType::Date32, _)
205                | (DataType::Timestamp(TimeUnit::Millisecond, _), _)
206                | (DataType::Int64, false),
207        );
208
209        if uses_f32_storage {
210            let arr = unsafe { js_sys::Float32Array::view(&f32_storage[f32_idx]) };
211            js_values.set(col_idx as u32, arr.into());
212            f32_idx += 1;
213        } else if uses_f64_storage {
214            let arr = unsafe { js_sys::Float64Array::view(&f64_storage[f64_idx]) };
215            js_values.set(col_idx as u32, arr.into());
216            f64_idx += 1;
217        }
218    }
219
220    let ret = callback.call4(
221        &JsValue::UNDEFINED,
222        &js_names.into(),
223        &js_values.into(),
224        &js_validities.into(),
225        &js_dicts.into(),
226    )?;
227
228    // If the callback returned a Promise, await it before releasing the
229    // batch — zero-copy TypedArray views into `batch`/`f32_storage`/
230    // `f64_storage` must remain valid for the full lifetime of the
231    // awaited work.
232    if ret.is_instance_of::<js_sys::Promise>() {
233        let promise: js_sys::Promise = ret.unchecked_into();
234        wasm_bindgen_futures::JsFuture::from(promise).await?;
235    }
236
237    // Keep storage alive until after the callback (and its awaited
238    // promise, if any) returns.
239    drop(f32_storage);
240    drop(f64_storage);
241
242    Ok(())
243}