1pub(crate) mod column_defaults_update;
14pub(crate) mod drag_drop_update;
15mod metadata;
16mod props;
17pub(crate) mod replace_expression_update;
18mod view_subscription;
19
20use std::cell::{Ref, RefCell};
21use std::collections::HashSet;
22use std::future::Future;
23use std::ops::Deref;
24use std::rc::Rc;
25
26use perspective_client::config::*;
27use perspective_client::{Client, ClientError, ReconnectCallback, View, ViewWindow};
28use perspective_js::apierror;
29use perspective_js::utils::*;
30use wasm_bindgen::prelude::*;
31use yew::html::ImplicitClone;
32use yew::prelude::*;
33
34use self::metadata::*;
35pub use self::metadata::{MetadataRef, SessionMetadata, SessionMetadataRc};
36pub use self::props::{SessionProps, TableLoadState};
37pub use self::view_subscription::ViewStats;
38use self::view_subscription::*;
39use crate::js::plugin::*;
40use crate::utils::*;
41
42#[derive(Default)]
44pub struct SessionHandle {
45 session_data: RefCell<SessionData>,
46 pub table_updated: PubSub<()>,
47 pub table_loaded: PubSub<()>,
48 pub table_unloaded: PubSub<bool>,
49 pub view_created: PubSub<()>,
50 pub view_config_changed: PubSub<()>,
51 pub title_changed: PubSub<Option<String>>,
52
53 pub on_stats_changed: RefCell<Option<Callback<()>>>,
56
57 pub on_table_errored: RefCell<Option<Callback<()>>>,
61}
62
63impl Deref for SessionHandle {
64 type Target = RefCell<SessionData>;
65
66 fn deref(&self) -> &Self::Target {
67 &self.session_data
68 }
69}
70
71#[derive(Default)]
73pub struct SessionData {
74 client: Option<perspective_client::Client>,
75 table: Option<perspective_client::Table>,
76 metadata: SessionMetadata,
77 old_config: Option<ViewConfig>,
78 config: ViewConfig,
79 view_sub: Option<ViewSubscription>,
80 stats: Option<ViewStats>,
81 is_loading: bool,
82 is_clean: bool,
83 is_paused: bool,
84 error: Option<TableErrorState>,
85 title: Option<String>,
86}
87
88#[derive(Clone)]
89pub struct TableErrorState(ApiError, Option<ReconnectCallback>);
90
91impl PartialEq for TableErrorState {
92 fn eq(&self, other: &Self) -> bool {
93 self.0.to_string() == other.0.to_string()
94 }
95}
96
97impl std::fmt::Debug for TableErrorState {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_tuple("TableErrorState")
100 .field(&self.0.to_string())
101 .finish()
102 }
103}
104
105impl TableErrorState {
106 pub fn message(&self) -> String {
107 self.0.message()
108 }
109
110 pub fn stacktrace(&self) -> String {
111 self.0.stacktrace()
112 }
113
114 pub fn kind(&self) -> &'static str {
115 self.0.kind()
116 }
117
118 pub fn is_reconnect(&self) -> bool {
119 self.1.is_some()
120 }
121}
122
123#[derive(Debug, Default)]
124pub enum TableIntermediateState {
125 #[default]
126 Ejected,
127 Reloaded,
128}
129
130#[derive(Default)]
132pub struct ResetOptions {
133 pub expressions: bool,
135
136 pub table: Option<TableIntermediateState>,
138
139 pub config: bool,
141
142 pub stats: bool,
144}
145
146#[derive(Clone)]
150pub struct Session(Rc<SessionHandle>);
151
152impl ImplicitClone for Session {}
153
154impl Deref for Session {
155 type Target = SessionHandle;
156
157 fn deref(&self) -> &Self::Target {
158 &self.0
159 }
160}
161
162impl PartialEq for Session {
163 fn eq(&self, other: &Self) -> bool {
164 Rc::ptr_eq(&self.0, &other.0)
165 }
166}
167
168impl Session {
169 #[allow(clippy::new_without_default)]
172 pub fn new() -> Self {
173 Self(Rc::default())
174 }
175
176 pub(crate) fn metadata(&self) -> MetadataRef<'_> {
177 std::cell::Ref::map(self.borrow(), |x| &x.metadata)
178 }
179
180 pub(crate) fn metadata_mut(&self) -> MetadataMutRef<'_> {
181 std::cell::RefMut::map(self.borrow_mut(), |x| &mut x.metadata)
182 }
183
184 pub(crate) fn get_title(&self) -> Option<String> {
185 self.borrow().title.clone()
186 }
187
188 pub fn set_title(&self, title: Option<String>) {
189 let new_title = title.filter(|x| !x.is_empty());
190 self.borrow_mut().title.clone_from(&new_title);
191 self.title_changed.emit(new_title);
192 }
193
194 pub fn reset(&self, options: ResetOptions) -> impl Future<Output = ApiResult<()>> + use<> {
199 self.borrow_mut().is_clean = false;
200 let view = self.0.borrow_mut().view_sub.take();
201 let err = self.get_error();
202 self.borrow_mut().error = None;
203 if options.stats {
204 self.update_stats(ViewStats::default());
205 }
206
207 if options.config {
208 self.borrow_mut().config.reset(options.expressions);
209 }
210
211 match options.table {
212 Some(TableIntermediateState::Ejected) => {
213 self.borrow_mut().is_loading = false;
214 self.borrow_mut().table = None;
215 self.borrow_mut().metadata = SessionMetadata::default();
216 },
217 Some(TableIntermediateState::Reloaded) => {
218 self.borrow_mut().is_loading = true;
219 self.borrow_mut().table = None;
220 self.borrow_mut().metadata = SessionMetadata::default();
221 },
222 _ => {
223 self.borrow_mut().is_loading = false;
224 },
225 };
226
227 let table_unloaded = self.table_unloaded.callback();
228 self.borrow_mut().is_clean = false;
229 async move {
230 let res = view.delete().await;
231 if options.table.is_some() {
232 table_unloaded.emit(true)
233 }
234
235 if let Some(err) = err { Err(err) } else { res }
236 }
237 }
238
239 pub(crate) fn has_table(&self) -> Option<TableLoadState> {
240 let data = self.borrow();
241 if data.table.is_some() {
242 Some(TableLoadState::Loaded)
243 } else if data.is_loading {
244 Some(TableLoadState::Loading)
245 } else {
246 None
247 }
248 }
249
250 pub fn get_table(&self) -> Option<perspective_client::Table> {
251 self.borrow().table.clone()
252 }
253
254 pub fn set_client(&self, client: Client) -> bool {
255 if Some(&client) != self.borrow().client.as_ref() {
256 self.borrow_mut().client = Some(client);
257 self.borrow_mut().table = None;
258 true
259 } else {
260 false
261 }
262 }
263
264 pub fn get_client(&self) -> Option<Client> {
265 self.borrow().client.clone()
266 }
267
268 pub async fn set_table(&self, table_name: String) -> ApiResult<bool> {
282 if Some(table_name.as_str()) == self.0.borrow().table.as_ref().map(|x| x.get_name()) {
283 return Ok(false);
284 }
285
286 let client = self.0.borrow().client.clone().into_apierror()?;
287 let table = client.open_table(table_name.clone()).await?;
288 match SessionMetadata::from_table(&table).await {
289 Ok(metadata) => {
290 let client = table.get_client();
291 let on_error = self.on_table_errored.borrow().clone();
292 let session = self.clone();
293 let poll_loop = LocalPollLoop::new(move |(message, reconnect): (ApiError, _)| {
294 session.borrow_mut().error = Some(TableErrorState(message, reconnect));
295 if let Some(cb) = &on_error {
296 cb.emit(());
297 }
298 if let Some(sub) = session.borrow_mut().view_sub.take() {
299 sub.dismiss();
300 }
301
302 Ok(JsValue::UNDEFINED)
303 });
304
305 let _callback_id = client
306 .on_error(Box::new(move |message: ClientError, reconnect| {
307 let poll_loop = poll_loop.clone();
308 async move {
309 poll_loop.poll((message.into(), reconnect)).await;
310 Ok(())
311 }
312 }))
313 .await?;
314
315 let sub = self.borrow_mut().view_sub.take();
316 self.borrow_mut().metadata = metadata;
317 self.borrow_mut().table = Some(table);
318 self.borrow_mut().is_loading = false;
319 self.borrow_mut().is_clean = false;
320 sub.delete().await?;
321 self.table_loaded.emit(());
322 Ok(true)
323 },
324 Err(err) => self.set_error(false, err).await.map(|_| false),
325 }
326 }
327
328 pub fn update_column_defaults(&self, requirements: &ViewConfigRequirements) {
329 if self.borrow().config.columns.is_empty() {
330 let mut update = ViewConfigUpdate::default();
331 self.set_update_column_defaults(&mut update, requirements);
332 self.borrow_mut().config.apply_update(update);
333 }
334 }
335
336 pub async fn set_error(&self, reset_table: bool, err: ApiError) -> ApiResult<()> {
337 let session = self.clone();
338 let poll_loop = LocalPollLoop::new(move |()| {
339 ApiFuture::spawn(session.reset(ResetOptions {
340 config: true,
341 expressions: true,
342 ..ResetOptions::default()
343 }));
344 Ok(JsValue::UNDEFINED)
345 });
346
347 self.borrow_mut().error = Some(TableErrorState(
348 err.clone(),
349 Some(ReconnectCallback::new(move || {
350 clone!(poll_loop);
351 Box::pin(async move {
352 poll_loop.poll(()).await;
353 Ok(())
354 })
355 })),
356 ));
357
358 if let Some(cb) = self.on_table_errored.borrow().as_ref() {
359 cb.emit(());
360 }
361
362 let sub = self.borrow_mut().view_sub.take();
363 if reset_table {
364 self.borrow_mut().metadata = SessionMetadata::default();
365 self.borrow_mut().table = None;
366 }
367
368 sub.delete().await?;
369 Err(err)
370 }
371
372 pub fn set_pause(&self, pause: bool) -> bool {
373 self.borrow_mut().is_clean = false;
374 if pause == self.borrow().is_paused {
375 false
376 } else if pause {
377 ApiFuture::spawn(self.borrow_mut().view_sub.take().delete());
378 self.borrow_mut().is_paused = true;
379 true
380 } else {
381 self.borrow_mut().is_paused = false;
382 true
383 }
384 }
385
386 pub async fn await_table(&self) -> ApiResult<()> {
387 if self.js_get_table().is_none() {
388 self.table_loaded.read_next().await?;
389 let _ = self.js_get_table().ok_or("No table set")?;
390 }
391
392 Ok(())
393 }
394
395 pub fn js_get_table(&self) -> Option<JsValue> {
396 Some(perspective_js::Table::from(self.borrow().table.clone()?).into())
397 }
398
399 pub(crate) fn is_errored(&self) -> bool {
400 self.borrow().error.is_some()
401 }
402
403 pub(crate) fn get_error(&self) -> Option<ApiError> {
404 self.borrow().error.as_ref().map(|x| x.0.clone())
405 }
406
407 pub async fn reconnect(&self) -> ApiResult<()> {
408 let err = self.borrow().error.clone();
409 if let Some(TableErrorState(_, Some(reconnect))) = err {
410 reconnect().await?;
411 self.borrow_mut().is_loading = false;
412 self.borrow_mut().error = None;
413 self.borrow_mut().is_clean = false;
414 self.borrow_mut().view_sub = None;
415 self.table_loaded.emit(());
416 }
417
418 Ok(())
419 }
420
421 pub async fn validate_expr(
423 &self,
424 expr: &str,
425 ) -> Result<Option<perspective_client::ExprValidationError>, ApiError> {
426 let table = self.borrow().table.as_ref().unwrap().clone();
427 let errors = table
428 .validate_expressions(
429 ExpressionsDeserde::Map(std::collections::HashMap::from_iter([(
430 "_".to_string(),
431 expr.to_string(),
432 )]))
433 .into(),
434 )
435 .await?
436 .errors;
437
438 Ok(errors.get("_").cloned())
439 }
440
441 pub async fn arrow_as_vec(
442 &self,
443 flat: bool,
444 window: Option<ViewWindow>,
445 ) -> Result<Vec<u8>, ApiError> {
446 Ok(self
447 .flat_view(flat)
448 .await?
449 .to_arrow(window.unwrap_or_default())
450 .await?
451 .into())
452 }
453
454 pub async fn arrow_as_jsvalue(
455 &self,
456 flat: bool,
457 window: Option<ViewWindow>,
458 ) -> Result<js_sys::ArrayBuffer, ApiError> {
459 let arrow = self
460 .flat_view(flat)
461 .await?
462 .to_arrow(window.unwrap_or_default())
463 .await?;
464 Ok(js_sys::Uint8Array::from(&arrow[..])
465 .buffer()
466 .unchecked_into())
467 }
468
469 pub async fn ndjson_as_jsvalue(
470 &self,
471 flat: bool,
472 window: Option<ViewWindow>,
473 ) -> Result<js_sys::JsString, ApiError> {
474 let json: String = self
475 .flat_view(flat)
476 .await?
477 .to_ndjson(window.unwrap_or_default())
478 .await?;
479
480 Ok(json.into())
481 }
482
483 pub async fn json_as_jsvalue(
484 &self,
485 flat: bool,
486 window: Option<ViewWindow>,
487 ) -> Result<js_sys::Object, ApiError> {
488 let json: String = self
489 .flat_view(flat)
490 .await?
491 .to_columns_string(window.unwrap_or_default())
492 .await?;
493
494 Ok(js_sys::JSON::parse(&json)?.unchecked_into())
495 }
496
497 pub async fn csv_as_jsvalue(
498 &self,
499 flat: bool,
500 window: Option<ViewWindow>,
501 ) -> Result<js_sys::JsString, ApiError> {
502 let window = window.unwrap_or_default();
503 let csv = self.flat_view(flat).await?.to_csv(window).await;
504 Ok(csv.map(js_sys::JsString::from)?)
505 }
506
507 pub fn get_view(&self) -> Option<View> {
508 self.borrow()
509 .view_sub
510 .as_ref()
511 .map(|sub| sub.get_view().clone())
512 }
513
514 pub(crate) fn get_table_stats(&self) -> Option<ViewStats> {
515 self.borrow().stats.clone()
516 }
517
518 pub fn get_view_config(&'_ self) -> Ref<'_, ViewConfig> {
519 Ref::map(self.borrow(), |x| &x.config)
520 }
521
522 pub async fn get_column_values(&self, column: String) -> Result<Vec<String>, ApiError> {
533 let expressions = Some(self.borrow().config.expressions.clone());
534 let config = ViewConfigUpdate {
535 group_by: Some(vec![column]),
536 columns: Some(vec![]),
537 expressions,
538 ..ViewConfigUpdate::default()
539 };
540
541 let table = self.borrow().table.clone().unwrap();
542 let view = table.view(Some(config.clone())).await?;
543 let csv = view.to_csv(ViewWindow::default()).await?;
544
545 ApiFuture::spawn(async move {
546 view.delete().await?;
547 Ok(())
548 });
549
550 let res = csv
551 .lines()
552 .map(|val| {
553 if val.starts_with('\"') && val.ends_with('\"') {
554 (val[1..val.len() - 1]).to_owned()
555 } else {
556 val.to_owned()
557 }
558 })
559 .skip(2)
560 .collect::<Vec<String>>();
561 Ok(res)
562 }
563
564 pub fn set_update_column_defaults(
565 &self,
566 config_update: &mut ViewConfigUpdate,
567 requirements: &ViewConfigRequirements,
568 ) {
569 use self::column_defaults_update::*;
570 config_update.set_update_column_defaults(
571 &self.metadata(),
572 &self.get_view_config().columns,
573 requirements,
574 )
575 }
576
577 pub fn update_view_config(&self, config_update: ViewConfigUpdate) -> ApiResult<()> {
580 if let Some(x) = self.borrow().error.as_ref() {
581 tracing::warn!("Errored state");
582
583 return Err(ApiError::new(x.0.clone()));
585 }
586
587 if self.borrow_mut().config.apply_update(config_update) && self.0.borrow().is_clean {
588 self.0.borrow_mut().is_clean = false;
589 self.view_config_changed.emit(());
590 }
591
592 Ok(())
593 }
594
595 pub async fn validate(&self) -> Result<ValidSession<'_>, ApiError> {
598 let old = self.borrow_mut().old_config.take();
599 let is_diff = match old.as_ref() {
600 Some(old) => !old.is_equivalent(&self.borrow().config),
601 None => true,
602 };
603
604 if let Err(err) = self.validate_view_config().await {
605 let session = self.clone();
606 let poll_loop = LocalPollLoop::new(move |()| {
607 ApiFuture::spawn(session.reset(ResetOptions {
608 config: true,
609 expressions: true,
610 ..ResetOptions::default()
611 }));
612 Ok(JsValue::UNDEFINED)
613 });
614
615 self.borrow_mut().error = Some(TableErrorState(
616 err.clone(),
617 Some(ReconnectCallback::new(move || {
618 clone!(poll_loop);
619 Box::pin(async move {
620 poll_loop.poll(()).await;
621 Ok(())
622 })
623 })),
624 ));
625
626 if let Some(config) = old {
627 self.borrow_mut().config = config;
628 } else {
629 self.reset(ResetOptions {
630 config: true,
631 expressions: true,
632 ..ResetOptions::default()
633 })
634 .await?;
635 }
636
637 return Err(err);
638 } else {
639 let old_config = Some(self.borrow().config.clone());
640 self.borrow_mut().old_config = old_config;
641 }
642
643 Ok(ValidSession(self, is_diff))
644 }
645
646 async fn flat_view(&self, flat: bool) -> ApiResult<View> {
647 if flat {
648 let table = self.borrow().table.clone().into_apierror()?;
649 Ok(table.view(None).await?)
650 } else {
651 self.borrow()
652 .view_sub
653 .as_ref()
654 .map(|x| x.get_view().clone())
655 .into_apierror()
656 }
657 }
658
659 fn update_stats(&self, stats: ViewStats) {
660 self.borrow_mut().stats = Some(stats);
661 if let Some(cb) = self.on_stats_changed.borrow().as_ref() {
662 cb.emit(());
663 }
664 }
665
666 fn all_columns(&self) -> Vec<String> {
667 self.metadata()
668 .get_table_columns()
669 .into_iter()
670 .flatten()
671 .cloned()
672 .collect()
673 }
674
675 async fn validate_view_config(&self) -> ApiResult<()> {
676 let mut config = self.borrow().config.clone();
677 let table_columns = self.all_columns();
678 let all_columns: HashSet<String> = table_columns.iter().cloned().collect();
679 let mut view_columns: HashSet<&str> = HashSet::new();
680 let table = self
681 .borrow()
682 .table
683 .as_ref()
684 .ok_or_else(|| apierror!(NoTableError))?
685 .clone();
686
687 let expression_names = if self.metadata().get_features().unwrap().expressions {
688 let valid_recs = table
689 .validate_expressions(config.expressions.clone())
690 .await?;
691
692 self.metadata_mut().update_expressions(&valid_recs)?
693 } else {
694 HashSet::default()
695 };
696
697 if config.columns.is_empty() {
698 config.columns = table_columns.into_iter().map(Some).collect();
699 }
700
701 for column in config.columns.iter().flatten() {
702 if all_columns.contains(column) || expression_names.contains(column) {
703 let _existed = view_columns.insert(column);
704 } else {
705 return Err(apierror!(InvalidViewerConfigError(
706 "columns",
707 column.to_owned()
708 )));
709 }
710 }
711
712 for column in config.group_by.iter() {
713 if all_columns.contains(column) || expression_names.contains(column) {
714 let _existed = view_columns.insert(column);
715 } else {
716 return Err(apierror!(InvalidViewerConfigError(
717 "group_by",
718 column.to_owned(),
719 )));
720 }
721 }
722
723 for column in config.split_by.iter() {
724 if all_columns.contains(column) || expression_names.contains(column) {
725 let _existed = view_columns.insert(column);
726 } else {
727 return Err(apierror!(InvalidViewerConfigError(
728 "split_by",
729 column.to_owned(),
730 )));
731 }
732 }
733
734 for sort in config.sort.iter() {
735 if all_columns.contains(&sort.0) || expression_names.contains(&sort.0) {
736 let _existed = view_columns.insert(&sort.0);
737 } else {
738 return Err(apierror!(InvalidViewerConfigError(
739 "sort",
740 sort.0.to_owned(),
741 )));
742 }
743 }
744
745 for filter in config.filter.iter() {
746 if all_columns.contains(filter.column()) || expression_names.contains(filter.column()) {
748 let _existed = view_columns.insert(filter.column());
749 } else {
750 return Err(apierror!(InvalidViewerConfigError(
751 "filter",
752 filter.column().to_owned(),
753 )));
754 }
755 }
756
757 config
758 .aggregates
759 .retain(|column, _| view_columns.contains(column.as_str()));
760
761 self.borrow_mut().config = config;
762 Ok(())
763 }
764
765 fn reset_clean(&self) -> bool {
766 let mut is_clean = true;
767 std::mem::swap(&mut is_clean, &mut self.0.borrow_mut().is_clean);
768 is_clean
769 }
770
771 pub fn to_props(&self) -> SessionProps {
775 let data = self.borrow();
776 SessionProps {
777 config: PtrEqRc::new(data.config.clone()),
778 stats: data.stats.clone(),
779 has_table: if data.table.is_some() {
780 Some(TableLoadState::Loaded)
781 } else if data.is_loading {
782 Some(TableLoadState::Loading)
783 } else {
784 None
785 },
786 error: data.error.clone(),
787 title: data.title.clone(),
788 metadata: PtrEqRc::new(data.metadata.clone()),
789 }
790 }
791}
792
793pub struct ValidSession<'a>(&'a Session, bool);
795
796impl ValidSession<'_> {
797 pub async fn create_view(&self) -> Result<Option<View>, ApiError> {
801 if !self.0.reset_clean() && !self.0.borrow().is_paused {
802 if !self.1 {
803 let config = self.0.borrow().config.clone();
804 if let Some(sub) = &mut self.0.borrow_mut().view_sub.as_mut() {
805 sub.update_view_config(Rc::new(config));
806 return Ok(Some(sub.get_view().clone()));
807 }
808 }
809
810 let table = self
811 .0
812 .borrow()
813 .table
814 .clone()
815 .ok_or("`restore()` called before `load()`")?;
816
817 let mut view_config = self.0.borrow().config.clone();
818
819 for col in view_config
822 .columns
823 .iter()
824 .flatten()
825 .chain(view_config.sort.iter().map(|x| &x.0))
826 {
827 if !view_config.aggregates.contains_key(col.as_str()) {
828 let agg = self
829 .0
830 .metadata()
831 .get_column_aggregates(col.as_str())
832 .and_then(|mut aggs| aggs.next())
833 .into_apierror();
834
835 match agg {
836 Err(_) => tracing::warn!(
837 "No default aggregate for column '{}' found, skipping",
838 col
839 ),
840 Ok(agg) => _ = view_config.aggregates.insert(col.to_string(), agg),
841 };
842 }
843 }
844
845 let view = table.view(Some(view_config.into())).await?;
846 let view_schema = view.schema().await?;
847 self.0.metadata_mut().update_view_schema(&view_schema)?;
848 let on_stats = Callback::from({
849 let this = self.0.clone();
850 move |stats| this.update_stats(stats)
851 });
852
853 let sub = {
854 let config = self.0.borrow().config.clone();
855 let on_update = self
856 .0
857 .metadata()
858 .get_features()
859 .unwrap()
860 .on_update
861 .then(|| self.0.table_updated.callback());
862
863 ViewSubscription::new(view, config, on_stats, on_update).await?
864 };
865
866 let view = self.0.borrow_mut().view_sub.take();
867 ApiFuture::spawn(view.delete());
868 self.0.borrow_mut().view_sub = Some(sub);
869 }
870
871 Ok(self.0.get_view())
872 }
873}
874
875impl Drop for ValidSession<'_> {
876 fn drop(&mut self) {
879 self.0.view_created.emit(());
880 }
881}