endpoint_libs/libs/database/
data_thread.rs1use eyre::*;
2use futures::future::BoxFuture;
3use futures::FutureExt;
4use postgres_from_row::FromRow;
5use std::any::Any;
6use std::fmt::Debug;
7
8use crate::libs::datatable::RDataTable;
9
10use super::{DatabaseRequest, PooledDbClient};
11
12type DbExecutionRequestType =
13 Box<dyn FnOnce(&PooledDbClient) -> BoxFuture<Box<dyn Any + Sync + Send>> + Send>;
14
15struct DbExecutionQuery {
16 request: DbExecutionRequestType,
17 result: tokio::sync::oneshot::Sender<Box<dyn Any + Sync + Send>>,
18}
19#[derive(Clone)]
20pub struct ThreadedDbClient {
21 tx: tokio::sync::mpsc::Sender<DbExecutionQuery>,
22}
23impl ThreadedDbClient {
24 pub async fn execute<T>(&self, req: T) -> Result<RDataTable<T::ResponseRow>>
25 where
26 T: DatabaseRequest + Sync + Send + Debug + 'static,
27 T::ResponseRow: FromRow + Sync + Send + Clone + Debug + Sized + 'static,
28 {
29 let request: DbExecutionRequestType = Box::new(move |client: &PooledDbClient| {
30 async move {
31 let result = client.execute(req).await;
32 Box::new(result) as _
33 }
34 .boxed()
35 });
36 let (tx, rx) = tokio::sync::oneshot::channel();
37 let query = DbExecutionQuery {
38 request,
39 result: tx,
40 };
41 self.tx
42 .send(query)
43 .await
44 .map_err(|_| eyre!("send failed"))?;
45 let result = rx.await?;
46 let result = result
47 .downcast::<Result<RDataTable<T::ResponseRow>>>()
48 .expect("downcast failed");
49 *result
50 }
51}
52pub fn spawn_thread_db_client(pooled: PooledDbClient) -> Result<ThreadedDbClient> {
53 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
54 let client = ThreadedDbClient { tx };
55 std::thread::spawn(move || {
56 tokio::runtime::Runtime::new()
57 .unwrap()
58 .block_on(async move {
59 let client = pooled;
60 while let Some(x) = rx.recv().await {
61 let DbExecutionQuery { request, result } = x;
62 let result1 = request(&client).await;
63 let _ = result.send(result1);
64 }
65 })
66 });
67 Ok(client)
68}