Skip to main content

perspective_viewer/
session.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13pub(crate) mod column_defaults_update;
14pub(crate) mod drag_drop_update;
15mod metadata;
16mod props;
17pub(crate) mod replace_expression_update;
18mod view_subscription;
19
20use std::cell::{Ref, RefCell};
21use std::collections::HashSet;
22use std::future::Future;
23use std::ops::Deref;
24use std::rc::Rc;
25
26use perspective_client::config::*;
27use perspective_client::{Client, ClientError, ReconnectCallback, View, ViewWindow};
28use perspective_js::apierror;
29use perspective_js::utils::*;
30use wasm_bindgen::prelude::*;
31use yew::html::ImplicitClone;
32use yew::prelude::*;
33
34use self::metadata::*;
35pub use self::metadata::{MetadataRef, SessionMetadata, SessionMetadataRc};
36pub use self::props::{SessionProps, TableLoadState};
37pub use self::view_subscription::ViewStats;
38use self::view_subscription::*;
39use crate::js::plugin::*;
40use crate::utils::*;
41
42/// Immutable state for `Session`.
43#[derive(Default)]
44pub struct SessionHandle {
45    session_data: RefCell<SessionData>,
46    pub table_updated: PubSub<()>,
47    pub table_loaded: PubSub<()>,
48    pub table_unloaded: PubSub<bool>,
49    pub view_created: PubSub<()>,
50    pub view_config_changed: PubSub<()>,
51    pub title_changed: PubSub<Option<String>>,
52
53    /// Injected callback from the root component, replacing the former
54    /// `stats_changed: PubSub` field.  Fires when view stats are updated.
55    pub on_stats_changed: RefCell<Option<Callback<()>>>,
56
57    /// Injected callback from the root component, replacing the former
58    /// `table_errored: PubSub` field.  Fires when an error is set on the
59    /// session (table load failure, client disconnect, invalid config, etc.).
60    pub on_table_errored: RefCell<Option<Callback<()>>>,
61}
62
63impl Deref for SessionHandle {
64    type Target = RefCell<SessionData>;
65
66    fn deref(&self) -> &Self::Target {
67        &self.session_data
68    }
69}
70
71/// Mutable state for `Session`.
72#[derive(Default)]
73pub struct SessionData {
74    client: Option<perspective_client::Client>,
75    table: Option<perspective_client::Table>,
76    metadata: SessionMetadata,
77    old_config: Option<ViewConfig>,
78    config: ViewConfig,
79    view_sub: Option<ViewSubscription>,
80    stats: Option<ViewStats>,
81    is_loading: bool,
82    is_clean: bool,
83    is_paused: bool,
84    error: Option<TableErrorState>,
85    title: Option<String>,
86}
87
88#[derive(Clone)]
89pub struct TableErrorState(ApiError, Option<ReconnectCallback>);
90
91impl PartialEq for TableErrorState {
92    fn eq(&self, other: &Self) -> bool {
93        self.0.to_string() == other.0.to_string()
94    }
95}
96
97impl std::fmt::Debug for TableErrorState {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_tuple("TableErrorState")
100            .field(&self.0.to_string())
101            .finish()
102    }
103}
104
105impl TableErrorState {
106    pub fn message(&self) -> String {
107        self.0.message()
108    }
109
110    pub fn stacktrace(&self) -> String {
111        self.0.stacktrace()
112    }
113
114    pub fn kind(&self) -> &'static str {
115        self.0.kind()
116    }
117
118    pub fn is_reconnect(&self) -> bool {
119        self.1.is_some()
120    }
121}
122
123#[derive(Debug, Default)]
124pub enum TableIntermediateState {
125    #[default]
126    Ejected,
127    Reloaded,
128}
129
130/// Options for [`Session::reset`]
131#[derive(Default)]
132pub struct ResetOptions {
133    /// Reset user defined expressions
134    pub expressions: bool,
135
136    /// Reset the [`Table`]
137    pub table: Option<TableIntermediateState>,
138
139    /// Reset the [`ViewConfig`]
140    pub config: bool,
141
142    /// Manually reset the [`ViewStats`]
143    pub stats: bool,
144}
145
146/// The `Session` struct is the principal interface to the Perspective engine,
147/// the `Table` and `View` objects for this viewer, and all associated state
148/// including the `ViewConfig`.
149#[derive(Clone)]
150pub struct Session(Rc<SessionHandle>);
151
152impl ImplicitClone for Session {}
153
154impl Deref for Session {
155    type Target = SessionHandle;
156
157    fn deref(&self) -> &Self::Target {
158        &self.0
159    }
160}
161
162impl PartialEq for Session {
163    fn eq(&self, other: &Self) -> bool {
164        Rc::ptr_eq(&self.0, &other.0)
165    }
166}
167
168impl Session {
169    /// Uses [`Self::new`] instead of [`Default`] to prevent accidental
170    /// instantiation in props/etc.
171    #[allow(clippy::new_without_default)]
172    pub fn new() -> Self {
173        Self(Rc::default())
174    }
175
176    pub(crate) fn metadata(&self) -> MetadataRef<'_> {
177        std::cell::Ref::map(self.borrow(), |x| &x.metadata)
178    }
179
180    pub(crate) fn metadata_mut(&self) -> MetadataMutRef<'_> {
181        std::cell::RefMut::map(self.borrow_mut(), |x| &mut x.metadata)
182    }
183
184    pub(crate) fn get_title(&self) -> Option<String> {
185        self.borrow().title.clone()
186    }
187
188    pub fn set_title(&self, title: Option<String>) {
189        let new_title = title.filter(|x| !x.is_empty());
190        self.borrow_mut().title.clone_from(&new_title);
191        self.title_changed.emit(new_title);
192    }
193
194    /// Reset this (presumably shared) `Session` to its initial state, returning
195    /// a bool indicating whether this `Session` had a table which was
196    /// deleted. TODO Table should be an immutable constructor parameter to
197    /// `Session`.
198    pub fn reset(&self, options: ResetOptions) -> impl Future<Output = ApiResult<()>> + use<> {
199        self.borrow_mut().is_clean = false;
200        let view = self.0.borrow_mut().view_sub.take();
201        let err = self.get_error();
202        self.borrow_mut().error = None;
203        if options.stats {
204            self.update_stats(ViewStats::default());
205        }
206
207        if options.config {
208            self.borrow_mut().config.reset(options.expressions);
209        }
210
211        match options.table {
212            Some(TableIntermediateState::Ejected) => {
213                self.borrow_mut().is_loading = false;
214                self.borrow_mut().table = None;
215                self.borrow_mut().metadata = SessionMetadata::default();
216            },
217            Some(TableIntermediateState::Reloaded) => {
218                self.borrow_mut().is_loading = true;
219                self.borrow_mut().table = None;
220                self.borrow_mut().metadata = SessionMetadata::default();
221            },
222            _ => {
223                self.borrow_mut().is_loading = false;
224            },
225        };
226
227        let table_unloaded = self.table_unloaded.callback();
228        self.borrow_mut().is_clean = false;
229        async move {
230            let res = view.delete().await;
231            if options.table.is_some() {
232                table_unloaded.emit(true)
233            }
234
235            if let Some(err) = err { Err(err) } else { res }
236        }
237    }
238
239    pub(crate) fn has_table(&self) -> Option<TableLoadState> {
240        let data = self.borrow();
241        if data.table.is_some() {
242            Some(TableLoadState::Loaded)
243        } else if data.is_loading {
244            Some(TableLoadState::Loading)
245        } else {
246            None
247        }
248    }
249
250    pub fn get_table(&self) -> Option<perspective_client::Table> {
251        self.borrow().table.clone()
252    }
253
254    pub fn set_client(&self, client: Client) -> bool {
255        if Some(&client) != self.borrow().client.as_ref() {
256            self.borrow_mut().client = Some(client);
257            self.borrow_mut().table = None;
258            true
259        } else {
260            false
261        }
262    }
263
264    pub fn get_client(&self) -> Option<Client> {
265        self.borrow().client.clone()
266    }
267
268    /// Reset this `Session`'s state with a new `Table`.  Implicitly clears the
269    /// `ViewSubscription`, which will need to be re-initialized later via
270    /// `create_view()`.
271    ///
272    /// # Arguments
273    ///
274    /// - `table_name` The name of the `Table` to load, which must exist on the
275    ///   loaded `Client`.
276    ///
277    /// # Returns
278    ///
279    /// `table_name` is unique per `Client`, so if this value has not changed,
280    /// `Session::set_table` does nothing and returns `Ok(false)`.
281    pub async fn set_table(&self, table_name: String) -> ApiResult<bool> {
282        if Some(table_name.as_str()) == self.0.borrow().table.as_ref().map(|x| x.get_name()) {
283            return Ok(false);
284        }
285
286        let client = self.0.borrow().client.clone().into_apierror()?;
287        let table = client.open_table(table_name.clone()).await?;
288        match SessionMetadata::from_table(&table).await {
289            Ok(metadata) => {
290                let client = table.get_client();
291                let on_error = self.on_table_errored.borrow().clone();
292                let session = self.clone();
293                let poll_loop = LocalPollLoop::new(move |(message, reconnect): (ApiError, _)| {
294                    session.borrow_mut().error = Some(TableErrorState(message, reconnect));
295                    if let Some(cb) = &on_error {
296                        cb.emit(());
297                    }
298                    if let Some(sub) = session.borrow_mut().view_sub.take() {
299                        sub.dismiss();
300                    }
301
302                    Ok(JsValue::UNDEFINED)
303                });
304
305                let _callback_id = client
306                    .on_error(Box::new(move |message: ClientError, reconnect| {
307                        let poll_loop = poll_loop.clone();
308                        async move {
309                            poll_loop.poll((message.into(), reconnect)).await;
310                            Ok(())
311                        }
312                    }))
313                    .await?;
314
315                let sub = self.borrow_mut().view_sub.take();
316                self.borrow_mut().metadata = metadata;
317                self.borrow_mut().table = Some(table);
318                self.borrow_mut().is_loading = false;
319                self.borrow_mut().is_clean = false;
320                sub.delete().await?;
321                self.table_loaded.emit(());
322                Ok(true)
323            },
324            Err(err) => self.set_error(false, err).await.map(|_| false),
325        }
326    }
327
328    pub fn update_column_defaults(&self, requirements: &ViewConfigRequirements) {
329        if self.borrow().config.columns.is_empty() {
330            let mut update = ViewConfigUpdate::default();
331            self.set_update_column_defaults(&mut update, requirements);
332            self.borrow_mut().config.apply_update(update);
333        }
334    }
335
336    pub async fn set_error(&self, reset_table: bool, err: ApiError) -> ApiResult<()> {
337        let session = self.clone();
338        let poll_loop = LocalPollLoop::new(move |()| {
339            ApiFuture::spawn(session.reset(ResetOptions {
340                config: true,
341                expressions: true,
342                ..ResetOptions::default()
343            }));
344            Ok(JsValue::UNDEFINED)
345        });
346
347        self.borrow_mut().error = Some(TableErrorState(
348            err.clone(),
349            Some(ReconnectCallback::new(move || {
350                clone!(poll_loop);
351                Box::pin(async move {
352                    poll_loop.poll(()).await;
353                    Ok(())
354                })
355            })),
356        ));
357
358        if let Some(cb) = self.on_table_errored.borrow().as_ref() {
359            cb.emit(());
360        }
361
362        let sub = self.borrow_mut().view_sub.take();
363        if reset_table {
364            self.borrow_mut().metadata = SessionMetadata::default();
365            self.borrow_mut().table = None;
366        }
367
368        sub.delete().await?;
369        Err(err)
370    }
371
372    pub fn set_pause(&self, pause: bool) -> bool {
373        self.borrow_mut().is_clean = false;
374        if pause == self.borrow().is_paused {
375            false
376        } else if pause {
377            ApiFuture::spawn(self.borrow_mut().view_sub.take().delete());
378            self.borrow_mut().is_paused = true;
379            true
380        } else {
381            self.borrow_mut().is_paused = false;
382            true
383        }
384    }
385
386    pub async fn await_table(&self) -> ApiResult<()> {
387        if self.js_get_table().is_none() {
388            self.table_loaded.read_next().await?;
389            let _ = self.js_get_table().ok_or("No table set")?;
390        }
391
392        Ok(())
393    }
394
395    pub fn js_get_table(&self) -> Option<JsValue> {
396        Some(perspective_js::Table::from(self.borrow().table.clone()?).into())
397    }
398
399    pub(crate) fn is_errored(&self) -> bool {
400        self.borrow().error.is_some()
401    }
402
403    pub(crate) fn get_error(&self) -> Option<ApiError> {
404        self.borrow().error.as_ref().map(|x| x.0.clone())
405    }
406
407    pub async fn reconnect(&self) -> ApiResult<()> {
408        let err = self.borrow().error.clone();
409        if let Some(TableErrorState(_, Some(reconnect))) = err {
410            reconnect().await?;
411            self.borrow_mut().is_loading = false;
412            self.borrow_mut().error = None;
413            self.borrow_mut().is_clean = false;
414            self.borrow_mut().view_sub = None;
415            self.table_loaded.emit(());
416        }
417
418        Ok(())
419    }
420
421    /// Validate an expression string and marshall the results.
422    pub async fn validate_expr(
423        &self,
424        expr: &str,
425    ) -> Result<Option<perspective_client::ExprValidationError>, ApiError> {
426        let table = self.borrow().table.as_ref().unwrap().clone();
427        let errors = table
428            .validate_expressions(
429                ExpressionsDeserde::Map(std::collections::HashMap::from_iter([(
430                    "_".to_string(),
431                    expr.to_string(),
432                )]))
433                .into(),
434            )
435            .await?
436            .errors;
437
438        Ok(errors.get("_").cloned())
439    }
440
441    pub async fn arrow_as_vec(
442        &self,
443        flat: bool,
444        window: Option<ViewWindow>,
445    ) -> Result<Vec<u8>, ApiError> {
446        Ok(self
447            .flat_view(flat)
448            .await?
449            .to_arrow(window.unwrap_or_default())
450            .await?
451            .into())
452    }
453
454    pub async fn arrow_as_jsvalue(
455        &self,
456        flat: bool,
457        window: Option<ViewWindow>,
458    ) -> Result<js_sys::ArrayBuffer, ApiError> {
459        let arrow = self
460            .flat_view(flat)
461            .await?
462            .to_arrow(window.unwrap_or_default())
463            .await?;
464        Ok(js_sys::Uint8Array::from(&arrow[..])
465            .buffer()
466            .unchecked_into())
467    }
468
469    pub async fn ndjson_as_jsvalue(
470        &self,
471        flat: bool,
472        window: Option<ViewWindow>,
473    ) -> Result<js_sys::JsString, ApiError> {
474        let json: String = self
475            .flat_view(flat)
476            .await?
477            .to_ndjson(window.unwrap_or_default())
478            .await?;
479
480        Ok(json.into())
481    }
482
483    pub async fn json_as_jsvalue(
484        &self,
485        flat: bool,
486        window: Option<ViewWindow>,
487    ) -> Result<js_sys::Object, ApiError> {
488        let json: String = self
489            .flat_view(flat)
490            .await?
491            .to_columns_string(window.unwrap_or_default())
492            .await?;
493
494        Ok(js_sys::JSON::parse(&json)?.unchecked_into())
495    }
496
497    pub async fn csv_as_jsvalue(
498        &self,
499        flat: bool,
500        window: Option<ViewWindow>,
501    ) -> Result<js_sys::JsString, ApiError> {
502        let window = window.unwrap_or_default();
503        let csv = self.flat_view(flat).await?.to_csv(window).await;
504        Ok(csv.map(js_sys::JsString::from)?)
505    }
506
507    pub fn get_view(&self) -> Option<View> {
508        self.borrow()
509            .view_sub
510            .as_ref()
511            .map(|sub| sub.get_view().clone())
512    }
513
514    pub(crate) fn get_table_stats(&self) -> Option<ViewStats> {
515        self.borrow().stats.clone()
516    }
517
518    pub fn get_view_config(&'_ self) -> Ref<'_, ViewConfig> {
519        Ref::map(self.borrow(), |x| &x.config)
520    }
521
522    /// Get all unique column values for a given column name.
523    ///
524    /// Use the `.to_csv()` method, as I suspected copying this large string
525    /// once was more efficient than copying many smaller strings, and
526    /// string copying shows up frequently when doing performance analysis.
527    ///
528    /// TODO Does not work with expressions yet.
529    ///
530    /// # Arguments
531    /// - `column` The name of the column (or expression).
532    pub async fn get_column_values(&self, column: String) -> Result<Vec<String>, ApiError> {
533        let expressions = Some(self.borrow().config.expressions.clone());
534        let config = ViewConfigUpdate {
535            group_by: Some(vec![column]),
536            columns: Some(vec![]),
537            expressions,
538            ..ViewConfigUpdate::default()
539        };
540
541        let table = self.borrow().table.clone().unwrap();
542        let view = table.view(Some(config.clone())).await?;
543        let csv = view.to_csv(ViewWindow::default()).await?;
544
545        ApiFuture::spawn(async move {
546            view.delete().await?;
547            Ok(())
548        });
549
550        let res = csv
551            .lines()
552            .map(|val| {
553                if val.starts_with('\"') && val.ends_with('\"') {
554                    (val[1..val.len() - 1]).to_owned()
555                } else {
556                    val.to_owned()
557                }
558            })
559            .skip(2)
560            .collect::<Vec<String>>();
561        Ok(res)
562    }
563
564    pub fn set_update_column_defaults(
565        &self,
566        config_update: &mut ViewConfigUpdate,
567        requirements: &ViewConfigRequirements,
568    ) {
569        use self::column_defaults_update::*;
570        config_update.set_update_column_defaults(
571            &self.metadata(),
572            &self.get_view_config().columns,
573            requirements,
574        )
575    }
576
577    /// Update the config, setting the `columns` property to the plugin defaults
578    /// if provided.
579    pub fn update_view_config(&self, config_update: ViewConfigUpdate) -> ApiResult<()> {
580        if let Some(x) = self.borrow().error.as_ref() {
581            tracing::warn!("Errored state");
582
583            // Load bearing return
584            return Err(ApiError::new(x.0.clone()));
585        }
586
587        if self.borrow_mut().config.apply_update(config_update) && self.0.borrow().is_clean {
588            self.0.borrow_mut().is_clean = false;
589            self.view_config_changed.emit(());
590        }
591
592        Ok(())
593    }
594
595    /// In order to create a new view in this session, the session must first be
596    /// validated to create a `ValidSession<'_>` guard.
597    pub async fn validate(&self) -> Result<ValidSession<'_>, ApiError> {
598        let old = self.borrow_mut().old_config.take();
599        let is_diff = match old.as_ref() {
600            Some(old) => !old.is_equivalent(&self.borrow().config),
601            None => true,
602        };
603
604        if let Err(err) = self.validate_view_config().await {
605            let session = self.clone();
606            let poll_loop = LocalPollLoop::new(move |()| {
607                ApiFuture::spawn(session.reset(ResetOptions {
608                    config: true,
609                    expressions: true,
610                    ..ResetOptions::default()
611                }));
612                Ok(JsValue::UNDEFINED)
613            });
614
615            self.borrow_mut().error = Some(TableErrorState(
616                err.clone(),
617                Some(ReconnectCallback::new(move || {
618                    clone!(poll_loop);
619                    Box::pin(async move {
620                        poll_loop.poll(()).await;
621                        Ok(())
622                    })
623                })),
624            ));
625
626            if let Some(config) = old {
627                self.borrow_mut().config = config;
628            } else {
629                self.reset(ResetOptions {
630                    config: true,
631                    expressions: true,
632                    ..ResetOptions::default()
633                })
634                .await?;
635            }
636
637            return Err(err);
638        } else {
639            let old_config = Some(self.borrow().config.clone());
640            self.borrow_mut().old_config = old_config;
641        }
642
643        Ok(ValidSession(self, is_diff))
644    }
645
646    async fn flat_view(&self, flat: bool) -> ApiResult<View> {
647        if flat {
648            let table = self.borrow().table.clone().into_apierror()?;
649            Ok(table.view(None).await?)
650        } else {
651            self.borrow()
652                .view_sub
653                .as_ref()
654                .map(|x| x.get_view().clone())
655                .into_apierror()
656        }
657    }
658
659    fn update_stats(&self, stats: ViewStats) {
660        self.borrow_mut().stats = Some(stats);
661        if let Some(cb) = self.on_stats_changed.borrow().as_ref() {
662            cb.emit(());
663        }
664    }
665
666    fn all_columns(&self) -> Vec<String> {
667        self.metadata()
668            .get_table_columns()
669            .into_iter()
670            .flatten()
671            .cloned()
672            .collect()
673    }
674
675    async fn validate_view_config(&self) -> ApiResult<()> {
676        let mut config = self.borrow().config.clone();
677        let table_columns = self.all_columns();
678        let all_columns: HashSet<String> = table_columns.iter().cloned().collect();
679        let mut view_columns: HashSet<&str> = HashSet::new();
680        let table = self
681            .borrow()
682            .table
683            .as_ref()
684            .ok_or_else(|| apierror!(NoTableError))?
685            .clone();
686
687        let expression_names = if self.metadata().get_features().unwrap().expressions {
688            let valid_recs = table
689                .validate_expressions(config.expressions.clone())
690                .await?;
691
692            self.metadata_mut().update_expressions(&valid_recs)?
693        } else {
694            HashSet::default()
695        };
696
697        if config.columns.is_empty() {
698            config.columns = table_columns.into_iter().map(Some).collect();
699        }
700
701        for column in config.columns.iter().flatten() {
702            if all_columns.contains(column) || expression_names.contains(column) {
703                let _existed = view_columns.insert(column);
704            } else {
705                return Err(apierror!(InvalidViewerConfigError(
706                    "columns",
707                    column.to_owned()
708                )));
709            }
710        }
711
712        for column in config.group_by.iter() {
713            if all_columns.contains(column) || expression_names.contains(column) {
714                let _existed = view_columns.insert(column);
715            } else {
716                return Err(apierror!(InvalidViewerConfigError(
717                    "group_by",
718                    column.to_owned(),
719                )));
720            }
721        }
722
723        for column in config.split_by.iter() {
724            if all_columns.contains(column) || expression_names.contains(column) {
725                let _existed = view_columns.insert(column);
726            } else {
727                return Err(apierror!(InvalidViewerConfigError(
728                    "split_by",
729                    column.to_owned(),
730                )));
731            }
732        }
733
734        for sort in config.sort.iter() {
735            if all_columns.contains(&sort.0) || expression_names.contains(&sort.0) {
736                let _existed = view_columns.insert(&sort.0);
737            } else {
738                return Err(apierror!(InvalidViewerConfigError(
739                    "sort",
740                    sort.0.to_owned(),
741                )));
742            }
743        }
744
745        for filter in config.filter.iter() {
746            // TODO check filter op
747            if all_columns.contains(filter.column()) || expression_names.contains(filter.column()) {
748                let _existed = view_columns.insert(filter.column());
749            } else {
750                return Err(apierror!(InvalidViewerConfigError(
751                    "filter",
752                    filter.column().to_owned(),
753                )));
754            }
755        }
756
757        config
758            .aggregates
759            .retain(|column, _| view_columns.contains(column.as_str()));
760
761        self.borrow_mut().config = config;
762        Ok(())
763    }
764
765    fn reset_clean(&self) -> bool {
766        let mut is_clean = true;
767        std::mem::swap(&mut is_clean, &mut self.0.borrow_mut().is_clean);
768        is_clean
769    }
770
771    /// Snapshot the current session state as a [`SessionProps`] value suitable
772    /// for passing as a Yew prop.  Called by the root component whenever a
773    /// session-related PubSub event fires.
774    pub fn to_props(&self) -> SessionProps {
775        let data = self.borrow();
776        SessionProps {
777            config: PtrEqRc::new(data.config.clone()),
778            stats: data.stats.clone(),
779            has_table: if data.table.is_some() {
780                Some(TableLoadState::Loaded)
781            } else if data.is_loading {
782                Some(TableLoadState::Loading)
783            } else {
784                None
785            },
786            error: data.error.clone(),
787            title: data.title.clone(),
788            metadata: PtrEqRc::new(data.metadata.clone()),
789        }
790    }
791}
792
793/// A newtype wrapper which only provides `create_view()`
794pub struct ValidSession<'a>(&'a Session, bool);
795
796impl ValidSession<'_> {
797    /// Set a new `View` (derived from this `Session`'s `Table`), and create the
798    /// `update()` subscription, consuming this `ValidSession<'_>` and returning
799    /// the original `&Session`.
800    pub async fn create_view(&self) -> Result<Option<View>, ApiError> {
801        if !self.0.reset_clean() && !self.0.borrow().is_paused {
802            if !self.1 {
803                let config = self.0.borrow().config.clone();
804                if let Some(sub) = &mut self.0.borrow_mut().view_sub.as_mut() {
805                    sub.update_view_config(Rc::new(config));
806                    return Ok(Some(sub.get_view().clone()));
807                }
808            }
809
810            let table = self
811                .0
812                .borrow()
813                .table
814                .clone()
815                .ok_or("`restore()` called before `load()`")?;
816
817            let mut view_config = self.0.borrow().config.clone();
818
819            // Populate the aggreagtes with defaults as a courtesy to the
820            // virtual server api.
821            for col in view_config
822                .columns
823                .iter()
824                .flatten()
825                .chain(view_config.sort.iter().map(|x| &x.0))
826            {
827                if !view_config.aggregates.contains_key(col.as_str()) {
828                    let agg = self
829                        .0
830                        .metadata()
831                        .get_column_aggregates(col.as_str())
832                        .and_then(|mut aggs| aggs.next())
833                        .into_apierror();
834
835                    match agg {
836                        Err(_) => tracing::warn!(
837                            "No default aggregate for column '{}' found, skipping",
838                            col
839                        ),
840                        Ok(agg) => _ = view_config.aggregates.insert(col.to_string(), agg),
841                    };
842                }
843            }
844
845            let view = table.view(Some(view_config.into())).await?;
846            let view_schema = view.schema().await?;
847            self.0.metadata_mut().update_view_schema(&view_schema)?;
848            let on_stats = Callback::from({
849                let this = self.0.clone();
850                move |stats| this.update_stats(stats)
851            });
852
853            let sub = {
854                let config = self.0.borrow().config.clone();
855                let on_update = self
856                    .0
857                    .metadata()
858                    .get_features()
859                    .unwrap()
860                    .on_update
861                    .then(|| self.0.table_updated.callback());
862
863                ViewSubscription::new(view, config, on_stats, on_update).await?
864            };
865
866            let view = self.0.borrow_mut().view_sub.take();
867            ApiFuture::spawn(view.delete());
868            self.0.borrow_mut().view_sub = Some(sub);
869        }
870
871        Ok(self.0.get_view())
872    }
873}
874
875impl Drop for ValidSession<'_> {
876    /// `ValidSession` is a guard for listeners of the `view_created` pubsub
877    /// event.
878    fn drop(&mut self) {
879        self.0.view_created.emit(());
880    }
881}