enfipy_tigerbeetle/
lib.rs1#![doc(
2 html_logo_url = "https://avatars.githubusercontent.com/u/187310527",
3 html_favicon_url = "https://avatars.githubusercontent.com/u/187310527?s=256"
4)]
5#![forbid(unsafe_code)]
6
7mod id;
8mod reply;
9
10use error::NewClientError;
11use reply::Reply;
12use tokio::sync::oneshot;
13
14use core::{
15 error::{CreateAccountsError, CreateTransfersError, SendError},
16 util::{RawConstPtr, SendAsBytesOwnedSlice, SendOwnedSlice},
17};
18
19pub use core::{self, account, error, transfer, Account, Packet, QueryFilter, Transfer};
20
21pub use self::id::id;
22
23pub struct Client {
24 inner: core::Client<&'static Callbacks>,
25}
26
/// Stateless completion handler registered with the underlying `core::Client`.
///
/// Its `core::Callbacks` implementation forwards each completed packet's
/// outcome to the oneshot sender stored in that packet's `UserData`.
struct Callbacks;
28
29struct UserData {
30 reply_sender: oneshot::Sender<Result<Reply, SendError>>,
31 data: SendAsBytesOwnedSlice,
32}
33
34impl Client {
35 pub fn new<A>(cluster_id: u128, address: A) -> Result<Self, NewClientError>
36 where
37 A: AsRef<[u8]>,
38 {
39 Ok(Client {
40 inner: core::Client::with_callback(cluster_id, address, &Callbacks)?,
41 })
42 }
43
44 pub async fn create_accounts<T>(&self, accounts: T) -> Result<(), CreateAccountsError>
45 where
46 T: Into<SendOwnedSlice<Account>>,
47 {
48 let accounts: SendOwnedSlice<Account> = accounts.into();
49 if accounts.is_empty() {
50 return Ok(());
51 }
52 Ok(self
53 .submit(
54 accounts.into_as_bytes(),
55 core::OperationKind::CreateAccounts,
56 )
57 .await?
58 .into_create_accounts()?)
59 }
60
61 pub async fn create_transfers<T>(&self, transfers: T) -> Result<(), CreateTransfersError>
62 where
63 T: Into<SendOwnedSlice<Transfer>>,
64 {
65 let transfers: SendOwnedSlice<Transfer> = transfers.into();
66 if transfers.is_empty() {
67 return Ok(());
68 }
69 Ok(self
70 .submit(
71 transfers.into_as_bytes(),
72 core::OperationKind::CreateTransfers,
73 )
74 .await?
75 .into_create_transfers()?)
76 }
77
78 pub async fn get_account_balances<T>(
79 &self,
80 filter: T,
81 ) -> Result<Vec<account::Balance>, SendError>
82 where
83 T: RawConstPtr<Target = account::Filter> + Send + 'static,
84 {
85 let filter: SendOwnedSlice<account::Filter> = SendOwnedSlice::from_single(filter);
86 self.submit(
87 filter.into_as_bytes(),
88 core::OperationKind::GetAccountBalances,
89 )
90 .await
91 .map(Reply::into_get_account_balances)
92 }
93
94 pub async fn get_account_transfers<T>(&self, filter: T) -> Result<Vec<Transfer>, SendError>
95 where
96 T: RawConstPtr<Target = account::Filter> + Send + 'static,
97 {
98 let filter: SendOwnedSlice<account::Filter> = SendOwnedSlice::from_single(filter);
99 self.submit(
100 filter.into_as_bytes(),
101 core::OperationKind::GetAccountTransfers,
102 )
103 .await
104 .map(Reply::into_get_account_transfers)
105 }
106
107 pub async fn lookup_accounts<T>(&self, ids: T) -> Result<Vec<Account>, SendError>
108 where
109 T: Into<SendOwnedSlice<u128>>,
110 {
111 let ids: SendOwnedSlice<u128> = ids.into();
112 if ids.is_empty() {
113 return Ok(Vec::new());
114 }
115 self.submit(ids.into_as_bytes(), core::OperationKind::LookupAccounts)
116 .await
117 .map(Reply::into_lookup_accounts)
118 }
119
120 pub async fn lookup_transfers<T>(&self, ids: T) -> Result<Vec<Transfer>, SendError>
121 where
122 T: Into<SendOwnedSlice<u128>>,
123 {
124 let ids: SendOwnedSlice<u128> = ids.into();
125 if ids.is_empty() {
126 return Ok(Vec::new());
127 }
128 self.submit(ids.into_as_bytes(), core::OperationKind::LookupTransfers)
129 .await
130 .map(Reply::into_lookup_transfers)
131 }
132
133 pub async fn query_accounts<T>(&self, filter: T) -> Result<Vec<Account>, SendError>
134 where
135 T: RawConstPtr<Target = QueryFilter> + Send + 'static,
136 {
137 let filter: SendOwnedSlice<QueryFilter> = SendOwnedSlice::from_single(filter);
138 self.submit(filter.into_as_bytes(), core::OperationKind::QueryAccounts)
139 .await
140 .map(Reply::into_query_accounts)
141 }
142
143 pub async fn query_transfers<T>(&self, filter: T) -> Result<Vec<Transfer>, SendError>
144 where
145 T: RawConstPtr<Target = QueryFilter> + Send + 'static,
146 {
147 let filter: SendOwnedSlice<QueryFilter> = SendOwnedSlice::from_single(filter);
148 self.submit(filter.into_as_bytes(), core::OperationKind::QueryTransfers)
149 .await
150 .map(Reply::into_query_transfers)
151 }
152
153 async fn submit(
154 &self,
155 data: SendAsBytesOwnedSlice,
156 operation: impl Into<core::Operation>,
157 ) -> Result<Reply, SendError> {
158 let (reply_sender, reply_receiver) = oneshot::channel();
159 let user_data = Box::new(UserData { reply_sender, data });
160 self.inner.submit(Packet::new(user_data, operation));
161 reply_receiver.await.unwrap()
162 }
163}
164
165impl core::Callbacks for Callbacks {
166 type UserDataPtr = Box<UserData>;
167
168 fn completion(&self, packet: Packet<Self::UserDataPtr>, reply: Option<core::Reply<'_>>) {
169 let status = packet.status();
170 let operation = packet.operation();
171 let user_data = packet.into_user_data();
172 user_data
175 .reply_sender
176 .send(status.map(|()| {
177 Reply::copy_from_reply(operation.kind(), reply.unwrap().payload)
180 }))
181 .unwrap_or_else(drop);
182 }
183}
184
185impl core::UserData for UserData {
186 fn data(&self) -> &[u8] {
187 self.data.as_ref()
188 }
189}
190
191fn _test_thread_safe(
192 client: Client,
193 accounts: Vec<Account>,
194 transfers: Vec<Transfer>,
195 query_filter: &'static QueryFilter,
196 account_filter: &'static account::Filter,
197 ids: Vec<u128>,
198) {
199 check_thread_safe(async move {
200 client.create_accounts(accounts).await.unwrap();
201 client.create_transfers(transfers).await.unwrap();
202 client.get_account_balances(account_filter).await.unwrap();
203 client.get_account_transfers(account_filter).await.unwrap();
204 client.lookup_accounts(ids.clone()).await.unwrap();
205 client.lookup_transfers(ids).await.unwrap();
206 client.query_accounts(query_filter).await.unwrap();
207 client.query_transfers(query_filter).await.unwrap();
208 });
209
210 fn check_thread_safe<T>(_: T)
211 where
212 T: Send + Sync + 'static,
213 {
214 }
215}