perspective_viewer/
session.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13mod column_defaults_update;
14mod drag_drop_update;
15mod metadata;
16mod replace_expression_update;
17mod view_subscription;
18
19use std::cell::{Ref, RefCell};
20use std::collections::HashSet;
21use std::future::Future;
22use std::ops::Deref;
23use std::rc::Rc;
24use std::sync::Arc;
25
26use perspective_client::config::*;
27use perspective_client::{ReconnectCallback, View, ViewWindow};
28use perspective_js::utils::*;
29use wasm_bindgen::prelude::*;
30use yew::html::ImplicitClone;
31use yew::prelude::*;
32
33use self::metadata::*;
34use self::replace_expression_update::*;
35pub use self::view_subscription::ViewStats;
36use self::view_subscription::*;
37use crate::dragdrop::*;
38use crate::js::plugin::*;
39use crate::utils::*;
40
/// The `Session` struct is the principal interface to the Perspective engine,
/// the `Table` and `View` objects for this viewer, and all associated state
/// including the `ViewConfig`.
///
/// A `Session` is a handle around a reference-counted `SessionHandle`: clones
/// share the same underlying state, and two `Session`s compare equal only
/// when they point at the same handle.
#[derive(Clone, Default)]
pub struct Session(Arc<SessionHandle>);
46
// Opt in to yew's `ImplicitClone` marker; cloning a `Session` only clones the
// inner `Arc`.
impl ImplicitClone for Session {}
48
/// Immutable state for `Session`.
///
/// The mutable `SessionData` lives behind a `RefCell` (reachable through
/// `Deref`); the `PubSub` fields are the channels on which state changes are
/// announced.
#[derive(Default)]
pub struct SessionHandle {
    // Interior-mutable session state.
    session_data: RefCell<SessionData>,
    /// Handed to each new `ViewSubscription` as its `on_update` callback.
    pub table_updated: PubSub<()>,
    /// Emitted by `set_table()` once the new `Table` has been stored.
    pub table_loaded: PubSub<()>,
    /// Emitted whenever a `ValidSession` guard is dropped.
    pub view_created: PubSub<()>,
    /// Emitted by `update_view_config()` when `apply_update()` reports `true`.
    pub view_config_changed: PubSub<()>,
    /// Emitted with the latest `ViewStats` whenever the stats are updated.
    pub stats_changed: PubSub<Option<ViewStats>>,
    /// Emitted with the error message by `set_error()` and by the client
    /// error callback registered in `set_table()`.
    pub table_errored: PubSub<Option<String>>,
}
60
/// Mutable state for `Session`.
#[derive(Default)]
pub struct SessionData {
    // The currently loaded `Table`, if any.
    table: Option<perspective_client::Table>,
    // Metadata built from the `Table` via `SessionMetadata::from_table()`.
    metadata: SessionMetadata,
    // The last config which passed `validate()`; restored when a later
    // validation fails.
    old_config: Option<ViewConfig>,
    // The current view config.
    config: ViewConfig,
    // Subscription wrapping the current `View`, if one has been created.
    view_sub: Option<ViewSubscription>,
    // Most recent stats recorded by `update_stats()`.
    stats: Option<ViewStats>,
    // Set to `false` when state changes; flipped back to `true` by
    // `reset_clean()`, which `create_view()` uses to decide whether work is
    // needed.
    is_clean: bool,
    // While `true`, `create_view()` neither creates nor updates a `View`.
    is_paused: bool,
    // The current error state, if any; `update_view_config()` refuses updates
    // while this is set.
    error: Option<TableErrorState>,
}

/// Error state for a `Session`: an optional error message (`.0`) and an
/// optional callback (`.1`) which `Session::reconnect()` invokes to attempt a
/// reconnect.
#[derive(Clone, Default)]
pub struct TableErrorState(Option<String>, Option<ReconnectCallback>);
74
75#[derive(Clone, Default)]
76pub struct TableErrorState(Option<String>, Option<ReconnectCallback>);
77
78impl Deref for Session {
79    type Target = SessionHandle;
80
81    fn deref(&self) -> &Self::Target {
82        &self.0
83    }
84}
85
86impl PartialEq for Session {
87    fn eq(&self, other: &Self) -> bool {
88        Arc::ptr_eq(&self.0, &other.0)
89    }
90}
91
92impl Deref for SessionHandle {
93    type Target = RefCell<SessionData>;
94
95    fn deref(&self) -> &Self::Target {
96        &self.session_data
97    }
98}
99
/// Shared borrow guard over a `Session`'s `SessionMetadata`.
pub type MetadataRef<'a> = std::cell::Ref<'a, SessionMetadata>;
/// Exclusive borrow guard over a `Session`'s `SessionMetadata`.
pub type MetadataMutRef<'a> = std::cell::RefMut<'a, SessionMetadata>;
102
103impl Session {
104    pub fn metadata(&self) -> MetadataRef<'_> {
105        std::cell::Ref::map(self.borrow(), |x| &x.metadata)
106    }
107
108    pub fn metadata_mut(&self) -> MetadataMutRef<'_> {
109        std::cell::RefMut::map(self.borrow_mut(), |x| &mut x.metadata)
110    }
111
112    pub fn invalidate(&self) {
113        self.borrow_mut().error = None;
114        self.borrow_mut().is_clean = false;
115        self.borrow_mut().view_sub = None;
116    }
117
118    /// Reset this `Session`'s `View` state, but preserve the `Table`.
119    ///
120    /// # Arguments
121    /// - `reset_expressions` Whether to reset the `expressions` property.
122    pub fn reset(&self, reset_expressions: bool) -> impl Future<Output = ApiResult<()>> + use<> {
123        self.borrow_mut().is_clean = false;
124        let view = self.0.borrow_mut().view_sub.take();
125        self.borrow_mut().view_sub = None;
126        self.borrow_mut().config.reset(reset_expressions);
127        view.delete()
128    }
129
130    /// Reset this (presumably shared) `Session` to its initial state, returning
131    /// a bool indicating whether this `Session` had a table which was
132    /// deleted. TODO Table should be an immutable constructor parameter to
133    /// `Session`.
134    pub async fn delete(&self) -> ApiResult<()> {
135        self.borrow_mut().is_clean = false;
136        self.borrow_mut().config.reset(true);
137        self.borrow_mut().metadata = SessionMetadata::default();
138        self.borrow_mut().table = None;
139        let view = self.borrow_mut().view_sub.take();
140        view.delete().await?;
141        Ok(())
142    }
143
144    pub fn has_table(&self) -> bool {
145        self.borrow().table.is_some()
146    }
147
148    pub fn get_table(&self) -> Option<perspective_client::Table> {
149        self.borrow().table.clone()
150    }
151
152    /// Reset this `Session`'s state with a new `Table`.  Implicitly clears the
153    /// `ViewSubscription`, which will need to be re-initialized later via
154    /// `create_view()`.
155    pub async fn set_table(&self, table: perspective_client::Table) -> ApiResult<JsValue> {
156        match SessionMetadata::from_table(&table).await {
157            Ok(metadata) => {
158                let client = table.get_client();
159                let set_error = self.table_errored.as_boxfn();
160                let session = self.clone();
161                let poll_loop =
162                    LocalPollLoop::new(move |(message, reconnect): (Option<String>, _)| {
163                        set_error(message.clone());
164                        session.borrow_mut().error = Some(TableErrorState(message, reconnect));
165                        Ok(JsValue::UNDEFINED)
166                    });
167
168                let _callback_id = client
169                    .on_error(Box::new(move |message, reconnect| {
170                        let poll_loop = poll_loop.clone();
171                        async move {
172                            poll_loop.poll((message, reconnect)).await;
173                            Ok(())
174                        }
175                    }))
176                    .await?;
177
178                let sub = self.borrow_mut().view_sub.take();
179                self.borrow_mut().metadata = metadata;
180                self.borrow_mut().table = Some(table);
181                sub.delete().await?;
182                self.table_loaded.emit(());
183                Ok(JsValue::UNDEFINED)
184            },
185            Err(err) => self
186                .set_error(err.to_string())
187                .await
188                .map(|_| JsValue::UNDEFINED),
189        }
190    }
191
192    pub async fn set_error(&self, err: String) -> ApiResult<()> {
193        self.borrow_mut().error = Some(TableErrorState(Some(err.clone()), None));
194        self.table_errored.emit(Some(err.clone()));
195        let sub = self.borrow_mut().view_sub.take();
196        self.borrow_mut().metadata = SessionMetadata::default();
197        self.borrow_mut().table = None;
198        sub.delete().await?;
199        Err(err.into())
200    }
201
202    pub fn set_pause(&self, pause: bool) -> bool {
203        self.borrow_mut().is_clean = false;
204        if pause == self.borrow().is_paused {
205            false
206        } else if pause {
207            ApiFuture::spawn(self.borrow_mut().view_sub.take().delete());
208            self.borrow_mut().is_paused = true;
209            true
210        } else {
211            self.borrow_mut().is_paused = false;
212            true
213        }
214    }
215
216    pub async fn await_table(&self) -> ApiResult<()> {
217        if self.js_get_table().is_none() {
218            self.table_loaded.listen_once().await?;
219            let _ = self.js_get_table().ok_or("No table set")?;
220        }
221
222        Ok(())
223    }
224
225    pub fn js_get_table(&self) -> Option<JsValue> {
226        Some(perspective_js::Table::from(self.borrow().table.clone()?).into())
227    }
228
229    pub fn js_get_view(&self) -> Option<JsValue> {
230        let view = self.borrow().view_sub.as_ref()?.get_view().clone();
231        Some(perspective_js::View::from(view).into())
232    }
233
234    pub fn get_error(&self) -> Option<String> {
235        self.borrow().error.as_ref().and_then(|x| x.0.clone())
236    }
237
238    pub fn is_reconnect(&self) -> bool {
239        self.borrow()
240            .error
241            .as_ref()
242            .map(|x| x.1.is_some())
243            .unwrap_or_default()
244    }
245
246    pub async fn reconnect(&self) -> ApiResult<()> {
247        let err = self.borrow().error.clone();
248        if let Some(TableErrorState(_, Some(reconnect))) = err {
249            reconnect().await?;
250            self.borrow_mut().error = None;
251            self.borrow_mut().is_clean = false;
252            self.borrow_mut().view_sub = None;
253        }
254
255        Ok(())
256    }
257
258    pub fn is_column_expression_in_use(&self, name: &str) -> bool {
259        self.borrow().config.is_column_expression_in_use(name)
260    }
261
262    /// Is this column currently being used or not
263    pub fn is_column_active(&self, name: &str) -> bool {
264        let config = Ref::map(self.borrow(), |x| &x.config);
265        config.columns.iter().any(|maybe_col| {
266            maybe_col
267                .as_ref()
268                .map(|col| col == name)
269                .unwrap_or_default()
270        }) || config.group_by.iter().any(|col| col == name)
271            || config.split_by.iter().any(|col| col == name)
272            || config.filter.iter().any(|col| col.column() == name)
273            || config.sort.iter().any(|col| col.0 == name)
274    }
275
276    pub fn create_drag_drop_update(
277        &self,
278        column: String,
279        index: usize,
280        drop: DragTarget,
281        drag: DragEffect,
282        requirements: &ViewConfigRequirements,
283    ) -> ViewConfigUpdate {
284        use self::drag_drop_update::*;
285        let col_type = self
286            .metadata()
287            .get_column_table_type(column.as_str())
288            .unwrap();
289
290        self.get_view_config().create_drag_drop_update(
291            column,
292            col_type,
293            index,
294            drop,
295            drag,
296            requirements,
297            self.metadata().get_features().unwrap(),
298        )
299    }
300
301    /// An async task which replaces a `column` aliased expression with another.
302    pub async fn create_replace_expression_update(
303        &self,
304        old_expr_name: &str,
305        new_expr: &Expression<'static>,
306    ) -> ViewConfigUpdate {
307        let old_expr_val = self
308            .metadata()
309            .get_expression_by_alias(old_expr_name)
310            .unwrap();
311
312        let old_expr = Expression::new(Some(old_expr_name.into()), old_expr_val.into());
313
314        use self::replace_expression_update::*;
315        self.get_view_config()
316            .create_replace_expression_update(&old_expr, new_expr)
317    }
318
319    pub async fn create_rename_expression_update(
320        &self,
321        old_expr_name: String,
322        new_expr_name: Option<String>,
323    ) -> ViewConfigUpdate {
324        let old_expr_val = self
325            .metadata()
326            .get_expression_by_alias(&old_expr_name)
327            .expect_throw(&format!("Unable to get expr with name {old_expr_name}"));
328        let old_expr = Expression::new(Some(old_expr_name.into()), old_expr_val.clone().into());
329        let new_expr = Expression::new(new_expr_name.map(|n| n.into()), old_expr_val.into());
330        self.get_view_config()
331            .create_replace_expression_update(&old_expr, &new_expr)
332    }
333
334    /// Validate an expression string and marshall the results.
335    pub async fn validate_expr(
336        &self,
337        expr: &str,
338    ) -> Result<Option<perspective_client::ExprValidationError>, ApiError> {
339        // let arr = HashMap::from_iter([("_".to_string(), expr.to_string())]);
340        let table = self.borrow().table.as_ref().unwrap().clone();
341        let errors = table
342            .validate_expressions(
343                ExpressionsDeserde::Map(std::collections::HashMap::from_iter([(
344                    "_".to_string(),
345                    expr.to_string(),
346                )]))
347                .into(),
348            )
349            .await?
350            .errors;
351
352        Ok(errors.get("_").cloned())
353    }
354
355    pub async fn arrow_as_vec(
356        &self,
357        flat: bool,
358        window: Option<ViewWindow>,
359    ) -> Result<Vec<u8>, ApiError> {
360        Ok(self
361            .flat_view(flat)
362            .await?
363            .to_arrow(window.unwrap_or_default())
364            .await?
365            .into())
366    }
367
368    pub async fn arrow_as_jsvalue(
369        &self,
370        flat: bool,
371        window: Option<ViewWindow>,
372    ) -> Result<js_sys::ArrayBuffer, ApiError> {
373        let arrow = self
374            .flat_view(flat)
375            .await?
376            .to_arrow(window.unwrap_or_default())
377            .await?;
378        Ok(js_sys::Uint8Array::from(&arrow[..])
379            .buffer()
380            .unchecked_into())
381    }
382
383    pub async fn ndjson_as_jsvalue(
384        &self,
385        flat: bool,
386        window: Option<ViewWindow>,
387    ) -> Result<js_sys::JsString, ApiError> {
388        let json: String = self
389            .flat_view(flat)
390            .await?
391            .to_ndjson(window.unwrap_or_default())
392            .await?;
393
394        Ok(json.into())
395    }
396
397    pub async fn json_as_jsvalue(
398        &self,
399        flat: bool,
400        window: Option<ViewWindow>,
401    ) -> Result<js_sys::Object, ApiError> {
402        let json: String = self
403            .flat_view(flat)
404            .await?
405            .to_columns_string(window.unwrap_or_default())
406            .await?;
407
408        Ok(js_sys::JSON::parse(&json)?.unchecked_into())
409    }
410
411    pub async fn csv_as_jsvalue(
412        &self,
413        flat: bool,
414        window: Option<ViewWindow>,
415    ) -> Result<js_sys::JsString, ApiError> {
416        let window = window.unwrap_or_default();
417        let csv = self.flat_view(flat).await?.to_csv(window).await;
418        Ok(csv.map(js_sys::JsString::from)?)
419    }
420
421    pub fn get_view(&self) -> Option<View> {
422        self.borrow()
423            .view_sub
424            .as_ref()
425            .map(|sub| sub.get_view().clone())
426    }
427
428    pub fn get_table_stats(&self) -> Option<ViewStats> {
429        self.borrow().stats.clone()
430    }
431
432    pub fn get_view_config(&self) -> Ref<ViewConfig> {
433        Ref::map(self.borrow(), |x| &x.config)
434    }
435
436    /// Get all unique column values for a given column name.
437    ///
438    /// Use the `.to_csv()` method, as I suspected copying this large string
439    /// once was more efficient than copying many smaller strings, and
440    /// string copying shows up frequently when doing performance analysis.
441    ///
442    /// TODO Does not work with expressions yet.
443    ///
444    /// # Arguments
445    /// - `column` The name of the column (or expression).
446    pub async fn get_column_values(&self, column: String) -> Result<Vec<String>, ApiError> {
447        let expressions = Some(self.borrow().config.expressions.clone());
448        let config = ViewConfigUpdate {
449            group_by: Some(vec![column]),
450            columns: Some(vec![]),
451            expressions,
452            ..ViewConfigUpdate::default()
453        };
454
455        let table = self.borrow().table.clone().unwrap();
456        let view = table.view(Some(config.clone())).await?;
457        let csv = view.to_csv(ViewWindow::default()).await?;
458
459        ApiFuture::spawn(async move {
460            view.delete().await?;
461            Ok(())
462        });
463
464        let res = csv
465            .lines()
466            .map(|val| {
467                if val.starts_with('\"') && val.ends_with('\"') {
468                    (val[1..val.len() - 1]).to_owned()
469                } else {
470                    val.to_owned()
471                }
472            })
473            .skip(2)
474            .collect::<Vec<String>>();
475        Ok(res)
476    }
477
478    pub fn set_update_column_defaults(
479        &self,
480        config_update: &mut ViewConfigUpdate,
481        requirements: &ViewConfigRequirements,
482    ) {
483        use self::column_defaults_update::*;
484        config_update.set_update_column_defaults(
485            &self.metadata(),
486            &self.borrow().config.columns,
487            requirements,
488        )
489    }
490
491    /// Update the config, setting the `columns` property to the plugin defaults
492    /// if provided.
493    pub fn update_view_config(&self, config_update: ViewConfigUpdate) -> ApiResult<()> {
494        if let Some(x) = self.borrow().error.as_ref() {
495            tracing::warn!("Errored state");
496
497            // Load bearing return
498            return Err(ApiError::new(
499                x.0.clone().unwrap_or_else(|| "Unknown error".to_string()),
500            ));
501        }
502
503        if self.borrow_mut().config.apply_update(config_update) {
504            self.0.borrow_mut().is_clean = false;
505            self.view_config_changed.emit(());
506        }
507
508        Ok(())
509    }
510
511    pub fn reset_stats(&self) {
512        self.update_stats(ViewStats::default());
513    }
514
515    #[cfg(test)]
516    pub fn set_stats(&self, stats: ViewStats) {
517        self.update_stats(stats)
518    }
519
520    /// In order to create a new view in this session, the session must first be
521    /// validated to create a `ValidSession<'_>` guard.
522    pub async fn validate(&self) -> Result<ValidSession<'_>, JsValue> {
523        let old = self.borrow_mut().old_config.take();
524        let is_diff = match old.as_ref() {
525            Some(old) => !old.is_equivalent(&self.borrow().config),
526            None => true,
527        };
528
529        if let Err(err) = self.validate_view_config().await {
530            self.borrow_mut().error = Some(TableErrorState(Some(err.to_string()), None));
531            web_sys::console::error_2(&"Failed to apply config:".into(), &err.clone().into());
532            if let Some(config) = old {
533                self.borrow_mut().config = config;
534            } else {
535                self.reset(true).await?;
536            }
537
538            return Err(err)?;
539        } else {
540            let old_config = Some(self.borrow().config.clone());
541            self.borrow_mut().old_config = old_config;
542        }
543
544        Ok(ValidSession(self, is_diff))
545    }
546
547    async fn flat_view(&self, flat: bool) -> ApiResult<View> {
548        if flat {
549            let table = self.borrow().table.clone().into_apierror()?;
550            Ok(table.view(None).await?)
551        } else {
552            self.borrow()
553                .view_sub
554                .as_ref()
555                .map(|x| x.get_view().clone())
556                .into_apierror()
557        }
558    }
559
560    fn update_stats(&self, stats: ViewStats) {
561        self.borrow_mut().stats = Some(stats.clone());
562        self.stats_changed.emit(Some(stats));
563    }
564
    /// Validate the current `ViewConfig` against the `Table`, normalizing it
    /// in place.
    ///
    /// The config's expressions are validated by the `Table` and recorded in
    /// the session metadata. Then every column referenced by `columns`,
    /// `group_by`, `split_by`, `sort` and `filter` must be either a table
    /// column or a validated expression name. An empty `columns` is replaced
    /// with all table columns, and aggregates for columns which are not
    /// referenced anywhere are dropped. The normalized config is written back
    /// to the session.
    ///
    /// # Errors
    /// Fails if no `Table` is attached, if expression validation fails, or if
    /// the config references an unknown column.
    async fn validate_view_config(&self) -> ApiResult<()> {
        // Snapshot used only for its `expressions`; re-fetched below.
        let config = self.borrow().config.clone();
        let table_columns = self
            .metadata()
            .get_table_columns()
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<String>>();

        let all_columns: HashSet<String> = table_columns.iter().cloned().collect();

        // Every column name referenced by the config; borrows from `config`.
        let mut view_columns: HashSet<&str> = HashSet::new();

        let table = self
            .borrow()
            .table
            .as_ref()
            .ok_or("Trying to draw the viewer with no table attached")?
            .clone();

        let valid_recs = table.validate_expressions(config.expressions).await?;
        let expression_names = self.metadata_mut().update_expressions(&valid_recs)?;

        // re-fetch config after `await`; `expressions` and `all_columns` are ok,
        // but `config` may have changed as it is unlocked.
        let mut config = self.borrow().config.clone();

        // An empty `columns` means "all table columns".
        if config.columns.is_empty() {
            config.columns = table_columns.into_iter().map(Some).collect();
        }

        for column in config.columns.iter().flatten() {
            if all_columns.contains(column) || expression_names.contains(column) {
                let _existed = view_columns.insert(column);
            } else {
                return Err(format!("Unknown \"{}\" in `columns`", column).into());
            }
        }

        for column in config.group_by.iter() {
            if all_columns.contains(column) || expression_names.contains(column) {
                let _existed = view_columns.insert(column);
            } else {
                return Err(format!("Unknown \"{}\" in `group_by`", column).into());
            }
        }

        for column in config.split_by.iter() {
            if all_columns.contains(column) || expression_names.contains(column) {
                let _existed = view_columns.insert(column);
            } else {
                return Err(format!("Unknown \"{}\" in `split_by`", column).into());
            }
        }

        for sort in config.sort.iter() {
            if all_columns.contains(&sort.0) || expression_names.contains(&sort.0) {
                let _existed = view_columns.insert(&sort.0);
            } else {
                return Err(format!("Unknown \"{}\" in `sort`", sort.0).into());
            }
        }

        for filter in config.filter.iter() {
            // TODO check filter op
            if all_columns.contains(filter.column()) || expression_names.contains(filter.column()) {
                let _existed = view_columns.insert(filter.column());
            } else {
                return Err(format!("Unknown \"{}\" in `filter`", filter.column()).into());
            }
        }

        // Drop aggregates for columns the config no longer references.
        config
            .aggregates
            .retain(|column, _| view_columns.contains(column.as_str()));

        self.borrow_mut().config = config;
        Ok(())
    }
645
646    fn reset_clean(&self) -> bool {
647        let mut is_clean = true;
648        std::mem::swap(&mut is_clean, &mut self.0.borrow_mut().is_clean);
649        is_clean
650    }
651}
652
/// A newtype wrapper which only provides `create_view()`
///
/// Holds the validated `Session` (`.0`) and a flag (`.1`) recording whether
/// the validated config differs from the previously validated one. Emits
/// `view_created` on the `Session` when dropped.
pub struct ValidSession<'a>(&'a Session, bool);
655
656impl<'a> ValidSession<'a> {
657    /// Set a new `View` (derived from this `Session`'s `Table`), and create the
658    /// `update()` subscription, consuming this `ValidSession<'_>` and returning
659    /// the original `&Session`.
660    pub async fn create_view(&self) -> Result<&'a Session, ApiError> {
661        if !self.0.reset_clean() && !self.0.borrow().is_paused {
662            if !self.1 {
663                let config = self.0.borrow().config.clone();
664                if let Some(sub) = &mut self.0.borrow_mut().view_sub.as_mut() {
665                    sub.update_view_config(Rc::new(config));
666                    return Ok(self.0);
667                }
668            }
669
670            let table = self
671                .0
672                .borrow()
673                .table
674                .clone()
675                .ok_or("`restore()` called before `load()`")?;
676
677            let view_config = self.0.borrow().config.clone();
678            let view = table.view(Some(view_config.into())).await?;
679            let view_schema = view.schema().await?;
680            self.0.metadata_mut().update_view_schema(&view_schema)?;
681            let on_stats = Callback::from({
682                let this = self.0.clone();
683                move |stats| this.update_stats(stats)
684            });
685
686            let sub = {
687                let config = self.0.borrow().config.clone();
688                let on_update = self.0.table_updated.callback();
689                ViewSubscription::new(view, config, on_stats, on_update)
690            };
691
692            let view = self.0.borrow_mut().view_sub.take();
693            ApiFuture::spawn(view.delete());
694            self.0.borrow_mut().view_sub = Some(sub);
695        }
696
697        Ok(self.0)
698    }
699}
700
701impl Drop for ValidSession<'_> {
702    /// `ValidSession` is a guard for listeners of the `view_created` pubsub
703    /// event.
704    fn drop(&mut self) {
705        self.0.view_created.emit(());
706    }
707}