enfipy_tigerbeetle/
lib.rs

1#![doc(
2    html_logo_url = "https://avatars.githubusercontent.com/u/187310527",
3    html_favicon_url = "https://avatars.githubusercontent.com/u/187310527?s=256"
4)]
5#![forbid(unsafe_code)]
6
7mod id;
8mod reply;
9
10use error::NewClientError;
11use reply::Reply;
12use tokio::sync::oneshot;
13
14use core::{
15    error::{CreateAccountsError, CreateTransfersError, SendError},
16    util::{RawConstPtr, SendAsBytesOwnedSlice, SendOwnedSlice},
17};
18
19pub use core::{self, account, error, transfer, Account, Packet, QueryFilter, Transfer};
20
21pub use self::id::id;
22
23pub struct Client {
24    inner: core::Client<&'static Callbacks>,
25}
26
27struct Callbacks;
28
29struct UserData {
30    reply_sender: oneshot::Sender<Result<Reply, SendError>>,
31    data: SendAsBytesOwnedSlice,
32}
33
34impl Client {
35    pub fn new<A>(cluster_id: u128, address: A) -> Result<Self, NewClientError>
36    where
37        A: AsRef<[u8]>,
38    {
39        Ok(Client {
40            inner: core::Client::with_callback(cluster_id, address, &Callbacks)?,
41        })
42    }
43
44    pub async fn create_accounts<T>(&self, accounts: T) -> Result<(), CreateAccountsError>
45    where
46        T: Into<SendOwnedSlice<Account>>,
47    {
48        let accounts: SendOwnedSlice<Account> = accounts.into();
49        if accounts.is_empty() {
50            return Ok(());
51        }
52        Ok(self
53            .submit(
54                accounts.into_as_bytes(),
55                core::OperationKind::CreateAccounts,
56            )
57            .await?
58            .into_create_accounts()?)
59    }
60
61    pub async fn create_transfers<T>(&self, transfers: T) -> Result<(), CreateTransfersError>
62    where
63        T: Into<SendOwnedSlice<Transfer>>,
64    {
65        let transfers: SendOwnedSlice<Transfer> = transfers.into();
66        if transfers.is_empty() {
67            return Ok(());
68        }
69        Ok(self
70            .submit(
71                transfers.into_as_bytes(),
72                core::OperationKind::CreateTransfers,
73            )
74            .await?
75            .into_create_transfers()?)
76    }
77
78    pub async fn get_account_balances<T>(
79        &self,
80        filter: T,
81    ) -> Result<Vec<account::Balance>, SendError>
82    where
83        T: RawConstPtr<Target = account::Filter> + Send + 'static,
84    {
85        let filter: SendOwnedSlice<account::Filter> = SendOwnedSlice::from_single(filter);
86        self.submit(
87            filter.into_as_bytes(),
88            core::OperationKind::GetAccountBalances,
89        )
90        .await
91        .map(Reply::into_get_account_balances)
92    }
93
94    pub async fn get_account_transfers<T>(&self, filter: T) -> Result<Vec<Transfer>, SendError>
95    where
96        T: RawConstPtr<Target = account::Filter> + Send + 'static,
97    {
98        let filter: SendOwnedSlice<account::Filter> = SendOwnedSlice::from_single(filter);
99        self.submit(
100            filter.into_as_bytes(),
101            core::OperationKind::GetAccountTransfers,
102        )
103        .await
104        .map(Reply::into_get_account_transfers)
105    }
106
107    pub async fn lookup_accounts<T>(&self, ids: T) -> Result<Vec<Account>, SendError>
108    where
109        T: Into<SendOwnedSlice<u128>>,
110    {
111        let ids: SendOwnedSlice<u128> = ids.into();
112        if ids.is_empty() {
113            return Ok(Vec::new());
114        }
115        self.submit(ids.into_as_bytes(), core::OperationKind::LookupAccounts)
116            .await
117            .map(Reply::into_lookup_accounts)
118    }
119
120    pub async fn lookup_transfers<T>(&self, ids: T) -> Result<Vec<Transfer>, SendError>
121    where
122        T: Into<SendOwnedSlice<u128>>,
123    {
124        let ids: SendOwnedSlice<u128> = ids.into();
125        if ids.is_empty() {
126            return Ok(Vec::new());
127        }
128        self.submit(ids.into_as_bytes(), core::OperationKind::LookupTransfers)
129            .await
130            .map(Reply::into_lookup_transfers)
131    }
132
133    pub async fn query_accounts<T>(&self, filter: T) -> Result<Vec<Account>, SendError>
134    where
135        T: RawConstPtr<Target = QueryFilter> + Send + 'static,
136    {
137        let filter: SendOwnedSlice<QueryFilter> = SendOwnedSlice::from_single(filter);
138        self.submit(filter.into_as_bytes(), core::OperationKind::QueryAccounts)
139            .await
140            .map(Reply::into_query_accounts)
141    }
142
143    pub async fn query_transfers<T>(&self, filter: T) -> Result<Vec<Transfer>, SendError>
144    where
145        T: RawConstPtr<Target = QueryFilter> + Send + 'static,
146    {
147        let filter: SendOwnedSlice<QueryFilter> = SendOwnedSlice::from_single(filter);
148        self.submit(filter.into_as_bytes(), core::OperationKind::QueryTransfers)
149            .await
150            .map(Reply::into_query_transfers)
151    }
152
153    async fn submit(
154        &self,
155        data: SendAsBytesOwnedSlice,
156        operation: impl Into<core::Operation>,
157    ) -> Result<Reply, SendError> {
158        let (reply_sender, reply_receiver) = oneshot::channel();
159        let user_data = Box::new(UserData { reply_sender, data });
160        self.inner.submit(Packet::new(user_data, operation));
161        reply_receiver.await.unwrap()
162    }
163}
164
165impl core::Callbacks for Callbacks {
166    type UserDataPtr = Box<UserData>;
167
168    fn completion(&self, packet: Packet<Self::UserDataPtr>, reply: Option<core::Reply<'_>>) {
169        let status = packet.status();
170        let operation = packet.operation();
171        let user_data = packet.into_user_data();
172        // Channel may be closed due to the `Future` cancellation, so the `.send()` error should
173        // be ignored.
174        user_data
175            .reply_sender
176            .send(status.map(|()| {
177                // PANIC: Unwrapping is OK here, because the `reply` can only be `None` when the
178                //        `status` is `Err`.
179                Reply::copy_from_reply(operation.kind(), reply.unwrap().payload)
180            }))
181            .unwrap_or_else(drop);
182    }
183}
184
185impl core::UserData for UserData {
186    fn data(&self) -> &[u8] {
187        self.data.as_ref()
188    }
189}
190
191fn _test_thread_safe(
192    client: Client,
193    accounts: Vec<Account>,
194    transfers: Vec<Transfer>,
195    query_filter: &'static QueryFilter,
196    account_filter: &'static account::Filter,
197    ids: Vec<u128>,
198) {
199    check_thread_safe(async move {
200        client.create_accounts(accounts).await.unwrap();
201        client.create_transfers(transfers).await.unwrap();
202        client.get_account_balances(account_filter).await.unwrap();
203        client.get_account_transfers(account_filter).await.unwrap();
204        client.lookup_accounts(ids.clone()).await.unwrap();
205        client.lookup_transfers(ids).await.unwrap();
206        client.query_accounts(query_filter).await.unwrap();
207        client.query_transfers(query_filter).await.unwrap();
208    });
209
210    fn check_thread_safe<T>(_: T)
211    where
212        T: Send + Sync + 'static,
213    {
214    }
215}