1use std::collections::HashMap;
14use std::error::Error;
15use std::ops::Deref;
16use std::sync::Arc;
17
18use async_lock::{Mutex, RwLock};
19use futures::Future;
20use futures::future::{BoxFuture, LocalBoxFuture, join_all};
21use prost::Message;
22use serde::{Deserialize, Serialize};
23use ts_rs::TS;
24
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::{
28 ColumnType, GetFeaturesReq, GetFeaturesResp, GetHostedTablesReq, GetHostedTablesResp,
29 HostedTable, JoinType, MakeJoinTableReq, MakeTableReq, RemoveHostedTablesUpdateReq, Request,
30 Response, ServerError, ServerSystemInfoReq,
31};
32use crate::table::{JoinOptions, Table, TableInitOptions, TableOptions};
33use crate::table_data::{TableData, UpdateData};
34use crate::table_ref::TableRef;
35use crate::utils::*;
36use crate::view::{OnUpdateData, ViewWindow};
37use crate::{OnUpdateMode, OnUpdateOptions, asyncfn, clone};
38
/// Memory and CPU statistics returned by `Client::system_info`.
///
/// `T` is the numeric type used for the size and timestamp fields (defaults
/// to `u64`); `SystemInfo::cast` converts between representations.
#[derive(Clone, Debug, Serialize, Deserialize, TS)]
pub struct SystemInfo<T = u64> {
    /// `heap_size` copied from the server's system-info response.
    pub heap_size: T,

    /// `used_size` copied from the server's system-info response.
    pub used_size: T,

    /// `cpu_time` copied from the server's system-info response.
    /// NOTE(review): units are not visible in this file — confirm.
    pub cpu_time: u32,

    /// `cpu_time_epoch` copied from the server's system-info response.
    /// NOTE(review): exact meaning is not visible in this file — confirm.
    pub cpu_time_epoch: u32,

    /// Milliseconds since the UNIX epoch, sampled on the client when the
    /// response is processed; `None` on `wasm` targets.
    pub timestamp: Option<T>,

    /// Client-side heap figure from `crate::utils::get_used`; only populated
    /// when the `talc-allocator` feature is enabled, otherwise `None`.
    pub client_heap: Option<T>,

    /// Client-side used figure from `crate::utils::get_used`; only populated
    /// when the `talc-allocator` feature is enabled, otherwise `None`.
    pub client_used: Option<T>,
}
70
71impl<U: Copy + 'static> SystemInfo<U> {
72 pub fn cast<T: Copy + 'static>(&self) -> SystemInfo<T>
75 where
76 U: num_traits::AsPrimitive<T>,
77 {
78 SystemInfo {
79 heap_size: self.heap_size.as_(),
80 used_size: self.used_size.as_(),
81 cpu_time: self.cpu_time,
82 cpu_time_epoch: self.cpu_time_epoch,
83 timestamp: self.timestamp.map(|x| x.as_()),
84 client_heap: self.client_heap.map(|x| x.as_()),
85 client_used: self.client_used.map(|x| x.as_()),
86 }
87 }
88}
89
/// Reference-counted wrapper around a `GetFeaturesResp`.
///
/// The response is stored behind an `Arc`, so cloning a `Features` does not
/// copy the underlying message.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Features(Arc<GetFeaturesResp>);
94
95impl Features {
96 pub fn get_group_rollup_modes(&self) -> Vec<crate::config::GroupRollupMode> {
97 self.group_rollup_mode
98 .iter()
99 .map(|x| {
100 crate::config::GroupRollupMode::from(
101 crate::proto::GroupRollupMode::try_from(*x).unwrap(),
102 )
103 })
104 .collect::<Vec<_>>()
105 }
106}
107
108impl Deref for Features {
109 type Target = GetFeaturesResp;
110
111 fn deref(&self) -> &Self::Target {
112 &self.0
113 }
114}
115
116impl GetFeaturesResp {
117 pub fn default_op(&self, col_type: ColumnType) -> Option<&str> {
118 self.filter_ops
119 .get(&(col_type as u32))?
120 .options
121 .first()
122 .map(|x| x.as_str())
123 }
124}
125
/// Boxed, `Send + Sync` callback taking one argument.
type BoxFn<I, O> = Box<dyn Fn(I) -> O + Send + Sync + 'static>;
/// Boxed, `Send + Sync` callback taking two arguments.
type Box2Fn<I, J, O> = Box<dyn Fn(I, J) -> O + Send + Sync + 'static>;

/// Shared, lock-protected table of callbacks keyed by a `u32` id.
type Subscriptions<C> = Arc<RwLock<HashMap<u32, C>>>;
/// Error handler: receives the error plus an optional reconnect hook and
/// returns a boxed future resolving to `Result<(), ClientError>`.
type OnErrorCallback =
    Box2Fn<ClientError, Option<ReconnectCallback>, BoxFuture<'static, Result<(), ClientError>>>;

/// Handler that consumes a single `Response` and can only be called once.
type OnceCallback = Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>;
/// Shared function that sends a `Request`; the returned future may borrow
/// the request for its whole lifetime.
type SendCallback = Arc<
    dyn for<'a> Fn(&'a Request) -> BoxFuture<'a, Result<(), Box<dyn Error + Send + Sync>>>
        + Send
        + Sync
        + 'static,
>;
140
/// Transport abstraction accepted by `Client::new`.
///
/// Implementors must be cloneable and safe to share across threads.
pub trait ClientHandler: Clone + Send + Sync + 'static {
    /// Send one encoded request. `msg` holds the bytes of a `Request`
    /// already serialized by the `Client`; the returned future resolves to
    /// `Ok(())` on success or to the transport's error.
    fn send_request(
        &self,
        msg: Vec<u8>,
    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send;
}
148
149mod name_registry {
150 use std::collections::HashSet;
151 use std::sync::{Arc, LazyLock, Mutex};
152
153 use crate::ClientError;
154 use crate::view::ClientResult;
155
156 static CLIENT_ID_GEN: LazyLock<Arc<Mutex<u32>>> = LazyLock::new(Arc::default);
157 static REGISTERED_CLIENTS: LazyLock<Arc<Mutex<HashSet<String>>>> = LazyLock::new(Arc::default);
158
159 pub(crate) fn generate_name(name: Option<&str>) -> ClientResult<String> {
160 if let Some(name) = name {
161 if let Some(name) = REGISTERED_CLIENTS
162 .lock()
163 .map_err(ClientError::from)?
164 .get(name)
165 {
166 Err(ClientError::DuplicateNameError(name.to_owned()))
167 } else {
168 Ok(name.to_owned())
169 }
170 } else {
171 let mut guard = CLIENT_ID_GEN.lock()?;
172 *guard += 1;
173 Ok(format!("client-{guard}"))
174 }
175 }
176}
177
178#[derive(Clone)]
185#[allow(clippy::type_complexity)]
186pub struct ReconnectCallback(
187 Arc<dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync>,
188);
189
190impl Deref for ReconnectCallback {
191 type Target = dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync;
192
193 fn deref(&self) -> &Self::Target {
194 &*self.0
195 }
196}
197
198impl ReconnectCallback {
199 pub fn new(
200 f: impl Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync + 'static,
201 ) -> Self {
202 ReconnectCallback(Arc::new(f))
203 }
204}
205
/// Handle for exchanging `Request`/`Response` messages with a server over a
/// caller-supplied transport.
///
/// The subscription tables, feature cache, name and send callback are held
/// behind `Arc`s, so clones of a `Client` share them.
#[derive(Clone)]
pub struct Client {
    /// Name assigned by `name_registry::generate_name`; also what
    /// `PartialEq` compares.
    name: Arc<String>,
    /// Cache filled by the first successful `get_features` call.
    features: Arc<Mutex<Option<Features>>>,
    /// Encodes a `Request` and forwards the bytes to the user's transport.
    send: SendCallback,
    /// Source of the ids handed out by `gen_id`.
    id_gen: IDGen,
    /// Handlers registered through `on_error`, keyed by the id it returned.
    subscriptions_errors: Subscriptions<OnErrorCallback>,
    /// Single-use response handlers keyed by request `msg_id`; an entry is
    /// removed when its response is dispatched.
    subscriptions_once: Subscriptions<OnceCallback>,
    /// Long-lived response handlers keyed by request `msg_id`; they stay
    /// registered until `unsubscribe` or an error clears them.
    subscriptions: Subscriptions<BoxFn<Response, BoxFuture<'static, Result<(), ClientError>>>>,
}
232
233impl PartialEq for Client {
234 fn eq(&self, other: &Self) -> bool {
235 self.name == other.name
236 }
237}
238
239impl std::fmt::Debug for Client {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("Client").finish()
242 }
243}
244
245impl Client {
246 pub fn new_with_callback<T, U>(name: Option<&str>, send_request: T) -> ClientResult<Self>
249 where
250 T: Fn(Vec<u8>) -> U + 'static + Sync + Send,
251 U: Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + 'static,
252 {
253 let name = name_registry::generate_name(name)?;
254 let send_request = Arc::new(send_request);
255 let send: SendCallback = Arc::new(move |req| {
256 let mut bytes: Vec<u8> = Vec::new();
257 req.encode(&mut bytes).unwrap();
258 let send_request = send_request.clone();
259 Box::pin(async move { send_request(bytes).await })
260 });
261
262 Ok(Client {
263 name: Arc::new(name),
264 features: Arc::default(),
265 id_gen: IDGen::default(),
266 send,
267 subscriptions: Subscriptions::default(),
268 subscriptions_errors: Arc::default(),
269 subscriptions_once: Arc::default(),
270 })
271 }
272
/// Create a `Client` backed by a [`ClientHandler`].
///
/// This is a convenience over `new_with_callback`: each encoded request is
/// forwarded to `client_handler.send_request`. Errors from name generation
/// are returned unchanged.
pub fn new<T>(name: Option<&str>, client_handler: T) -> ClientResult<Self>
where
    T: ClientHandler + 'static + Sync + Send,
{
    Self::new_with_callback(
        name,
        asyncfn!(client_handler, async move |req| {
            client_handler.send_request(req).await
        }),
    )
}
285
286 pub fn get_name(&self) -> &'_ str {
287 self.name.as_str()
288 }
289
290 pub async fn handle_response<'a>(&'a self, msg: &'a [u8]) -> ClientResult<bool> {
297 let msg = Response::decode(msg)?;
298 tracing::debug!("RECV {}", msg);
299 let mut wr = self.subscriptions_once.write().await;
300 if let Some(handler) = (*wr).remove(&msg.msg_id) {
301 drop(wr);
302 handler(msg)?;
303 return Ok(true);
304 } else if let Some(handler) = self.subscriptions.try_read().unwrap().get(&msg.msg_id) {
305 drop(wr);
306 handler(msg).await?;
307 return Ok(true);
308 }
309
310 if let Response {
311 client_resp: Some(ClientResp::ServerError(ServerError { message, .. })),
312 ..
313 } = &msg
314 {
315 tracing::error!("{}", message);
316 } else {
317 tracing::debug!("Received unsolicited server response: {}", msg);
318 }
319
320 Ok(false)
321 }
322
323 pub async fn handle_error<T, U>(
325 &self,
326 message: ClientError,
327 reconnect: Option<T>,
328 ) -> ClientResult<()>
329 where
330 T: Fn() -> U + Clone + Send + Sync + 'static,
331 U: Future<Output = ClientResult<()>>,
332 {
333 let subs = self.subscriptions_errors.read().await;
334 let tasks = join_all(subs.values().map(|callback| {
335 callback(
336 message.clone(),
337 reconnect.clone().map(move |f| {
338 ReconnectCallback(Arc::new(move || {
339 clone!(f);
340 Box::pin(async move { Ok(f().await?) }) as LocalBoxFuture<'static, _>
341 }))
342 }),
343 )
344 }));
345
346 tasks.await.into_iter().collect::<Result<(), _>>()?;
347 self.close_and_error_subscriptions(&message).await
348 }
349
350 async fn close_and_error_subscriptions(&self, message: &ClientError) -> ClientResult<()> {
355 let synthetic_error = |msg_id| Response {
356 msg_id,
357 entity_id: "".to_string(),
358 client_resp: Some(ClientResp::ServerError(ServerError {
359 message: format!("{message}"),
360 status_code: 2,
361 })),
362 };
363
364 self.subscriptions.write().await.clear();
365 let callbacks_once = self
366 .subscriptions_once
367 .write()
368 .await
369 .drain()
370 .collect::<Vec<_>>();
371
372 callbacks_once
373 .into_iter()
374 .try_for_each(|(msg_id, f)| f(synthetic_error(msg_id)))
375 }
376
377 pub async fn on_error<T, U, V>(&self, on_error: T) -> ClientResult<u32>
378 where
379 T: Fn(ClientError, Option<ReconnectCallback>) -> U + Clone + Send + Sync + 'static,
380 U: Future<Output = V> + Send + 'static,
381 V: Into<Result<(), ClientError>> + Sync + 'static,
382 {
383 let id = self.gen_id();
384 let callback = asyncfn!(on_error, async move |x, y| on_error(x, y).await.into());
385 self.subscriptions_errors
386 .write()
387 .await
388 .insert(id, Box::new(move |x, y| Box::pin(callback(x, y))));
389
390 Ok(id)
391 }
392
/// Return the next id from this client's `IDGen`. Within this file the
/// result is used as the `msg_id` of outgoing requests and as the key for
/// handlers registered via `on_error`.
pub(crate) fn gen_id(&self) -> u32 {
    self.id_gen.next()
}
397
398 pub(crate) async fn unsubscribe(&self, update_id: u32) -> ClientResult<()> {
399 let callback = self
400 .subscriptions
401 .write()
402 .await
403 .remove(&update_id)
404 .ok_or(ClientError::Unknown("remove_update".to_string()))?;
405
406 drop(callback);
407 Ok(())
408 }
409
410 pub(crate) async fn subscribe_once(
412 &self,
413 msg: &Request,
414 on_update: Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>,
415 ) -> ClientResult<()> {
416 self.subscriptions_once
417 .write()
418 .await
419 .insert(msg.msg_id, on_update);
420
421 tracing::debug!("SEND {}", msg);
422 if let Err(e) = (self.send)(msg).await {
423 self.subscriptions_once.write().await.remove(&msg.msg_id);
424 Err(ClientError::Unknown(e.to_string()))
425 } else {
426 Ok(())
427 }
428 }
429
430 pub(crate) async fn subscribe<T, U>(&self, msg: &Request, on_update: T) -> ClientResult<()>
431 where
432 T: Fn(Response) -> U + Send + Sync + 'static,
433 U: Future<Output = Result<(), ClientError>> + Send + 'static,
434 {
435 self.subscriptions
436 .write()
437 .await
438 .insert(msg.msg_id, Box::new(move |x| Box::pin(on_update(x))));
439
440 tracing::debug!("SEND {}", msg);
441 if let Err(e) = (self.send)(msg).await {
442 self.subscriptions.write().await.remove(&msg.msg_id);
443 Err(ClientError::Unknown(e.to_string()))
444 } else {
445 Ok(())
446 }
447 }
448
449 pub(crate) async fn oneshot(&self, req: &Request) -> ClientResult<ClientResp> {
452 let (sender, receiver) = futures::channel::oneshot::channel::<ClientResp>();
453 let on_update = Box::new(move |res: Response| {
454 sender.send(res.client_resp.unwrap()).map_err(|x| x.into())
455 });
456
457 self.subscribe_once(req, on_update).await?;
458 receiver
459 .await
460 .map_err(|_| ClientError::Unknown(format!("Internal error for req {req}")))
461 }
462
463 pub(crate) async fn get_features(&self) -> ClientResult<Features> {
464 let mut guard = self.features.lock().await;
465 let features = if let Some(features) = &*guard {
466 features.clone()
467 } else {
468 let msg = Request {
469 msg_id: self.gen_id(),
470 entity_id: "".to_owned(),
471 client_req: Some(ClientReq::GetFeaturesReq(GetFeaturesReq {})),
472 };
473
474 let features = Features(Arc::new(match self.oneshot(&msg).await? {
475 ClientResp::GetFeaturesResp(features) => Ok(features),
476 resp => Err(resp),
477 }?));
478
479 *guard = Some(features.clone());
480 features
481 };
482
483 Ok(features)
484 }
485
486 pub async fn table(&self, input: TableData, options: TableInitOptions) -> ClientResult<Table> {
539 let entity_id = match options.name.clone() {
540 Some(x) => x.to_owned(),
541 None => randid(),
542 };
543
544 if let TableData::View(view) = &input {
545 let window = ViewWindow::default();
546 let arrow = view.to_arrow(window).await?;
547 let mut table = self
548 .crate_table_inner(UpdateData::Arrow(arrow).into(), options.into(), entity_id)
549 .await?;
550
551 let table_ = table.clone();
552 let callback = asyncfn!(table_, update, async move |update: OnUpdateData| {
553 let update = UpdateData::Arrow(update.delta.expect("Malformed message").into());
554 let options = crate::UpdateOptions::default();
555 table_.update(update, options).await.unwrap_or_log();
556 });
557
558 let options = OnUpdateOptions {
559 mode: Some(OnUpdateMode::Row),
560 };
561
562 let on_update_token = view.on_update(callback, options).await?;
563 table.view_update_token = Some(on_update_token);
564 Ok(table)
565 } else {
566 self.crate_table_inner(input, options.into(), entity_id)
567 .await
568 }
569 }
570
571 async fn crate_table_inner(
572 &self,
573 input: TableData,
574 options: TableOptions,
575 entity_id: String,
576 ) -> ClientResult<Table> {
577 let msg = Request {
578 msg_id: self.gen_id(),
579 entity_id: entity_id.clone(),
580 client_req: Some(ClientReq::MakeTableReq(MakeTableReq {
581 data: Some(input.into()),
582 options: Some(options.clone().try_into()?),
583 })),
584 };
585
586 let client = self.clone();
587 match self.oneshot(&msg).await? {
588 ClientResp::MakeTableResp(_) => Ok(Table::new(entity_id, client, options)),
589 resp => Err(resp.into()),
590 }
591 }
592
593 pub async fn join(
605 &self,
606 left: TableRef,
607 right: TableRef,
608 on: &str,
609 options: JoinOptions,
610 ) -> ClientResult<Table> {
611 let entity_id = options.name.unwrap_or_else(randid);
612 let join_type: JoinType = options.join_type.unwrap_or_default();
613 let right_on_column = options.right_on.unwrap_or_default();
614 let msg = Request {
615 msg_id: self.gen_id(),
616 entity_id: entity_id.clone(),
617 client_req: Some(ClientReq::MakeJoinTableReq(MakeJoinTableReq {
618 left_table_id: left.table_name().to_owned(),
619 right_table_id: right.table_name().to_owned(),
620 on_column: on.to_owned(),
621 join_type: join_type.into(),
622 right_on_column,
623 })),
624 };
625
626 let client = self.clone();
627 match self.oneshot(&msg).await? {
628 ClientResp::MakeJoinTableResp(_) => Ok(Table::new(entity_id, client, TableOptions {
629 index: Some(on.to_owned()),
630 limit: None,
631 })),
632 resp => Err(resp.into()),
633 }
634 }
635
636 async fn get_table_infos(&self) -> ClientResult<Vec<HostedTable>> {
637 let msg = Request {
638 msg_id: self.gen_id(),
639 entity_id: "".to_owned(),
640 client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
641 subscribe: false,
642 })),
643 };
644
645 match self.oneshot(&msg).await? {
646 ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => Ok(table_infos),
647 resp => Err(resp.into()),
648 }
649 }
650
651 pub async fn open_table(&self, entity_id: String) -> ClientResult<Table> {
664 let infos = self.get_table_infos().await?;
665
666 if let Some(info) = infos.into_iter().find(|i| i.entity_id == entity_id) {
668 let options = TableOptions {
669 index: info.index,
670 limit: info.limit,
671 };
672
673 let client = self.clone();
674 Ok(Table::new(entity_id, client, options))
675 } else {
676 Err(ClientError::Unknown(format!(
677 "Unknown table \"{}\"",
678 entity_id
679 )))
680 }
681 }
682
683 pub async fn get_hosted_table_names(&self) -> ClientResult<Vec<String>> {
696 let msg = Request {
697 msg_id: self.gen_id(),
698 entity_id: "".to_owned(),
699 client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
700 subscribe: false,
701 })),
702 };
703
704 match self.oneshot(&msg).await? {
705 ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => {
706 Ok(table_infos.into_iter().map(|i| i.entity_id).collect())
707 },
708 resp => Err(resp.into()),
709 }
710 }
711
712 pub async fn on_hosted_tables_update<T, U>(&self, on_update: T) -> ClientResult<u32>
716 where
717 T: Fn() -> U + Send + Sync + 'static,
718 U: Future<Output = ()> + Send + 'static,
719 {
720 let on_update = Arc::new(on_update);
721 let callback = asyncfn!(on_update, async move |resp: Response| {
722 match resp.client_resp {
723 Some(ClientResp::GetHostedTablesResp(_)) | None => {
724 on_update().await;
725 Ok(())
726 },
727 resp => Err(resp.into()),
728 }
729 });
730
731 let msg = Request {
732 msg_id: self.gen_id(),
733 entity_id: "".to_owned(),
734 client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
735 subscribe: true,
736 })),
737 };
738
739 self.subscribe(&msg, callback).await?;
740 Ok(msg.msg_id)
741 }
742
743 pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ClientResult<()> {
746 let msg = Request {
747 msg_id: self.gen_id(),
748 entity_id: "".to_owned(),
749 client_req: Some(ClientReq::RemoveHostedTablesUpdateReq(
750 RemoveHostedTablesUpdateReq { id: update_id },
751 )),
752 };
753
754 self.unsubscribe(update_id).await?;
755 match self.oneshot(&msg).await? {
756 ClientResp::RemoveHostedTablesUpdateResp(_) => Ok(()),
757 resp => Err(resp.into()),
758 }
759 }
760
761 pub async fn system_info(&self) -> ClientResult<SystemInfo> {
765 let msg = Request {
766 msg_id: self.gen_id(),
767 entity_id: "".to_string(),
768 client_req: Some(ClientReq::ServerSystemInfoReq(ServerSystemInfoReq {})),
769 };
770
771 match self.oneshot(&msg).await? {
772 ClientResp::ServerSystemInfoResp(resp) => {
773 #[cfg(not(target_family = "wasm"))]
774 let timestamp = Some(
775 std::time::SystemTime::now()
776 .duration_since(std::time::UNIX_EPOCH)?
777 .as_millis() as u64,
778 );
779
780 #[cfg(target_family = "wasm")]
781 let timestamp = None;
782
783 #[cfg(feature = "talc-allocator")]
784 let (client_used, client_heap) = {
785 let (client_used, client_heap) = crate::utils::get_used();
786 (Some(client_used as u64), Some(client_heap as u64))
787 };
788
789 #[cfg(not(feature = "talc-allocator"))]
790 let (client_used, client_heap) = (None, None);
791
792 let info = SystemInfo {
793 heap_size: resp.heap_size,
794 used_size: resp.used_size,
795 cpu_time: resp.cpu_time,
796 cpu_time_epoch: resp.cpu_time_epoch,
797 timestamp,
798 client_heap,
799 client_used,
800 };
801
802 Ok(info)
803 },
804 resp => Err(resp.into()),
805 }
806 }
807}