dittolive-ditto 4.13.0

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use super::api::*;
use crate::{dql::QueryResult, utils::make_continuation};

use_prelude!();

impl Store {
    /// Runs `scope` inside a transaction begun with `options`, then completes the transaction
    /// according to what `scope` returned.
    ///
    /// * `Ok(value)` where `value` is itself a [`TransactionCompletionAction`]: that action is
    ///   requested when completing the transaction.
    /// * `Ok(value)` of any other type: a commit is requested.
    /// * `Err(error)`: a rollback is requested and `error` is handed back to the caller.
    ///
    /// # Errors
    ///
    /// A failure to begin or to complete the transaction is converted into `E` through
    /// `From<DittoError>`. Note that a failed rollback is reported *instead of* the error
    /// produced by `scope`.
    pub(super) async fn _transaction_with_options<T, E>(
        &self,
        options: ffi_sdk::BeginTransactionOptions<'_>,
        scope: impl AsyncFnOnce(&Transaction) -> Result<T, E>,
    ) -> Result<T, E>
    where
        T: core::any::Any,
        E: From<DittoError>,
    {
        let txn = Transaction::begin(&self.ditto, options).await?;
        let outcome = scope(&txn).await;

        // Failure path first: undo whatever the scope did, then surface its error.
        let value = match outcome {
            Ok(value) => value,
            Err(scope_error) => {
                let _ = txn
                    .complete(&TransactionCompletionAction::Rollback)
                    .await?;

                return Err(scope_error);
            }
        };

        // The scope may pick the completion action by returning one; every other return type
        // falls back to committing.
        let default_action = TransactionCompletionAction::Commit;
        let requested_action = (&value as &dyn core::any::Any)
            .downcast_ref::<TransactionCompletionAction>()
            .unwrap_or(&default_action);

        let _ = txn.complete(requested_action).await?;
        Ok(value)
    }
}

impl Transaction {
    /// needed for `begin`, since we can't construct a `Self` until we have a transaction ptr
    fn store_ptr(ditto: &ffi_sdk::Ditto) -> &ffi_sdk::FfiStore {
        let ditto_raw = ditto as *const ffi_sdk::Ditto;
        let store_raw = ditto_raw as *const ffi_sdk::FfiStore;

        // SAFETY: `ffi_sdk::FfiStore` is a `repr(transparent)` wrapper around `ffi_sdk::Ditto`
        // which guarantees that it is safe to transmute between purely from a layout point of
        // view. Additionally, `FfiStore` doesn't enforce extra safety invariants for `Ditto`
        unsafe { &*store_raw }
    }

    pub(super) fn _info(&self) -> TransactionInfo {
        let ffi_sdk::FfiCborData(bytes) = ffi_sdk::dittoffi_transaction_info(&self.ptr);

        /// Hidden inner type to prevent publicly implementing Deserialize
        #[derive(Deserialize)]
        struct TransactionInfoInner {
            pub id: String,
            pub hint: Option<String>,
            pub is_read_only: bool,
        }

        let bytes = Box::<[u8]>::from(bytes);

        let TransactionInfoInner {
            id,
            hint,
            is_read_only,
        } = serde_cbor::de::from_slice(&bytes).unwrap();

        TransactionInfo {
            id,
            hint,
            is_read_only,
        }
    }

    pub(super) async fn _execute<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
    where
        Q: IntoQuery,
        Q::Args: serde::Serialize,
    {
        let (continuation, recv) = make_continuation();

        let query = query.into_query()?;
        let args = query.args_cbor.as_deref().map(|bytes| bytes.into());

        let string = (&*query.string).into();

        ffi_sdk::dittoffi_transaction_execute_async_throws(
            &self.ptr,
            string,
            args,
            #[allow(clippy::useless_conversion)]
            continuation.into(),
        );

        let query_result = recv.await.unwrap();
        let query_result = query_result.into_rust_result()?;

        Ok(QueryResult::from(query_result))
    }

    async fn begin(
        ditto: &ffi_sdk::Ditto,
        options: ffi_sdk::BeginTransactionOptions<'_>,
    ) -> Result<Self, DittoError> {
        let (continuation, recv) = make_continuation();

        ffi_sdk::dittoffi_store_begin_transaction_async_throws(
            Transaction::store_ptr(ditto),
            options,
            #[allow(clippy::useless_conversion)]
            continuation.into(),
        );

        let ffi_result = recv.await.unwrap();
        let ptr = ffi_result.into_rust_result()?;

        Ok(Transaction { ptr })
    }

    async fn complete(
        self,
        action: &TransactionCompletionAction,
    ) -> Result<TransactionCompletionAction, DittoError> {
        let ffi_action = match action {
            TransactionCompletionAction::Commit => ffi_sdk::TransactionCompletionAction::Commit,
            TransactionCompletionAction::Rollback => ffi_sdk::TransactionCompletionAction::Rollback,
        };

        let (continuation, recv) = make_continuation();

        ffi_sdk::dittoffi_transaction_complete_async_throws(
            &self.ptr,
            ffi_action,
            #[allow(clippy::useless_conversion)]
            continuation.into(),
        );

        let action = recv.await.unwrap();
        let action = match action.into_rust_result()? {
            ffi_sdk::TransactionCompletionAction::Commit => TransactionCompletionAction::Commit,
            ffi_sdk::TransactionCompletionAction::Rollback => TransactionCompletionAction::Rollback,
        };

        Ok(action)
    }
}

/// A value passed to the continuation on another thread must arrive at the receiver.
#[test]
fn make_oneshot_works() {
    let (mut continuation, receiver) = make_continuation();

    // Fire the continuation from a separate thread; the handle is intentionally not joined.
    std::thread::spawn(move || {
        continuation.call(123);
    });

    let received = receiver.blocking_recv().unwrap();
    assert_eq!(received, 123);
}

/// Calling the same continuation a second time is expected to panic.
#[test]
#[ignore = "this does panic, but the panic handler doesn't catch it"]
#[should_panic]
fn make_oneshot_is_actually_oneshot() {
    // The receiver stays bound (rather than `_`) so it is kept alive for the whole test.
    let (mut continuation, _receiver) = make_continuation();

    for value in [123, 234] {
        continuation.call(value);
    }
}