endpoint_libs/libs/database/
data_thread.rs

1use eyre::*;
2use futures::future::BoxFuture;
3use futures::FutureExt;
4use postgres_from_row::FromRow;
5use std::any::Any;
6use std::fmt::Debug;
7
8use crate::libs::datatable::RDataTable;
9
10use super::{DatabaseRequest, PooledDbClient};
11
12type DbExecutionRequestType =
13    Box<dyn FnOnce(&PooledDbClient) -> BoxFuture<Box<dyn Any + Sync + Send>> + Send>;
14
15struct DbExecutionQuery {
16    request: DbExecutionRequestType,
17    result: tokio::sync::oneshot::Sender<Box<dyn Any + Sync + Send>>,
18}
19#[derive(Clone)]
20pub struct ThreadedDbClient {
21    tx: tokio::sync::mpsc::Sender<DbExecutionQuery>,
22}
23impl ThreadedDbClient {
24    pub async fn execute<T>(&self, req: T) -> Result<RDataTable<T::ResponseRow>>
25    where
26        T: DatabaseRequest + Sync + Send + Debug + 'static,
27        T::ResponseRow: FromRow + Sync + Send + Clone + Debug + Sized + 'static,
28    {
29        let request: DbExecutionRequestType = Box::new(move |client: &PooledDbClient| {
30            async move {
31                let result = client.execute(req).await;
32                Box::new(result) as _
33            }
34            .boxed()
35        });
36        let (tx, rx) = tokio::sync::oneshot::channel();
37        let query = DbExecutionQuery {
38            request,
39            result: tx,
40        };
41        self.tx
42            .send(query)
43            .await
44            .map_err(|_| eyre!("send failed"))?;
45        let result = rx.await?;
46        let result = result
47            .downcast::<Result<RDataTable<T::ResponseRow>>>()
48            .expect("downcast failed");
49        *result
50    }
51}
52pub fn spawn_thread_db_client(pooled: PooledDbClient) -> Result<ThreadedDbClient> {
53    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
54    let client = ThreadedDbClient { tx };
55    std::thread::spawn(move || {
56        tokio::runtime::Runtime::new()
57            .unwrap()
58            .block_on(async move {
59                let client = pooled;
60                while let Some(x) = rx.recv().await {
61                    let DbExecutionQuery { request, result } = x;
62                    let result1 = request(&client).await;
63                    let _ = result.send(result1);
64                }
65            })
66    });
67    Ok(client)
68}