1use std::collections::HashMap;
14use std::error::Error;
15use std::ops::Deref;
16use std::sync::Arc;
17
18use async_lock::{Mutex, RwLock};
19use futures::Future;
20use futures::future::{BoxFuture, LocalBoxFuture, join_all};
21use prost::Message;
22use serde::{Deserialize, Serialize};
23use ts_rs::TS;
24
25use crate::proto::request::ClientReq;
26use crate::proto::response::ClientResp;
27use crate::proto::{
28 ColumnType, GetFeaturesReq, GetFeaturesResp, GetHostedTablesReq, GetHostedTablesResp,
29 HostedTable, MakeTableReq, RemoveHostedTablesUpdateReq, Request, Response, ServerError,
30 ServerSystemInfoReq,
31};
32use crate::table::{Table, TableInitOptions, TableOptions};
33use crate::table_data::{TableData, UpdateData};
34use crate::utils::*;
35use crate::view::{OnUpdateData, ViewWindow};
36use crate::{OnUpdateMode, OnUpdateOptions, asyncfn, clone};
37
38#[derive(Clone, Debug, Serialize, Deserialize, TS)]
40pub struct SystemInfo<T = u64> {
41 pub heap_size: T,
43
44 pub used_size: T,
46
47 pub cpu_time: u32,
52
53 pub cpu_time_epoch: u32,
55
56 pub timestamp: Option<T>,
60
61 pub client_heap: Option<T>,
64
65 pub client_used: Option<T>,
68}
69
70impl<U: Copy + 'static> SystemInfo<U> {
71 pub fn cast<T: Copy + 'static>(&self) -> SystemInfo<T>
74 where
75 U: num_traits::AsPrimitive<T>,
76 {
77 SystemInfo {
78 heap_size: self.heap_size.as_(),
79 used_size: self.used_size.as_(),
80 cpu_time: self.cpu_time,
81 cpu_time_epoch: self.cpu_time_epoch,
82 timestamp: self.timestamp.map(|x| x.as_()),
83 client_heap: self.client_heap.map(|x| x.as_()),
84 client_used: self.client_used.map(|x| x.as_()),
85 }
86 }
87}
88
89#[derive(Clone, Debug, Default)]
92pub struct Features(Arc<GetFeaturesResp>);
93
94impl Features {
95 pub fn get_group_rollup_modes(&self) -> Vec<crate::config::GroupRollupMode> {
96 self.group_rollup_mode
97 .iter()
98 .map(|x| {
99 crate::config::GroupRollupMode::from(
100 crate::proto::GroupRollupMode::try_from(*x).unwrap(),
101 )
102 })
103 .collect::<Vec<_>>()
104 }
105}
106
107impl Deref for Features {
108 type Target = GetFeaturesResp;
109
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl GetFeaturesResp {
116 pub fn default_op(&self, col_type: ColumnType) -> Option<&str> {
117 self.filter_ops
118 .get(&(col_type as u32))?
119 .options
120 .first()
121 .map(|x| x.as_str())
122 }
123}
124
/// Boxed, thread-safe callable taking a single argument.
type BoxFn<I, O> = Box<dyn Fn(I) -> O + Send + Sync + 'static>;
/// Boxed, thread-safe callable taking two arguments.
type Box2Fn<I, J, O> = Box<dyn Fn(I, J) -> O + Send + Sync + 'static>;

/// Shared, async-locked map from a `u32` id to a callback of type `C`.
///
/// In this file the key is either a request's `msg_id` or an id produced by
/// `Client::gen_id`.
type Subscriptions<C> = Arc<RwLock<HashMap<u32, C>>>;
/// Error handler: receives the error and an optional reconnect hook, and
/// returns a boxed future resolving to the handler's result.
type OnErrorCallback =
    Box2Fn<ClientError, Option<ReconnectCallback>, BoxFuture<'static, Result<(), ClientError>>>;

/// Response handler that is consumed by its first (and only) invocation.
type OnceCallback = Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>;
/// Type-erased transport: sends a borrowed [`Request`] and resolves once the
/// send has completed or failed.
type SendCallback = Arc<
    dyn for<'a> Fn(&'a Request) -> BoxFuture<'a, Result<(), Box<dyn Error + Send + Sync>>>
        + Send
        + Sync
        + 'static,
>;
139
/// Transport abstraction accepted by `Client::new`.
///
/// Implementors deliver an already-serialized request to the server.
pub trait ClientHandler: Clone + Send + Sync + 'static {
    /// Sends `msg`, the encoded bytes of a request, and resolves once the
    /// send has completed or failed.
    fn send_request(
        &self,
        msg: Vec<u8>,
    ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send;
}
147
148mod name_registry {
149 use std::collections::HashSet;
150 use std::sync::{Arc, LazyLock, Mutex};
151
152 use crate::ClientError;
153 use crate::view::ClientResult;
154
155 static CLIENT_ID_GEN: LazyLock<Arc<Mutex<u32>>> = LazyLock::new(Arc::default);
156 static REGISTERED_CLIENTS: LazyLock<Arc<Mutex<HashSet<String>>>> = LazyLock::new(Arc::default);
157
158 pub(crate) fn generate_name(name: Option<&str>) -> ClientResult<String> {
159 if let Some(name) = name {
160 if let Some(name) = REGISTERED_CLIENTS
161 .lock()
162 .map_err(ClientError::from)?
163 .get(name)
164 {
165 Err(ClientError::DuplicateNameError(name.to_owned()))
166 } else {
167 Ok(name.to_owned())
168 }
169 } else {
170 let mut guard = CLIENT_ID_GEN.lock()?;
171 *guard += 1;
172 Ok(format!("client-{guard}"))
173 }
174 }
175}
176
177#[derive(Clone)]
184#[allow(clippy::type_complexity)]
185pub struct ReconnectCallback(
186 Arc<dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync>,
187);
188
189impl Deref for ReconnectCallback {
190 type Target = dyn Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync;
191
192 fn deref(&self) -> &Self::Target {
193 &*self.0
194 }
195}
196
197impl ReconnectCallback {
198 pub fn new(
199 f: impl Fn() -> LocalBoxFuture<'static, Result<(), Box<dyn Error>>> + Send + Sync + 'static,
200 ) -> Self {
201 ReconnectCallback(Arc::new(f))
202 }
203}
204
205#[derive(Clone)]
222pub struct Client {
223 name: Arc<String>,
224 features: Arc<Mutex<Option<Features>>>,
225 send: SendCallback,
226 id_gen: IDGen,
227 subscriptions_errors: Subscriptions<OnErrorCallback>,
228 subscriptions_once: Subscriptions<OnceCallback>,
229 subscriptions: Subscriptions<BoxFn<Response, BoxFuture<'static, Result<(), ClientError>>>>,
230}
231
232impl PartialEq for Client {
233 fn eq(&self, other: &Self) -> bool {
234 self.name == other.name
235 }
236}
237
238impl std::fmt::Debug for Client {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 f.debug_struct("Client").finish()
241 }
242}
243
244impl Client {
245 pub fn new_with_callback<T, U>(name: Option<&str>, send_request: T) -> ClientResult<Self>
248 where
249 T: Fn(Vec<u8>) -> U + 'static + Sync + Send,
250 U: Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + 'static,
251 {
252 let name = name_registry::generate_name(name)?;
253 let send_request = Arc::new(send_request);
254 let send: SendCallback = Arc::new(move |req| {
255 let mut bytes: Vec<u8> = Vec::new();
256 req.encode(&mut bytes).unwrap();
257 let send_request = send_request.clone();
258 Box::pin(async move { send_request(bytes).await })
259 });
260
261 Ok(Client {
262 name: Arc::new(name),
263 features: Arc::default(),
264 id_gen: IDGen::default(),
265 send,
266 subscriptions: Subscriptions::default(),
267 subscriptions_errors: Arc::default(),
268 subscriptions_once: Arc::default(),
269 })
270 }
271
272 pub fn new<T>(name: Option<&str>, client_handler: T) -> ClientResult<Self>
274 where
275 T: ClientHandler + 'static + Sync + Send,
276 {
277 Self::new_with_callback(
278 name,
279 asyncfn!(client_handler, async move |req| {
280 client_handler.send_request(req).await
281 }),
282 )
283 }
284
285 pub fn get_name(&self) -> &'_ str {
286 self.name.as_str()
287 }
288
289 pub async fn handle_response<'a>(&'a self, msg: &'a [u8]) -> ClientResult<bool> {
296 let msg = Response::decode(msg)?;
297 tracing::debug!("RECV {}", msg);
298 let mut wr = self.subscriptions_once.write().await;
299 if let Some(handler) = (*wr).remove(&msg.msg_id) {
300 drop(wr);
301 handler(msg)?;
302 return Ok(true);
303 } else if let Some(handler) = self.subscriptions.try_read().unwrap().get(&msg.msg_id) {
304 drop(wr);
305 handler(msg).await?;
306 return Ok(true);
307 }
308
309 if let Response {
310 client_resp: Some(ClientResp::ServerError(ServerError { message, .. })),
311 ..
312 } = &msg
313 {
314 tracing::error!("{}", message);
315 } else {
316 tracing::debug!("Received unsolicited server response: {}", msg);
317 }
318
319 Ok(false)
320 }
321
322 pub async fn handle_error<T, U>(
324 &self,
325 message: ClientError,
326 reconnect: Option<T>,
327 ) -> ClientResult<()>
328 where
329 T: Fn() -> U + Clone + Send + Sync + 'static,
330 U: Future<Output = ClientResult<()>>,
331 {
332 let subs = self.subscriptions_errors.read().await;
333 let tasks = join_all(subs.values().map(|callback| {
334 callback(
335 message.clone(),
336 reconnect.clone().map(move |f| {
337 ReconnectCallback(Arc::new(move || {
338 clone!(f);
339 Box::pin(async move { Ok(f().await?) }) as LocalBoxFuture<'static, _>
340 }))
341 }),
342 )
343 }));
344
345 tasks.await.into_iter().collect::<Result<(), _>>()?;
346 self.close_and_error_subscriptions(&message).await
347 }
348
349 async fn close_and_error_subscriptions(&self, message: &ClientError) -> ClientResult<()> {
354 let synthetic_error = |msg_id| Response {
355 msg_id,
356 entity_id: "".to_string(),
357 client_resp: Some(ClientResp::ServerError(ServerError {
358 message: format!("{message}"),
359 status_code: 2,
360 })),
361 };
362
363 self.subscriptions.write().await.clear();
364 let callbacks_once = self
365 .subscriptions_once
366 .write()
367 .await
368 .drain()
369 .collect::<Vec<_>>();
370
371 callbacks_once
372 .into_iter()
373 .try_for_each(|(msg_id, f)| f(synthetic_error(msg_id)))
374 }
375
376 pub async fn on_error<T, U, V>(&self, on_error: T) -> ClientResult<u32>
377 where
378 T: Fn(ClientError, Option<ReconnectCallback>) -> U + Clone + Send + Sync + 'static,
379 U: Future<Output = V> + Send + 'static,
380 V: Into<Result<(), ClientError>> + Sync + 'static,
381 {
382 let id = self.gen_id();
383 let callback = asyncfn!(on_error, async move |x, y| on_error(x, y).await.into());
384 self.subscriptions_errors
385 .write()
386 .await
387 .insert(id, Box::new(move |x, y| Box::pin(callback(x, y))));
388
389 Ok(id)
390 }
391
392 pub(crate) fn gen_id(&self) -> u32 {
394 self.id_gen.next()
395 }
396
397 pub(crate) async fn unsubscribe(&self, update_id: u32) -> ClientResult<()> {
398 let callback = self
399 .subscriptions
400 .write()
401 .await
402 .remove(&update_id)
403 .ok_or(ClientError::Unknown("remove_update".to_string()))?;
404
405 drop(callback);
406 Ok(())
407 }
408
409 pub(crate) async fn subscribe_once(
411 &self,
412 msg: &Request,
413 on_update: Box<dyn FnOnce(Response) -> ClientResult<()> + Send + Sync + 'static>,
414 ) -> ClientResult<()> {
415 self.subscriptions_once
416 .write()
417 .await
418 .insert(msg.msg_id, on_update);
419
420 tracing::debug!("SEND {}", msg);
421 if let Err(e) = (self.send)(msg).await {
422 self.subscriptions_once.write().await.remove(&msg.msg_id);
423 Err(ClientError::Unknown(e.to_string()))
424 } else {
425 Ok(())
426 }
427 }
428
429 pub(crate) async fn subscribe<T, U>(&self, msg: &Request, on_update: T) -> ClientResult<()>
430 where
431 T: Fn(Response) -> U + Send + Sync + 'static,
432 U: Future<Output = Result<(), ClientError>> + Send + 'static,
433 {
434 self.subscriptions
435 .write()
436 .await
437 .insert(msg.msg_id, Box::new(move |x| Box::pin(on_update(x))));
438
439 tracing::debug!("SEND {}", msg);
440 if let Err(e) = (self.send)(msg).await {
441 self.subscriptions.write().await.remove(&msg.msg_id);
442 Err(ClientError::Unknown(e.to_string()))
443 } else {
444 Ok(())
445 }
446 }
447
448 pub(crate) async fn oneshot(&self, req: &Request) -> ClientResult<ClientResp> {
451 let (sender, receiver) = futures::channel::oneshot::channel::<ClientResp>();
452 let on_update = Box::new(move |res: Response| {
453 sender.send(res.client_resp.unwrap()).map_err(|x| x.into())
454 });
455
456 self.subscribe_once(req, on_update).await?;
457 receiver
458 .await
459 .map_err(|_| ClientError::Unknown(format!("Internal error for req {req}")))
460 }
461
462 pub(crate) async fn get_features(&self) -> ClientResult<Features> {
463 let mut guard = self.features.lock().await;
464 let features = if let Some(features) = &*guard {
465 features.clone()
466 } else {
467 let msg = Request {
468 msg_id: self.gen_id(),
469 entity_id: "".to_owned(),
470 client_req: Some(ClientReq::GetFeaturesReq(GetFeaturesReq {})),
471 };
472
473 let features = Features(Arc::new(match self.oneshot(&msg).await? {
474 ClientResp::GetFeaturesResp(features) => Ok(features),
475 resp => Err(resp),
476 }?));
477
478 *guard = Some(features.clone());
479 features
480 };
481
482 Ok(features)
483 }
484
485 pub async fn table(&self, input: TableData, options: TableInitOptions) -> ClientResult<Table> {
538 let entity_id = match options.name.clone() {
539 Some(x) => x.to_owned(),
540 None => randid(),
541 };
542
543 if let TableData::View(view) = &input {
544 let window = ViewWindow::default();
545 let arrow = view.to_arrow(window).await?;
546 let mut table = self
547 .crate_table_inner(UpdateData::Arrow(arrow).into(), options.into(), entity_id)
548 .await?;
549
550 let table_ = table.clone();
551 let callback = asyncfn!(table_, update, async move |update: OnUpdateData| {
552 let update = UpdateData::Arrow(update.delta.expect("Malformed message").into());
553 let options = crate::UpdateOptions::default();
554 table_.update(update, options).await.unwrap_or_log();
555 });
556
557 let options = OnUpdateOptions {
558 mode: Some(OnUpdateMode::Row),
559 };
560
561 let on_update_token = view.on_update(callback, options).await?;
562 table.view_update_token = Some(on_update_token);
563 Ok(table)
564 } else {
565 self.crate_table_inner(input, options.into(), entity_id)
566 .await
567 }
568 }
569
570 async fn crate_table_inner(
571 &self,
572 input: TableData,
573 options: TableOptions,
574 entity_id: String,
575 ) -> ClientResult<Table> {
576 let msg = Request {
577 msg_id: self.gen_id(),
578 entity_id: entity_id.clone(),
579 client_req: Some(ClientReq::MakeTableReq(MakeTableReq {
580 data: Some(input.into()),
581 options: Some(options.clone().try_into()?),
582 })),
583 };
584
585 let client = self.clone();
586 match self.oneshot(&msg).await? {
587 ClientResp::MakeTableResp(_) => Ok(Table::new(entity_id, client, options)),
588 resp => Err(resp.into()),
589 }
590 }
591
592 async fn get_table_infos(&self) -> ClientResult<Vec<HostedTable>> {
593 let msg = Request {
594 msg_id: self.gen_id(),
595 entity_id: "".to_owned(),
596 client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
597 subscribe: false,
598 })),
599 };
600
601 match self.oneshot(&msg).await? {
602 ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => Ok(table_infos),
603 resp => Err(resp.into()),
604 }
605 }
606
607 pub async fn open_table(&self, entity_id: String) -> ClientResult<Table> {
620 let infos = self.get_table_infos().await?;
621
622 if let Some(info) = infos.into_iter().find(|i| i.entity_id == entity_id) {
624 let options = TableOptions {
625 index: info.index,
626 limit: info.limit,
627 };
628
629 let client = self.clone();
630 Ok(Table::new(entity_id, client, options))
631 } else {
632 Err(ClientError::Unknown(format!(
633 "Unknown table \"{}\"",
634 entity_id
635 )))
636 }
637 }
638
639 pub async fn get_hosted_table_names(&self) -> ClientResult<Vec<String>> {
652 let msg = Request {
653 msg_id: self.gen_id(),
654 entity_id: "".to_owned(),
655 client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
656 subscribe: false,
657 })),
658 };
659
660 match self.oneshot(&msg).await? {
661 ClientResp::GetHostedTablesResp(GetHostedTablesResp { table_infos }) => {
662 Ok(table_infos.into_iter().map(|i| i.entity_id).collect())
663 },
664 resp => Err(resp.into()),
665 }
666 }
667
668 pub async fn on_hosted_tables_update<T, U>(&self, on_update: T) -> ClientResult<u32>
672 where
673 T: Fn() -> U + Send + Sync + 'static,
674 U: Future<Output = ()> + Send + 'static,
675 {
676 let on_update = Arc::new(on_update);
677 let callback = asyncfn!(on_update, async move |resp: Response| {
678 match resp.client_resp {
679 Some(ClientResp::GetHostedTablesResp(_)) | None => {
680 on_update().await;
681 Ok(())
682 },
683 resp => Err(resp.into()),
684 }
685 });
686
687 let msg = Request {
688 msg_id: self.gen_id(),
689 entity_id: "".to_owned(),
690 client_req: Some(ClientReq::GetHostedTablesReq(GetHostedTablesReq {
691 subscribe: true,
692 })),
693 };
694
695 self.subscribe(&msg, callback).await?;
696 Ok(msg.msg_id)
697 }
698
699 pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ClientResult<()> {
702 let msg = Request {
703 msg_id: self.gen_id(),
704 entity_id: "".to_owned(),
705 client_req: Some(ClientReq::RemoveHostedTablesUpdateReq(
706 RemoveHostedTablesUpdateReq { id: update_id },
707 )),
708 };
709
710 self.unsubscribe(update_id).await?;
711 match self.oneshot(&msg).await? {
712 ClientResp::RemoveHostedTablesUpdateResp(_) => Ok(()),
713 resp => Err(resp.into()),
714 }
715 }
716
717 pub async fn system_info(&self) -> ClientResult<SystemInfo> {
721 let msg = Request {
722 msg_id: self.gen_id(),
723 entity_id: "".to_string(),
724 client_req: Some(ClientReq::ServerSystemInfoReq(ServerSystemInfoReq {})),
725 };
726
727 match self.oneshot(&msg).await? {
728 ClientResp::ServerSystemInfoResp(resp) => {
729 #[cfg(not(target_family = "wasm"))]
730 let timestamp = Some(
731 std::time::SystemTime::now()
732 .duration_since(std::time::UNIX_EPOCH)?
733 .as_millis() as u64,
734 );
735
736 #[cfg(target_family = "wasm")]
737 let timestamp = None;
738
739 #[cfg(feature = "talc-allocator")]
740 let (client_used, client_heap) = {
741 let (client_used, client_heap) = crate::utils::get_used();
742 (Some(client_used as u64), Some(client_heap as u64))
743 };
744
745 #[cfg(not(feature = "talc-allocator"))]
746 let (client_used, client_heap) = (None, None);
747
748 let info = SystemInfo {
749 heap_size: resp.heap_size,
750 used_size: resp.used_size,
751 cpu_time: resp.cpu_time,
752 cpu_time_epoch: resp.cpu_time_epoch,
753 timestamp,
754 client_heap,
755 client_used,
756 };
757
758 Ok(info)
759 },
760 resp => Err(resp.into()),
761 }
762 }
763}