rust-query 0.9.0

A query builder using rust concepts.
Documentation
use std::{
    future,
    sync::{Arc, Mutex},
    task::{Poll, Waker},
};

use crate::{Database, Transaction, migrate::Schema};

/// This is an async wrapper for [Database].
///
/// Transaction methods on this type are identical to [Database], but they spawn
/// an async task and return a future to await the transaction completion.
/// Because of this difference, the transaction closures must be `'static`.
///
/// You can easily implement [DatabaseAsync] manually using `tokio::task::spawn_blocking`,
/// but this wrapper is a little bit more efficient while also being runtime agnostic.
pub struct DatabaseAsync<S> {
    inner: Arc<Database<S>>,
}

impl<S> Clone for DatabaseAsync<S> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<S: 'static + Send + Sync + Schema> DatabaseAsync<S> {
    /// Create an async wrapper for the [Database].
    ///
    /// The database is wrapped in an [Arc] as it needs to be shared with any thread
    /// executing a transaction. These threads can live longer than the future that
    /// started the transaction.
    ///
    /// By accepting an [Arc], you can keep your own clone of the [Arc] and use
    /// the database synchronously and asynchronously at the same time!
    pub fn new(db: Arc<Database<S>>) -> Self {
        DatabaseAsync { inner: db }
    }

    #[doc = include_str!("database/transaction.md")]
    pub async fn transaction<R: 'static + Send>(
        &self,
        f: impl 'static + Send + FnOnce(&'static Transaction<S>) -> R,
    ) -> R {
        let db = self.inner.clone();
        async_run(move || db.transaction_local(f)).await
    }

    #[doc = include_str!("database/transaction_mut.md")]
    pub async fn transaction_mut<O: 'static + Send, E: 'static + Send>(
        &self,
        f: impl 'static + Send + FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
    ) -> Result<O, E> {
        let db = self.inner.clone();
        async_run(move || db.transaction_mut_local(f)).await
    }

    #[doc = include_str!("database/transaction_mut_ok.md")]
    pub async fn transaction_mut_ok<R: 'static + Send>(
        &self,
        f: impl 'static + Send + FnOnce(&'static mut Transaction<S>) -> R,
    ) -> R {
        self.transaction_mut(|txn| Ok::<R, std::convert::Infallible>(f(txn)))
            .await
            .unwrap()
    }
}

async fn async_run<R: 'static + Send>(f: impl 'static + Send + FnOnce() -> R) -> R {
    pub struct WakeOnDrop {
        waker: Mutex<Waker>,
    }

    impl Drop for WakeOnDrop {
        #[cfg_attr(test, mutants::skip)] // mutating this will make the test hang
        fn drop(&mut self) {
            self.waker.lock().unwrap().wake_by_ref();
        }
    }

    // Initally we use a noop waker, because we will override it anyway.
    let wake_on_drop = Arc::new(WakeOnDrop {
        waker: Mutex::new(Waker::noop().clone()),
    });
    let weak = Arc::downgrade(&wake_on_drop);

    let handle = std::thread::spawn(move || {
        // waker will be called when thread finishes, even with panic.
        let _wake_on_drop = wake_on_drop;
        f()
    });

    // asynchonously wait for the thread to finish
    future::poll_fn(|cx| {
        if let Some(wake_on_drop) = weak.upgrade() {
            wake_on_drop.waker.lock().unwrap().clone_from(cx.waker());
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    })
    .await;

    // we know that the thread is finished, so we block on it
    match handle.join() {
        Ok(val) => val,
        Err(err) => std::panic::resume_unwind(err),
    }
}