use super::api::*;
use crate::{dql::QueryResult, utils::make_continuation};
use_prelude!();
impl Store {
    /// Runs `scope` inside a transaction begun with `options`, then completes the
    /// transaction according to the outcome of `scope`.
    ///
    /// * If `scope` returns `Ok(value)` and `value` is itself a
    ///   [`TransactionCompletionAction`], that action is requested; for any other
    ///   type a commit is requested. `value` is then handed back to the caller.
    /// * If `scope` returns `Err(e)`, a rollback is requested and `e` is returned.
    ///
    /// # Errors
    ///
    /// A failure to begin or to complete the transaction is converted into `E`
    /// and returned. On the error path this means a failed rollback is reported
    /// in place of the error produced by `scope`.
    pub(super) async fn _transaction_with_options<T, E>(
        &self,
        options: ffi_sdk::BeginTransactionOptions<'_>,
        scope: impl AsyncFnOnce(&Transaction) -> Result<T, E>,
    ) -> Result<T, E>
    where
        T: core::any::Any,
        E: From<DittoError>,
    {
        let transaction = Transaction::begin(&self.ditto, options).await?;

        // Error path first: roll back, then surface the scope's own error.
        let return_value = match scope(&transaction).await {
            Ok(value) => value,
            Err(scope_error) => {
                let _ = transaction
                    .complete(&TransactionCompletionAction::Rollback)
                    .await?;
                return Err(scope_error);
            }
        };

        // The scope may pick the completion action explicitly by returning one;
        // every other return type means "commit".
        let requested_action = (&return_value as &dyn core::any::Any)
            .downcast_ref::<TransactionCompletionAction>()
            .unwrap_or(&TransactionCompletionAction::Commit);
        let _ = transaction.complete(requested_action).await?;
        Ok(return_value)
    }
}
impl Transaction {
/// Reinterprets a reference to the FFI `Ditto` handle as a reference to an
/// `FfiStore`.
///
/// This is a plain pointer cast: no FFI function is called, and the returned
/// reference borrows from `ditto`.
fn store_ptr(ditto: &ffi_sdk::Ditto) -> &ffi_sdk::FfiStore {
    let store_raw = (ditto as *const ffi_sdk::Ditto).cast::<ffi_sdk::FfiStore>();
    // SAFETY: NOTE(review): presumably relies on `ffi_sdk::Ditto` and
    // `ffi_sdk::FfiStore` being layout-compatible views of the same FFI object.
    // Nothing visible in this file establishes that; confirm against `ffi_sdk`.
    unsafe { &*store_raw }
}
/// Fetches this transaction's info from the FFI layer and decodes it.
///
/// The FFI call returns CBOR bytes, which are deserialized into the `id`,
/// `hint` and `is_read_only` fields of a [`TransactionInfo`].
///
/// # Panics
///
/// Panics if the returned bytes cannot be deserialized into the expected
/// fields.
pub(super) fn _info(&self) -> TransactionInfo {
    // Private deserialization target mirroring the fields copied out below.
    #[derive(Deserialize)]
    struct TransactionInfoInner {
        pub id: String,
        pub hint: Option<String>,
        pub is_read_only: bool,
    }

    let ffi_sdk::FfiCborData(cbor) = ffi_sdk::dittoffi_transaction_info(&self.ptr);
    let cbor = Box::<[u8]>::from(cbor);
    let decoded: TransactionInfoInner = serde_cbor::de::from_slice(&cbor).unwrap();
    TransactionInfo {
        id: decoded.id,
        hint: decoded.hint,
        is_read_only: decoded.is_read_only,
    }
}
/// Executes `query` within this transaction and returns its result.
///
/// The query is converted via `IntoQuery::into_query`, passed to the FFI
/// together with its optional CBOR-encoded arguments, and the outcome is
/// awaited through a continuation.
///
/// # Errors
///
/// Returns a [`DittoError`] if converting the query fails or if the FFI result
/// is an error.
///
/// # Panics
///
/// Panics if awaiting the continuation's receiver yields an error.
pub(super) async fn _execute<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
where
    Q: IntoQuery,
    Q::Args: serde::Serialize,
{
    let (on_complete, receiver) = make_continuation();
    let prepared = query.into_query()?;
    ffi_sdk::dittoffi_transaction_execute_async_throws(
        &self.ptr,
        (&*prepared.string).into(),
        // Arguments are optional; when present they are passed as CBOR bytes.
        prepared.args_cbor.as_deref().map(|bytes| bytes.into()),
        #[allow(clippy::useless_conversion)]
        on_complete.into(),
    );
    let ffi_result = receiver.await.unwrap();
    Ok(QueryResult::from(ffi_result.into_rust_result()?))
}
async fn begin(
ditto: &ffi_sdk::Ditto,
options: ffi_sdk::BeginTransactionOptions<'_>,
) -> Result<Self, DittoError> {
let (continuation, recv) = make_continuation();
ffi_sdk::dittoffi_store_begin_transaction_async_throws(
Transaction::store_ptr(ditto),
options,
#[allow(clippy::useless_conversion)]
continuation.into(),
);
let ffi_result = recv.await.unwrap();
let ptr = ffi_result.into_rust_result()?;
Ok(Transaction { ptr })
}
/// Completes this transaction with the requested `action`, consuming it.
///
/// The requested action is translated to its FFI counterpart, the completion
/// is awaited through a continuation, and the action reported back by the FFI
/// is translated to the SDK type and returned.
///
/// # Errors
///
/// Returns a [`DittoError`] if the FFI result is an error.
///
/// # Panics
///
/// Panics if awaiting the continuation's receiver yields an error.
async fn complete(
    self,
    action: &TransactionCompletionAction,
) -> Result<TransactionCompletionAction, DittoError> {
    let requested = match *action {
        TransactionCompletionAction::Commit => ffi_sdk::TransactionCompletionAction::Commit,
        TransactionCompletionAction::Rollback => ffi_sdk::TransactionCompletionAction::Rollback,
    };
    let (on_complete, receiver) = make_continuation();
    ffi_sdk::dittoffi_transaction_complete_async_throws(
        &self.ptr,
        requested,
        #[allow(clippy::useless_conversion)]
        on_complete.into(),
    );
    let performed = receiver.await.unwrap().into_rust_result()?;
    Ok(match performed {
        ffi_sdk::TransactionCompletionAction::Commit => TransactionCompletionAction::Commit,
        ffi_sdk::TransactionCompletionAction::Rollback => TransactionCompletionAction::Rollback,
    })
}
}
/// A value passed to the continuation from a spawned thread must arrive on the
/// receiver unchanged.
#[test]
fn make_oneshot_works() {
    let (mut on_complete, receiver) = make_continuation();
    std::thread::spawn(move || {
        #[allow(deprecated)]
        on_complete.call(123, Blocking::UNCHECKED);
    });
    assert_eq!(receiver.blocking_recv().unwrap(), 123);
}
/// Calling the same continuation a second time is expected to panic.
///
/// The test is ignored; see the `ignore` reason below.
#[test]
#[ignore = "this does panic, but the panic handler doesn't catch it"]
#[should_panic]
fn make_oneshot_is_actually_oneshot() {
    let (mut on_complete, _receiver) = make_continuation();
    // Two calls in a row: the second one is the call under test.
    #[allow(deprecated)]
    for value in [123, 234] {
        on_complete.call(value, Blocking::UNCHECKED);
    }
}