tigerbeetle_unofficial_core/
lib.rs

1#![doc(
2    html_logo_url = "https://avatars.githubusercontent.com/u/187310527",
3    html_favicon_url = "https://avatars.githubusercontent.com/u/187310527?s=256"
4)]
5#![warn(
6    clippy::match_wildcard_for_single_variants,
7    clippy::wildcard_enum_match_arm
8)]
9
10pub mod account;
11mod callback;
12pub mod error;
13mod packet;
14pub mod query_filter;
15pub mod transfer;
16pub mod util;
17
18use std::{cell::UnsafeCell, marker::PhantomData, mem, num::NonZeroU32, pin::Pin};
19
20use error::{NewClientError, NewClientErrorKind};
21
22pub use account::Account;
23pub use callback::*;
24pub use packet::*;
25pub use query_filter::QueryFilter;
26pub use transfer::Transfer;
27
28type CompletionCallbackRawFn =
29    unsafe extern "C" fn(usize, *mut sys::tb_packet_t, u64, *const u8, u32);
30
31pub struct Client<F>
32where
33    F: CallbacksPtr,
34{
35    raw: Pin<Box<UnsafeCell<sys::tb_client_t>>>,
36    cb: *const F::Target,
37    marker: PhantomData<F>,
38}
39
40unsafe impl<F> Send for Client<F> where F: CallbacksPtr + Send {}
41unsafe impl<F> Sync for Client<F> where F: CallbacksPtr {}
42
43impl<F> Client<F>
44where
45    F: CallbacksPtr,
46{
47    pub fn with_callback<A>(
48        cluster_id: u128,
49        address: A,
50        completion_callback: F,
51    ) -> Result<Self, NewClientError>
52    where
53        A: AsRef<[u8]>,
54        // `F` and `UserDataPtr` are `'static`, because we can `mem::forget(self)`
55        // and drop anything that is being referred from `F` or `UserDataPtr`,
56        // thus invalidating callback or user data.
57        F: 'static,
58        F::UserDataPtr: 'static,
59    {
60        // SAFETY: `F` and `UserDataPtr` are `'static`.
61        unsafe { Client::with_callback_unchecked(cluster_id, address, completion_callback) }
62    }
63
64    /// Highly unsafe method. Please use [`Self::with_callback`]
65    /// unless you are *really sure* you are doing it right.
66    ///
67    /// # Safety
68    ///
69    /// `F` and `U` are unresticted by any lifetime. It's user's responsibility
70    /// to ensure validity of `on_completion` callback or packet's `user_data`
71    /// for client's use. If client is dropped, you can safely invalidate these
72    /// things.
73    pub unsafe fn with_callback_unchecked<A>(
74        cluster_id: u128,
75        address: A,
76        completion_callback: F,
77    ) -> Result<Self, NewClientError>
78    where
79        A: AsRef<[u8]>,
80    {
81        let completion_fn = completion_callback_raw_fn::<F::Target>;
82        let completion_cb = F::into_raw_const_ptr(completion_callback);
83        let completion_ctx = sptr::Strict::expose_addr(completion_cb);
84
85        unsafe fn raw_with_callback(
86            cluster_id: u128,
87            address: &[u8],
88            completion_ctx: usize,
89            completion_callback: CompletionCallbackRawFn,
90        ) -> Result<Pin<Box<UnsafeCell<sys::tb_client_t>>>, NewClientError> {
91            let mut raw = Box::pin(UnsafeCell::new(mem::zeroed()));
92            let status = sys::tb_client_init(
93                raw.as_mut().get_unchecked_mut().get_mut(),
94                cluster_id.to_le_bytes().as_ptr(),
95                address.as_ptr().cast(),
96                address
97                    .len()
98                    .try_into()
99                    .map_err(|_| NewClientErrorKind::AddressInvalid)?,
100                completion_ctx,
101                Some(completion_callback),
102            );
103
104            // SAFETY: Unwrapping is OK here, because the returned `TB_INIT_STATUS` is actually an
105            //         enum with positive discriminant undoubtedly fitting into `u32`.
106            #[allow(clippy::useless_conversion)] // not true for Windows
107            if let Some(c) = NonZeroU32::new(status.try_into().unwrap_unchecked()) {
108                Err(NewClientError(c))
109            } else {
110                Ok(raw)
111            }
112        }
113
114        Ok(Client {
115            raw: unsafe {
116                raw_with_callback(cluster_id, address.as_ref(), completion_ctx, completion_fn)
117                    .inspect_err(|_| drop(F::from_raw_const_ptr(completion_cb)))?
118            },
119            cb: completion_cb,
120            marker: PhantomData,
121        })
122    }
123
124    pub fn submit(&self, mut packet: Packet<F::UserDataPtr>) {
125        use crate::error::SendErrorKind;
126
127        let data = packet.user_data().data();
128        let Ok(data_size) = data.len().try_into() else {
129            packet.set_status(Err(SendErrorKind::TooMuchData.into()));
130            let cb = unsafe { &*self.cb };
131            cb.completion(packet, None);
132            return;
133        };
134        let data = data.as_ptr();
135
136        let raw_packet = packet.raw_mut();
137        raw_packet.data_size = data_size;
138        raw_packet.data = data.cast_mut().cast();
139
140        // SAFETY: Going from `&self` to `*mut sys::tb_client_t` is OK here, because multi-thread
141        //         access is synchronized by the `sys::tb_client_t` itself inside.
142        unsafe {
143            let raw_client: *mut sys::tb_client_t = self.raw.get();
144            // NOTE: We do omit checking the result to be `TB_CLIENT_INVALID` intentionally here,
145            //       because it can be returned only if the `raw_client` is not yet inited, or was
146            //       deinited already, which happens only in constructors and `Drop` respectively.
147            _ = sys::tb_client_submit(raw_client, packet.raw);
148        }
149        mem::forget(packet); // avoid `Drop`ping `Packet`
150    }
151}
152
153/// Blocks until all pending requests finish
154impl<F> Drop for Client<F>
155where
156    F: CallbacksPtr,
157{
158    fn drop(&mut self) {
159        unsafe {
160            let raw_client = self.raw.as_mut().get_unchecked_mut().get_mut();
161            #[cfg(feature = "tokio-rt-multi-thread")]
162            if tokio::runtime::Handle::try_current().is_ok_and(|h| {
163                matches!(
164                    h.runtime_flavor(),
165                    tokio::runtime::RuntimeFlavor::MultiThread
166                )
167            }) {
168                _ = tokio::task::block_in_place(|| sys::tb_client_deinit(raw_client));
169            } else {
170                _ = sys::tb_client_deinit(raw_client);
171            }
172            #[cfg(not(feature = "tokio-rt-multi-thread"))]
173            {
174                _ = sys::tb_client_deinit(raw_client);
175            }
176
177            drop(F::from_raw_const_ptr(self.cb));
178        }
179    }
180}