1mod column_defaults_update;
14mod drag_drop_update;
15mod metadata;
16mod replace_expression_update;
17mod view_subscription;
18
19use std::cell::{Ref, RefCell};
20use std::collections::HashSet;
21use std::future::Future;
22use std::ops::Deref;
23use std::rc::Rc;
24use std::sync::Arc;
25
26use perspective_client::config::*;
27use perspective_client::{ReconnectCallback, View, ViewWindow};
28use perspective_js::utils::*;
29use wasm_bindgen::prelude::*;
30use yew::html::ImplicitClone;
31use yew::prelude::*;
32
33use self::metadata::*;
34use self::replace_expression_update::*;
35pub use self::view_subscription::ViewStats;
36use self::view_subscription::*;
37use crate::dragdrop::*;
38use crate::js::plugin::*;
39use crate::utils::*;
40
/// Handle to the viewer's session state.
///
/// Wraps an `Arc<SessionHandle>`, so cloning a `Session` only clones the
/// `Arc` and every clone refers to the same underlying [`SessionHandle`].
#[derive(Clone, Default)]
pub struct Session(Arc<SessionHandle>);

// Marker impl only; it adds no methods of its own.
impl ImplicitClone for Session {}
48
/// The shared state behind a [`Session`]: the interior-mutable
/// [`SessionData`] plus the `PubSub` channels this module emits on.
#[derive(Default)]
pub struct SessionHandle {
    /// Mutable session state; exposed through this type's `Deref` impl.
    session_data: RefCell<SessionData>,
    /// Its callback is handed to each new `ViewSubscription` as `on_update`.
    pub table_updated: PubSub<()>,
    /// Emitted by `Session::set_table` once the new table has been stored.
    pub table_loaded: PubSub<()>,
    /// Emitted when a [`ValidSession`] is dropped.
    pub view_created: PubSub<()>,
    /// Emitted by `Session::update_view_config` when the update changed the
    /// config.
    pub view_config_changed: PubSub<()>,
    /// Emitted with the new stats whenever the stored stats are replaced.
    pub stats_changed: PubSub<Option<ViewStats>>,
    /// Emitted with the error message when an error is recorded.
    pub table_errored: PubSub<Option<String>>,
}
60
/// The mutable state of a [`Session`], stored inside a `RefCell`.
#[derive(Default)]
pub struct SessionData {
    /// The currently loaded table, if any.
    table: Option<perspective_client::Table>,
    /// Metadata derived from the table, its expressions and the view schema.
    metadata: SessionMetadata,
    /// The config stored by the last successful `Session::validate` call;
    /// `validate` takes it out again at the start of each run.
    old_config: Option<ViewConfig>,
    /// The current view config.
    config: ViewConfig,
    /// Subscription wrapping the current `View`, if one has been created.
    view_sub: Option<ViewSubscription>,
    /// Most recent stats passed to `Session::update_stats`.
    stats: Option<ViewStats>,
    /// Cleared whenever state changes; `Session::reset_clean` sets it back
    /// to `true` and reports the previous value.
    is_clean: bool,
    /// While `true`, `ValidSession::create_view` does not create a view.
    is_paused: bool,
    /// The current error state, if any.
    error: Option<TableErrorState>,
}
74
/// An error recorded on the session: field `0` is the optional error message
/// and field `1` is an optional callback that `Session::reconnect` invokes.
#[derive(Clone, Default)]
pub struct TableErrorState(Option<String>, Option<ReconnectCallback>);
77
78impl Deref for Session {
79 type Target = SessionHandle;
80
81 fn deref(&self) -> &Self::Target {
82 &self.0
83 }
84}
85
86impl PartialEq for Session {
87 fn eq(&self, other: &Self) -> bool {
88 Arc::ptr_eq(&self.0, &other.0)
89 }
90}
91
92impl Deref for SessionHandle {
93 type Target = RefCell<SessionData>;
94
95 fn deref(&self) -> &Self::Target {
96 &self.session_data
97 }
98}
99
/// Shared borrow guard over the session's [`SessionMetadata`].
pub type MetadataRef<'a> = std::cell::Ref<'a, SessionMetadata>;
/// Exclusive borrow guard over the session's [`SessionMetadata`].
pub type MetadataMutRef<'a> = std::cell::RefMut<'a, SessionMetadata>;
102
103impl Session {
104 pub fn metadata(&self) -> MetadataRef<'_> {
105 std::cell::Ref::map(self.borrow(), |x| &x.metadata)
106 }
107
108 pub fn metadata_mut(&self) -> MetadataMutRef<'_> {
109 std::cell::RefMut::map(self.borrow_mut(), |x| &mut x.metadata)
110 }
111
112 pub fn invalidate(&self) {
113 self.borrow_mut().error = None;
114 self.borrow_mut().is_clean = false;
115 self.borrow_mut().view_sub = None;
116 }
117
118 pub fn reset(&self, reset_expressions: bool) -> impl Future<Output = ApiResult<()>> + use<> {
123 self.borrow_mut().is_clean = false;
124 let view = self.0.borrow_mut().view_sub.take();
125 self.borrow_mut().view_sub = None;
126 self.borrow_mut().config.reset(reset_expressions);
127 view.delete()
128 }
129
130 pub async fn delete(&self) -> ApiResult<()> {
135 self.borrow_mut().is_clean = false;
136 self.borrow_mut().config.reset(true);
137 self.borrow_mut().metadata = SessionMetadata::default();
138 self.borrow_mut().table = None;
139 let view = self.borrow_mut().view_sub.take();
140 view.delete().await?;
141 Ok(())
142 }
143
144 pub fn has_table(&self) -> bool {
145 self.borrow().table.is_some()
146 }
147
148 pub fn get_table(&self) -> Option<perspective_client::Table> {
149 self.borrow().table.clone()
150 }
151
152 pub async fn set_table(&self, table: perspective_client::Table) -> ApiResult<JsValue> {
156 match SessionMetadata::from_table(&table).await {
157 Ok(metadata) => {
158 let client = table.get_client();
159 let set_error = self.table_errored.as_boxfn();
160 let session = self.clone();
161 let poll_loop =
162 LocalPollLoop::new(move |(message, reconnect): (Option<String>, _)| {
163 set_error(message.clone());
164 session.borrow_mut().error = Some(TableErrorState(message, reconnect));
165 Ok(JsValue::UNDEFINED)
166 });
167
168 let _callback_id = client
169 .on_error(Box::new(move |message, reconnect| {
170 let poll_loop = poll_loop.clone();
171 async move {
172 poll_loop.poll((message, reconnect)).await;
173 Ok(())
174 }
175 }))
176 .await?;
177
178 let sub = self.borrow_mut().view_sub.take();
179 self.borrow_mut().metadata = metadata;
180 self.borrow_mut().table = Some(table);
181 sub.delete().await?;
182 self.table_loaded.emit(());
183 Ok(JsValue::UNDEFINED)
184 },
185 Err(err) => self
186 .set_error(err.to_string())
187 .await
188 .map(|_| JsValue::UNDEFINED),
189 }
190 }
191
192 pub async fn set_error(&self, err: String) -> ApiResult<()> {
193 self.borrow_mut().error = Some(TableErrorState(Some(err.clone()), None));
194 self.table_errored.emit(Some(err.clone()));
195 let sub = self.borrow_mut().view_sub.take();
196 self.borrow_mut().metadata = SessionMetadata::default();
197 self.borrow_mut().table = None;
198 sub.delete().await?;
199 Err(err.into())
200 }
201
202 pub fn set_pause(&self, pause: bool) -> bool {
203 self.borrow_mut().is_clean = false;
204 if pause == self.borrow().is_paused {
205 false
206 } else if pause {
207 ApiFuture::spawn(self.borrow_mut().view_sub.take().delete());
208 self.borrow_mut().is_paused = true;
209 true
210 } else {
211 self.borrow_mut().is_paused = false;
212 true
213 }
214 }
215
216 pub async fn await_table(&self) -> ApiResult<()> {
217 if self.js_get_table().is_none() {
218 self.table_loaded.listen_once().await?;
219 let _ = self.js_get_table().ok_or("No table set")?;
220 }
221
222 Ok(())
223 }
224
225 pub fn js_get_table(&self) -> Option<JsValue> {
226 Some(perspective_js::Table::from(self.borrow().table.clone()?).into())
227 }
228
229 pub fn js_get_view(&self) -> Option<JsValue> {
230 let view = self.borrow().view_sub.as_ref()?.get_view().clone();
231 Some(perspective_js::View::from(view).into())
232 }
233
234 pub fn get_error(&self) -> Option<String> {
235 self.borrow().error.as_ref().and_then(|x| x.0.clone())
236 }
237
238 pub fn is_reconnect(&self) -> bool {
239 self.borrow()
240 .error
241 .as_ref()
242 .map(|x| x.1.is_some())
243 .unwrap_or_default()
244 }
245
246 pub async fn reconnect(&self) -> ApiResult<()> {
247 let err = self.borrow().error.clone();
248 if let Some(TableErrorState(_, Some(reconnect))) = err {
249 reconnect().await?;
250 self.borrow_mut().error = None;
251 self.borrow_mut().is_clean = false;
252 self.borrow_mut().view_sub = None;
253 }
254
255 Ok(())
256 }
257
258 pub fn is_column_expression_in_use(&self, name: &str) -> bool {
259 self.borrow().config.is_column_expression_in_use(name)
260 }
261
262 pub fn is_column_active(&self, name: &str) -> bool {
264 let config = Ref::map(self.borrow(), |x| &x.config);
265 config.columns.iter().any(|maybe_col| {
266 maybe_col
267 .as_ref()
268 .map(|col| col == name)
269 .unwrap_or_default()
270 }) || config.group_by.iter().any(|col| col == name)
271 || config.split_by.iter().any(|col| col == name)
272 || config.filter.iter().any(|col| col.column() == name)
273 || config.sort.iter().any(|col| col.0 == name)
274 }
275
276 pub fn create_drag_drop_update(
277 &self,
278 column: String,
279 index: usize,
280 drop: DragTarget,
281 drag: DragEffect,
282 requirements: &ViewConfigRequirements,
283 ) -> ViewConfigUpdate {
284 use self::drag_drop_update::*;
285 let col_type = self
286 .metadata()
287 .get_column_table_type(column.as_str())
288 .unwrap();
289
290 self.get_view_config().create_drag_drop_update(
291 column,
292 col_type,
293 index,
294 drop,
295 drag,
296 requirements,
297 self.metadata().get_features().unwrap(),
298 )
299 }
300
301 pub async fn create_replace_expression_update(
303 &self,
304 old_expr_name: &str,
305 new_expr: &Expression<'static>,
306 ) -> ViewConfigUpdate {
307 let old_expr_val = self
308 .metadata()
309 .get_expression_by_alias(old_expr_name)
310 .unwrap();
311
312 let old_expr = Expression::new(Some(old_expr_name.into()), old_expr_val.into());
313
314 use self::replace_expression_update::*;
315 self.get_view_config()
316 .create_replace_expression_update(&old_expr, new_expr)
317 }
318
319 pub async fn create_rename_expression_update(
320 &self,
321 old_expr_name: String,
322 new_expr_name: Option<String>,
323 ) -> ViewConfigUpdate {
324 let old_expr_val = self
325 .metadata()
326 .get_expression_by_alias(&old_expr_name)
327 .expect_throw(&format!("Unable to get expr with name {old_expr_name}"));
328 let old_expr = Expression::new(Some(old_expr_name.into()), old_expr_val.clone().into());
329 let new_expr = Expression::new(new_expr_name.map(|n| n.into()), old_expr_val.into());
330 self.get_view_config()
331 .create_replace_expression_update(&old_expr, &new_expr)
332 }
333
334 pub async fn validate_expr(
336 &self,
337 expr: &str,
338 ) -> Result<Option<perspective_client::ExprValidationError>, ApiError> {
339 let table = self.borrow().table.as_ref().unwrap().clone();
341 let errors = table
342 .validate_expressions(
343 ExpressionsDeserde::Map(std::collections::HashMap::from_iter([(
344 "_".to_string(),
345 expr.to_string(),
346 )]))
347 .into(),
348 )
349 .await?
350 .errors;
351
352 Ok(errors.get("_").cloned())
353 }
354
355 pub async fn arrow_as_vec(
356 &self,
357 flat: bool,
358 window: Option<ViewWindow>,
359 ) -> Result<Vec<u8>, ApiError> {
360 Ok(self
361 .flat_view(flat)
362 .await?
363 .to_arrow(window.unwrap_or_default())
364 .await?
365 .into())
366 }
367
368 pub async fn arrow_as_jsvalue(
369 &self,
370 flat: bool,
371 window: Option<ViewWindow>,
372 ) -> Result<js_sys::ArrayBuffer, ApiError> {
373 let arrow = self
374 .flat_view(flat)
375 .await?
376 .to_arrow(window.unwrap_or_default())
377 .await?;
378 Ok(js_sys::Uint8Array::from(&arrow[..])
379 .buffer()
380 .unchecked_into())
381 }
382
383 pub async fn ndjson_as_jsvalue(
384 &self,
385 flat: bool,
386 window: Option<ViewWindow>,
387 ) -> Result<js_sys::JsString, ApiError> {
388 let json: String = self
389 .flat_view(flat)
390 .await?
391 .to_ndjson(window.unwrap_or_default())
392 .await?;
393
394 Ok(json.into())
395 }
396
397 pub async fn json_as_jsvalue(
398 &self,
399 flat: bool,
400 window: Option<ViewWindow>,
401 ) -> Result<js_sys::Object, ApiError> {
402 let json: String = self
403 .flat_view(flat)
404 .await?
405 .to_columns_string(window.unwrap_or_default())
406 .await?;
407
408 Ok(js_sys::JSON::parse(&json)?.unchecked_into())
409 }
410
411 pub async fn csv_as_jsvalue(
412 &self,
413 flat: bool,
414 window: Option<ViewWindow>,
415 ) -> Result<js_sys::JsString, ApiError> {
416 let window = window.unwrap_or_default();
417 let csv = self.flat_view(flat).await?.to_csv(window).await;
418 Ok(csv.map(js_sys::JsString::from)?)
419 }
420
421 pub fn get_view(&self) -> Option<View> {
422 self.borrow()
423 .view_sub
424 .as_ref()
425 .map(|sub| sub.get_view().clone())
426 }
427
428 pub fn get_table_stats(&self) -> Option<ViewStats> {
429 self.borrow().stats.clone()
430 }
431
432 pub fn get_view_config(&self) -> Ref<ViewConfig> {
433 Ref::map(self.borrow(), |x| &x.config)
434 }
435
436 pub async fn get_column_values(&self, column: String) -> Result<Vec<String>, ApiError> {
447 let expressions = Some(self.borrow().config.expressions.clone());
448 let config = ViewConfigUpdate {
449 group_by: Some(vec![column]),
450 columns: Some(vec![]),
451 expressions,
452 ..ViewConfigUpdate::default()
453 };
454
455 let table = self.borrow().table.clone().unwrap();
456 let view = table.view(Some(config.clone())).await?;
457 let csv = view.to_csv(ViewWindow::default()).await?;
458
459 ApiFuture::spawn(async move {
460 view.delete().await?;
461 Ok(())
462 });
463
464 let res = csv
465 .lines()
466 .map(|val| {
467 if val.starts_with('\"') && val.ends_with('\"') {
468 (val[1..val.len() - 1]).to_owned()
469 } else {
470 val.to_owned()
471 }
472 })
473 .skip(2)
474 .collect::<Vec<String>>();
475 Ok(res)
476 }
477
478 pub fn set_update_column_defaults(
479 &self,
480 config_update: &mut ViewConfigUpdate,
481 requirements: &ViewConfigRequirements,
482 ) {
483 use self::column_defaults_update::*;
484 config_update.set_update_column_defaults(
485 &self.metadata(),
486 &self.borrow().config.columns,
487 requirements,
488 )
489 }
490
491 pub fn update_view_config(&self, config_update: ViewConfigUpdate) -> ApiResult<()> {
494 if let Some(x) = self.borrow().error.as_ref() {
495 tracing::warn!("Errored state");
496
497 return Err(ApiError::new(
499 x.0.clone().unwrap_or_else(|| "Unknown error".to_string()),
500 ));
501 }
502
503 if self.borrow_mut().config.apply_update(config_update) {
504 self.0.borrow_mut().is_clean = false;
505 self.view_config_changed.emit(());
506 }
507
508 Ok(())
509 }
510
511 pub fn reset_stats(&self) {
512 self.update_stats(ViewStats::default());
513 }
514
515 #[cfg(test)]
516 pub fn set_stats(&self, stats: ViewStats) {
517 self.update_stats(stats)
518 }
519
520 pub async fn validate(&self) -> Result<ValidSession<'_>, JsValue> {
523 let old = self.borrow_mut().old_config.take();
524 let is_diff = match old.as_ref() {
525 Some(old) => !old.is_equivalent(&self.borrow().config),
526 None => true,
527 };
528
529 if let Err(err) = self.validate_view_config().await {
530 self.borrow_mut().error = Some(TableErrorState(Some(err.to_string()), None));
531 web_sys::console::error_2(&"Failed to apply config:".into(), &err.clone().into());
532 if let Some(config) = old {
533 self.borrow_mut().config = config;
534 } else {
535 self.reset(true).await?;
536 }
537
538 return Err(err)?;
539 } else {
540 let old_config = Some(self.borrow().config.clone());
541 self.borrow_mut().old_config = old_config;
542 }
543
544 Ok(ValidSession(self, is_diff))
545 }
546
547 async fn flat_view(&self, flat: bool) -> ApiResult<View> {
548 if flat {
549 let table = self.borrow().table.clone().into_apierror()?;
550 Ok(table.view(None).await?)
551 } else {
552 self.borrow()
553 .view_sub
554 .as_ref()
555 .map(|x| x.get_view().clone())
556 .into_apierror()
557 }
558 }
559
560 fn update_stats(&self, stats: ViewStats) {
561 self.borrow_mut().stats = Some(stats.clone());
562 self.stats_changed.emit(Some(stats));
563 }
564
565 async fn validate_view_config(&self) -> ApiResult<()> {
566 let config = self.borrow().config.clone();
567 let table_columns = self
568 .metadata()
569 .get_table_columns()
570 .into_iter()
571 .flatten()
572 .cloned()
573 .collect::<Vec<String>>();
574
575 let all_columns: HashSet<String> = table_columns.iter().cloned().collect();
576
577 let mut view_columns: HashSet<&str> = HashSet::new();
578
579 let table = self
580 .borrow()
581 .table
582 .as_ref()
583 .ok_or("Trying to draw the viewer with no table attached")?
584 .clone();
585
586 let valid_recs = table.validate_expressions(config.expressions).await?;
587 let expression_names = self.metadata_mut().update_expressions(&valid_recs)?;
588
589 let mut config = self.borrow().config.clone();
592
593 if config.columns.is_empty() {
594 config.columns = table_columns.into_iter().map(Some).collect();
595 }
596
597 for column in config.columns.iter().flatten() {
598 if all_columns.contains(column) || expression_names.contains(column) {
599 let _existed = view_columns.insert(column);
600 } else {
601 return Err(format!("Unknown \"{}\" in `columns`", column).into());
602 }
603 }
604
605 for column in config.group_by.iter() {
606 if all_columns.contains(column) || expression_names.contains(column) {
607 let _existed = view_columns.insert(column);
608 } else {
609 return Err(format!("Unknown \"{}\" in `group_by`", column).into());
610 }
611 }
612
613 for column in config.split_by.iter() {
614 if all_columns.contains(column) || expression_names.contains(column) {
615 let _existed = view_columns.insert(column);
616 } else {
617 return Err(format!("Unknown \"{}\" in `split_by`", column).into());
618 }
619 }
620
621 for sort in config.sort.iter() {
622 if all_columns.contains(&sort.0) || expression_names.contains(&sort.0) {
623 let _existed = view_columns.insert(&sort.0);
624 } else {
625 return Err(format!("Unknown \"{}\" in `sort`", sort.0).into());
626 }
627 }
628
629 for filter in config.filter.iter() {
630 if all_columns.contains(filter.column()) || expression_names.contains(filter.column()) {
632 let _existed = view_columns.insert(filter.column());
633 } else {
634 return Err(format!("Unknown \"{}\" in `filter`", filter.column()).into());
635 }
636 }
637
638 config
639 .aggregates
640 .retain(|column, _| view_columns.contains(column.as_str()));
641
642 self.borrow_mut().config = config;
643 Ok(())
644 }
645
646 fn reset_clean(&self) -> bool {
647 let mut is_clean = true;
648 std::mem::swap(&mut is_clean, &mut self.0.borrow_mut().is_clean);
649 is_clean
650 }
651}
652
/// A [`Session`] whose config has passed `Session::validate`. Field `1` is
/// the `is_diff` flag computed there: whether the config is not equivalent to
/// the previously validated one. Dropping this value emits `view_created`.
pub struct ValidSession<'a>(&'a Session, bool);
655
656impl<'a> ValidSession<'a> {
657 pub async fn create_view(&self) -> Result<&'a Session, ApiError> {
661 if !self.0.reset_clean() && !self.0.borrow().is_paused {
662 if !self.1 {
663 let config = self.0.borrow().config.clone();
664 if let Some(sub) = &mut self.0.borrow_mut().view_sub.as_mut() {
665 sub.update_view_config(Rc::new(config));
666 return Ok(self.0);
667 }
668 }
669
670 let table = self
671 .0
672 .borrow()
673 .table
674 .clone()
675 .ok_or("`restore()` called before `load()`")?;
676
677 let view_config = self.0.borrow().config.clone();
678 let view = table.view(Some(view_config.into())).await?;
679 let view_schema = view.schema().await?;
680 self.0.metadata_mut().update_view_schema(&view_schema)?;
681 let on_stats = Callback::from({
682 let this = self.0.clone();
683 move |stats| this.update_stats(stats)
684 });
685
686 let sub = {
687 let config = self.0.borrow().config.clone();
688 let on_update = self.0.table_updated.callback();
689 ViewSubscription::new(view, config, on_stats, on_update)
690 };
691
692 let view = self.0.borrow_mut().view_sub.take();
693 ApiFuture::spawn(view.delete());
694 self.0.borrow_mut().view_sub = Some(sub);
695 }
696
697 Ok(self.0)
698 }
699}
700
701impl Drop for ValidSession<'_> {
702 fn drop(&mut self) {
705 self.0.view_created.emit(());
706 }
707}