cn_tigerbeetle/
lib.rs

1//! The official TigerBeetle client for Rust.
2//!
3//! This is a client library for the [TigerBeetle] financial database.
4//! To use, create a [`Client`] and call its methods to make requests.
5//!
6//! The client presents an async interface, but does not depend on a specific
7//! Rust async runtime. Instead it contains its own off-thread event loop,
8//! shared by all official TigerBeetle clients. Thus it should integrate
9//! seamlessly into any Rust codebase.
10//!
11//! The cost of this though is that it does link to a non-Rust static library
12//! (called `tb_client`), and it does need to context switch between threads for
13//! every request. The native linking should be handled seamlessly on all
14//! supported platforms, and the context switching overhead is expected to be
15//! low compared to the cost of networking and disk I/O.
16//!
17//! [TigerBeetle]: https://tigerbeetle.com
18//!
19//!
20//! # Example
21//!
22//! ```no_run
23//! use tigerbeetle as tb;
24//!
25//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26//! // Connect to TigerBeetle
27//! let client = tb::Client::new(0, "127.0.0.1:3000")?;
28//!
29//! // Create accounts
30//! let account_id1 = tb::id();
31//! let account_id2 = tb::id();
32//!
33//! let accounts = [
34//!     tb::Account {
35//!         id: account_id1,
36//!         ledger: 1,
37//!         code: 1,
38//!         flags: tb::AccountFlags::History,
39//!         ..Default::default()
40//!     },
41//!     tb::Account {
42//!         id: account_id2,
43//!         ledger: 1,
44//!         code: 1,
45//!         flags: tb::AccountFlags::History,
46//!         ..Default::default()
47//!     },
48//! ];
49//!
50//! let account_results = client.create_accounts(&accounts).await?;
51//!
52//! // If no results are returned, then all input events were successful -
53//! // to save resources only unsuccessful inputs return results.
54//! assert_eq!(account_results.len(), 0);
55//!
56//! // Create a transfer between accounts
57//! let transfer_id = tb::id();
58//! let transfers = [tb::Transfer {
59//!     id: transfer_id,
60//!     debit_account_id: account_id1,
61//!     credit_account_id: account_id2,
62//!     amount: 100,
63//!     ledger: 1,
64//!     code: 1,
65//!     ..Default::default()
66//! }];
67//!
68//! let transfer_results = client.create_transfers(&transfers).await?;
69//! assert_eq!(transfer_results.len(), 0);
70//!
71//! // Look up the accounts to see the transfer result
72//! let accounts = client.lookup_accounts(&[account_id1, account_id2]).await?;
73//! let account1 = accounts[0];
74//! let account2 = accounts[1];
75//!
76//! assert_eq!(account1.id, account_id1);
77//! assert_eq!(account2.id, account_id2);
78//! assert_eq!(account1.debits_posted, 100);
79//! assert_eq!(account2.credits_posted, 100);
80//! # Ok(())
81//! # }
82//! ```
83//!
84//!
85//! # Request batching
86//!
87//! Most transaction and query operations support multiple events of the same
88//! type at once (this can be seen in the request method signatures accepting
89//! slices of their input types) and it is strongly recommended to submit many
90//! events in a single request at once as TigerBeetle will only reach its
91//! performance limits when events are received in large batches. The client
92//! _does_ implement its own internal batching and will attempt to create them
93//! efficiently, but it is more efficient for applications to create their own
94//! batches based on understanding of their own architectural needs and
95//! limitations.
96//!
97//! In TigerBeetle's standard build-time configuration **the maximum number of
98//! events per batch is 8189**. If the events in a request exceed this number
99//! its future will return [`PacketStatus::TooMuchData`].
100//!
101//!
102//! # Range query limits
103//!
104//! TigerBeetle's range queries, [`get_account_transfers`],
105//! [`get_account_balances`], [`query_accounts`] and [`query_transfers`], also
106//! have a limit to how many results they return.
107//!
108//! In TigerBeetle's standard build-time configuration **the maximum number of
109//! results returned is 8189**.
110//!
//! If the server returns a full batch for a range query, then further results
//! can be paged by incrementing `timestamp_min` to one greater than the highest
//! timestamp returned in the previous batch, and issuing a new query with
//! otherwise the same filter. (For reversed queries, instead decrement
//! `timestamp_max` to one less than the lowest timestamp returned.) This
//! process can be repeated until the server returns an unfull batch.
116//!
117//! [`get_account_transfers`]: `Client::get_account_transfers`
118//! [`get_account_balances`]: `Client::get_account_balances`
119//! [`query_accounts`]: `Client::query_accounts`
120//! [`query_transfers`]: `Client::query_transfers`
121//!
122//! Here is an example of paging to get started with:
123//!
124//! ```no_run
125//! use tigerbeetle as tb;
126//! use futures::{stream, Stream};
127//!
128//! fn get_account_transfers_paged(
129//!     client: &tb::Client,
130//!     event: tb::AccountFilter,
131//! ) -> impl Stream<Item = Result<Vec<tb::Transfer>, tb::PacketStatus>> + '_ {
132//!     assert!(
133//!         event.limit > 1,
134//!         "paged queries should use an explicit limit"
135//!     );
136//!
137//!     enum State {
138//!         Start,
139//!         Continue(u64),
140//!         End,
141//!     }
142//!
143//!     let is_reverse = event.flags.contains(tb::AccountFilterFlags::Reversed);
144//!
145//!     futures::stream::unfold(State::Start, move |state| async move {
146//!         let event = match state {
147//!             State::Start => event,
148//!             State::Continue(timestamp_begin) => {
149//!                 if !is_reverse {
150//!                     tb::AccountFilter {
151//!                         timestamp_min: timestamp_begin,
152//!                         ..event
153//!                     }
154//!                 } else {
155//!                     tb::AccountFilter {
156//!                         timestamp_max: timestamp_begin,
157//!                         ..event
158//!                     }
159//!                 }
160//!             }
161//!             State::End => return None,
162//!         };
163//!         let result_next = client.get_account_transfers(event).await;
164//!         match result_next {
165//!             Ok(result_next) => {
166//!                 let result_len = u32::try_from(result_next.len()).expect("u32");
167//!                 let must_page = result_len == event.limit;
168//!                 if must_page {
169//!                     let timestamp_first = result_next.first().expect("item").timestamp;
170//!                     let timestamp_last = result_next.last().expect("item").timestamp;
171//!                     let (timestamp_begin_next, should_continue) = if !is_reverse {
172//!                         assert!(timestamp_first < timestamp_last);
173//!                         let timestamp_begin_next = timestamp_last.checked_add(1).expect("overflow");
174//!                         assert_ne!(timestamp_begin_next, u64::MAX);
175//!                         let should_continue =
176//!                             timestamp_begin_next <= event.timestamp_max || event.timestamp_max == 0;
177//!                         (timestamp_begin_next, should_continue)
178//!                     } else {
179//!                         assert!(timestamp_first > timestamp_last);
180//!                         let timestamp_begin_next = timestamp_last.checked_sub(1).expect("overflow");
181//!                         assert_ne!(timestamp_begin_next, 0);
182//!                         let should_continue =
183//!                             timestamp_begin_next >= event.timestamp_min || event.timestamp_min == 0;
184//!                         (timestamp_begin_next, should_continue)
185//!                     };
186//!                     if should_continue {
187//!                         Some((Ok(result_next), State::Continue(timestamp_begin_next)))
188//!                     } else {
189//!                         Some((Ok(result_next), State::End))
190//!                     }
191//!                 } else {
192//!                     Some((Ok(result_next), State::End))
193//!                 }
194//!             }
195//!             Err(result_next) => Some((Err(result_next), State::End)),
196//!         }
197//!     })
198//! }
199//! ```
200//!
201//!
202//! # Response futures and client lifetime considerations
203//!
204//! Responses to requests are returned as [`Future`]s. It is not strictly
205//! necessary for applications to `await` these futures &mdash; requests are
206//! enqueued as soon as the request method is called and will be executed even
207//! if the future is dropped.
208//!
209//! It is possible to drop a `Client` while request futures are still
210//! outstanding. In this case any pending requests will be completed with
211//! [`PacketStatus::ClientShutdown`]. Request futures may resolve to successful
212//! results even after the client is closed.
213//!
214//! When `Client` is dropped without calling [`close`],
215//! it will shutdown correctly, but some of that work happens
216//! off-thread after the drop completes.
217//!
218//! For orderly shutdown, it is recommended to await all
219//! request futures prior to destroying the client,
220//! and to destroy the client by calling `close` and awaiting
221//! its return value.
222//!
223//! [`close`]: Client::close
224//!
225//!
226//! # Concurrency and multithreading
227//!
228//! Multiple requests may be submitted concurrently from a single client; the
229//! results of which are returned as futures whose Rust lifetimes are tied to
230//! the `Client`. The server only supports one in-flight request per client
231//! though, so the client will internally buffer concurrent requests. To truly
232//! have multiple requests in flight concurrently, multiple clients can be
233//! created, though note that there is a hard-coded limit on how many clients
234//! can be connected to the server simultaneously.
235//!
236//! The `Client` type implements `Send` and `Sync` and may be used in parallel
237//! across multiple threads or async tasks, e.g. by placing it into an [`Arc`].
238//! In some cases this may be useful because it allows the client to leverage
239//! its internal request batching to batch events from multiple threads (or
240//! tasks), but otherwise it provides no performance advantage.
241//!
242//! [`Arc`]: `std::sync::Arc`
243//!
244//!
245//! # TigerBeetle time-based identifiers
246//!
247//! Accounts and transfers must have globally unique identifiers. The generation
248//! of these is application-specific, and any scheme that guarantees unique IDs
249//! will work. Barring other constraints, TigerBeetle recommends using
250//! [TigerBeetle time-based identifiers][tbid]. This crate provides an
251//! implementation in the [`id`] function.
252//!
253//! For additional considerations when choosing an ID scheme
254//! see [the TigerBeetle documentation on data modeling][tbdataid].
255//!
256//! [tbid]: https://docs.tigerbeetle.com/coding/data-modeling/#tigerbeetle-time-based-identifiers-recommended
257//! [tbdataid]: https://docs.tigerbeetle.com/coding/data-modeling/#id
258//!
259//!
260//! # Use in non-async codebases
261//!
262//! The TigerBeetle client is async-only, but if you're working in a synchronous
263//! codebase, you can use [`futures::executor::block_on`] to run async operations
264//! to completion.
265//!
266//! [`futures::executor::block_on`]: https://docs.rs/futures/latest/futures/executor/fn.block_on.html
267//!
268//! ```no_run
269//! use futures::executor::block_on;
270//! use tigerbeetle as tb;
271//!
272//! fn synchronous_function() -> Result<(), Box<dyn std::error::Error>> {
273//!     block_on(async {
274//!         let client = tb::Client::new(0, "127.0.0.1:3000")?;
275//!
276//!         let accounts = [tb::Account {
277//!             id: tb::id(),
278//!             ledger: 1,
279//!             code: 1,
280//!             ..Default::default()
281//!         }];
282//!
283//!         let results = client.create_accounts(&accounts).await?;
284//!
285//!         Ok(())
286//!     })
287//! }
288//! ```
289//!
290//! Note that `block_on` will block the current thread until the async operation
291//! completes, so this approach works best for simple use cases or when you need
292//! to integrate TigerBeetle into an existing synchronous application.
293//!
294//!
295//! # Rust structure binary representation and the TigerBeetle protocol
296//!
297//! Many types in this library are ABI-compatible with the underlying protocol
298//! definition and can be cast (unsafely) directly to and from byte buffers
299//! on all supported platforms, though this should not be required for typical
300//! application purposes.
301//!
302//! The protocol-compatible types are:
303//!
304//! - [`Account`] and [`AccountFlags`]
305//! - [`Transfer`] and [`TransferFlags`]
306//! - [`AccountBalance`]
307//! - [`AccountFilter`] and [`AccountFilterFlags`]
308//! - [`QueryFilter`] and [`QueryFilterFlags`]
309//!
310//! Note that status enums are not ABI-compatible with the protocol's status codes
311//! and must be converted with [`TryFrom`].
312//!
313//!
314//! # References
315//!
316//! [The TigerBeetle Reference](https://docs.tigerbeetle.com/reference/).
317
318use bitflags::bitflags;
319use futures_channel::oneshot::{channel, Receiver};
320
321use std::convert::Infallible;
322use std::future::Future;
323use std::os::raw::{c_char, c_void};
324use std::{fmt, mem, ptr};
325
326// The generated bindings.
327// These are not part of the public API but are re-exported hidden
328// so that the vortex driver can parse the TB protocol directly.
329#[allow(unused)]
330#[allow(non_upper_case_globals)]
331#[allow(non_camel_case_types)]
332#[allow(non_snake_case)]
333#[rustfmt::skip]
334#[doc(hidden)]
335pub mod tb_client;
336
337use tb_client as tbc;
338
339mod conversions;
340mod time_based_id;
341
342pub use time_based_id::id;
343
/// The tb_client completion context is unused by the Rust bindings.
/// This is just a magic number to jump out of logs.
///
/// It is the value handed to `tb_client_init` when a [`Client`] is created.
const COMPLETION_CONTEXT: usize = 0xAB;
347
/// The TigerBeetle client.
///
/// A handle to a native `tb_client` instance. See the crate-level
/// documentation for batching, concurrency and shutdown considerations.
pub struct Client {
    // Raw pointer to the heap-allocated native client handle. It is produced
    // by `Box::into_raw` in `Client::new` and passed to `tb_client_submit` by
    // the request methods.
    client: *mut tbc::tb_client_t,
}

// SAFETY: NOTE(review): a raw pointer field makes `Client` neither `Send` nor
// `Sync` automatically, so these impls assert that the native client may be
// used from any thread. That guarantee has to come from the `tb_client`
// library itself; it is not established by code visible here — confirm
// against the native library's documentation.
unsafe impl Send for Client {}
unsafe impl Sync for Client {}
355
356impl Client {
357    /// Create a new TigerBeetle client.
358    ///
359    /// # Addresses
360    ///
361    /// The `addresses` argument is a comma-separated string of addresses, where
362    /// each may be either an IP4 address, a port number, or the pair of IP4
363    /// address and port number separated by a colon. Examples include
364    /// `127.0.0.1`, `3001`, `127.0.0.1:3001` and
365    /// `127.0.0.1,3002,127.0.0.1:3003`. The default IP address is `127.0.0.1`
366    /// and default port is `3001`.
367    ///
368    /// This is the same address format supported by the TigerBeetle CLI.
369    ///
370    /// # References
371    ///
372    /// [Client Sessions](https://docs.tigerbeetle.com/reference/sessions/).
373    pub fn new(cluster_id: u128, addresses: &str) -> Result<Client, InitStatus> {
374        assert_abi_compatibility();
375
376        unsafe {
377            let tb_client = Box::new(tbc::tb_client_t {
378                opaque: Default::default(),
379            });
380            let tb_client = Box::into_raw(tb_client);
381            let status = tbc::tb_client_init(
382                tb_client,
383                &cluster_id.to_le_bytes(),
384                addresses.as_ptr() as *const c_char,
385                addresses.len() as u32,
386                COMPLETION_CONTEXT,
387                Some(on_completion),
388            );
389            if status == tbc::TB_INIT_STATUS_TB_INIT_SUCCESS {
390                Ok(Client { client: tb_client })
391            } else {
392                Err(status.into())
393            }
394        }
395    }
396
397    /// Create one or more accounts.
398    ///
399    /// Accounts to create are provided as a slice of input [`Account`] events.
400    /// Their fields must be initialized as described in the corresponding
401    /// [protocol reference](#protocol-reference).
402    ///
403    /// The request is queued for submission prior to return of this function;
404    /// dropping the returned [`Future`] will not cancel the request.
405    ///
406    /// # Interpreting the return value
407    ///
408    /// This function has two levels of errors: if the entire request fails then
409    /// the future returns [`Err`] of [`PacketStatus`] and the caller should assume
410    /// that none of the submitted events were processed.
411    ///
412    /// The results of events are represented individually. There are two
413    /// related event result types: `CreateAccountResult` is the enum of
414    /// possible outcomes, while `CreateAccountsResult` includes the index to
415    /// map back to input events.
416    ///
417    /// _This function does not return a result for all input events_. Instead
418    /// it only returns results that would not be [`CreateAccountResult::Ok`].
419    /// In other words, this function does not return results for successful
420    /// events, only unsuccessful events (though note the case of
421    /// [`CreateAccountResult::Exists`], described below). This behavior
422    /// reflects optimizations in the underlying protocol. This client will
423    /// never return a `CreateAccountResult::Ok`; that variant is defined in
424    /// case it is useful for clients to materialize omitted request results. To
425    /// relate a `CreateAccountsResult` to its input event, the
426    /// [`CreateAccountsResult::index`] field is an index into the input event
427    /// slice. An example of efficiently materializing all results is included
428    /// below.
429    ///
430    /// Note that a result of `CreateAccountResult::Exists` should often be treated
431    /// the same as `CreateAccountResult::Ok`. This result can happen in cases of
432    /// application crashes or other scenarios where requests have been replayed.
433    ///
434    /// # Example
435    ///
436    /// ```no_run
437    /// use tigerbeetle as tb;
438    ///
439    /// async fn make_create_accounts_request(
440    ///     client: &tb::Client,
441    ///     accounts: &[tb::Account],
442    /// ) -> Result<(), Box<dyn std::error::Error>> {
443    ///     let create_accounts_results = client.create_accounts(accounts).await?;
444    ///     let create_accounts_results_merged = merge_create_accounts_results(accounts, create_accounts_results);
445    ///     for (account, create_account_result) in create_accounts_results_merged {
446    ///         match create_account_result {
447    ///             tb::CreateAccountResult::Ok | tb::CreateAccountResult::Exists => {
448    ///                 handle_create_account_success(account, create_account_result).await?;
449    ///             }
450    ///             _ => {
451    ///                 handle_create_account_failure(account, create_account_result).await?;
452    ///             }
453    ///         }
454    ///     }
455    ///     Ok(())
456    /// }
457    ///
458    /// fn merge_create_accounts_results(
459    ///     accounts: &[tb::Account],
460    ///     results: Vec<tb::CreateAccountsResult>,
461    /// ) -> impl Iterator<Item = (&tb::Account, tb::CreateAccountResult)> + '_ {
462    ///     let mut results = results.into_iter().peekable();
463    ///     accounts.iter().enumerate().map(move |(i, account)| {
464    ///         match results.peek().copied() {
465    ///             Some(result) if result.index == i => {
466    ///                 let _ = results.next();
467    ///                 (account, result.result)
468    ///             }
469    ///             _ => (account, tb::CreateAccountResult::Ok),
470    ///         }
471    ///     })
472    /// }
473    ///
474    /// # async fn handle_create_account_success(
475    /// #     _account: &tb::Account,
476    /// #     _result: tb::CreateAccountResult,
477    /// # ) -> Result<(), Box<dyn std::error::Error>> {
478    /// #     Ok(())
479    /// # }
480    /// #
481    /// # async fn handle_create_account_failure(
482    /// #     _account: &tb::Account,
483    /// #     _result: tb::CreateAccountResult,
484    /// # ) -> Result<(), Box<dyn std::error::Error>> {
485    /// #     Ok(())
486    /// # }
487    /// ```
488    ///
489    /// # Maximum batch size
490    ///
491    /// If the length of the `events` argument exceeds the maximum batch size
492    /// the future will return [`Err`] of [`PacketStatus::TooMuchData`]. In
493    /// TigerBeetle's standard build-time configuration the maximum batch size
494    /// is 8189.
495    ///
496    /// # Protocol reference
497    ///
498    /// [`create_accounts`](https://docs.tigerbeetle.com/reference/requests/create_accounts).
499    pub fn create_accounts(
500        &self,
501        events: &[Account],
502    ) -> impl Future<Output = Result<Vec<CreateAccountsResult>, PacketStatus>> {
503        let (packet, rx) =
504            create_packet::<Account>(tbc::TB_OPERATION_TB_OPERATION_CREATE_ACCOUNTS, events);
505
506        unsafe {
507            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
508            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
509        }
510
511        async {
512            let msg = rx.await.expect("channel");
513
514            let responses: &[tbc::tb_create_accounts_result_t] = handle_message(&msg)?;
515
516            Ok(responses
517                .iter()
518                .map(|result| CreateAccountsResult {
519                    index: usize::try_from(result.index).expect("usize"),
520                    result: CreateAccountResult::from(result.result),
521                })
522                .collect())
523        }
524    }
525
526    /// Create one or more transfers.
527    ///
528    /// Transfers to create are provided as a slice of input [`Transfer`] events.
529    /// Their fields must be initialized as described in the corresponding
530    /// [protocol reference](#protocol-reference).
531    ///
532    /// The request is queued for submission prior to return of this function;
533    /// dropping the returned [`Future`] will not cancel the request.
534    ///
535    /// # Interpreting the return value
536    ///
537    /// This function has two levels of errors: if the entire request fails then
538    /// the future returns [`Err`] of [`PacketStatus`] and the caller should assume
539    /// that none of the submitted events were processed.
540    ///
541    /// The results of events are represented individually. There are two
542    /// related event result types: `CreateTransferResult` is the enum of
543    /// possible outcomes, while `CreateTransfersResult` includes the index to
544    /// map back to input events.
545    ///
546    /// _This function does not return a result for all input events_. Instead
547    /// it only returns results that would not be [`CreateTransferResult::Ok`].
548    /// In other words, this function does not return results for successful
549    /// events, only unsuccessful events (though note the case of
550    /// [`CreateTransferResult::Exists`], described below). This behavior
551    /// reflects optimizations in the underlying protocol. This client will
552    /// never return a `CreateTransferResult::Ok`; that variant is defined in
    /// case it is useful for clients to materialize omitted request results. To
    /// relate a `CreateTransfersResult` to its input event, the
    /// [`CreateTransfersResult::index`] field is an index into the input event
    /// slice. An example of efficiently materializing all results is included
    /// below.
561    ///
562    /// Note that a result of `CreateTransferResult::Exists` should often be treated
563    /// the same as `CreateTransferResult::Ok`. This result can happen in cases of
564    /// application crashes or other scenarios where requests have been replayed.
565    ///
566    /// # Example
567    ///
568    /// ```no_run
569    /// use tigerbeetle as tb;
570    ///
571    /// async fn make_create_transfers_request(
572    ///     client: &tb::Client,
573    ///     transfers: &[tb::Transfer],
574    /// ) -> Result<(), Box<dyn std::error::Error>> {
575    ///     let create_transfers_results = client.create_transfers(transfers).await?;
576    ///     let create_transfers_results_merged = merge_create_transfers_results(transfers, create_transfers_results);
577    ///     for (transfer, create_transfer_result) in create_transfers_results_merged {
578    ///         match create_transfer_result {
579    ///             tb::CreateTransferResult::Ok | tb::CreateTransferResult::Exists => {
580    ///                 handle_create_transfer_success(transfer, create_transfer_result).await?;
581    ///             }
582    ///             _ => {
583    ///                 handle_create_transfer_failure(transfer, create_transfer_result).await?;
584    ///             }
585    ///         }
586    ///     }
587    ///     Ok(())
588    /// }
589    ///
590    /// fn merge_create_transfers_results(
591    ///     transfers: &[tb::Transfer],
592    ///     results: Vec<tb::CreateTransfersResult>,
593    /// ) -> impl Iterator<Item = (&tb::Transfer, tb::CreateTransferResult)> + '_ {
594    ///     let mut results = results.into_iter().peekable();
595    ///     transfers.iter().enumerate().map(move |(i, transfer)| {
596    ///         match results.peek().copied() {
597    ///             Some(result) if result.index == i => {
598    ///                 let _ = results.next();
599    ///                 (transfer, result.result)
600    ///             }
601    ///             _ => (transfer, tb::CreateTransferResult::Ok),
602    ///         }
603    ///     })
604    /// }
605    ///
606    /// # async fn handle_create_transfer_success(
607    /// #     _transfer: &tb::Transfer,
608    /// #     _result: tb::CreateTransferResult,
609    /// # ) -> Result<(), Box<dyn std::error::Error>> {
610    /// #     Ok(())
611    /// # }
612    /// #
613    /// # async fn handle_create_transfer_failure(
614    /// #     _transfer: &tb::Transfer,
615    /// #     _result: tb::CreateTransferResult,
616    /// # ) -> Result<(), Box<dyn std::error::Error>> {
617    /// #     Ok(())
618    /// # }
619    /// ```
620    ///
621    /// # Maximum batch size
622    ///
623    /// If the length of the `events` argument exceeds the maximum batch size
624    /// the future will return [`Err`] of [`PacketStatus::TooMuchData`]. In
625    /// TigerBeetle's standard build-time configuration the maximum batch size
626    /// is 8189.
627    ///
628    /// # Protocol reference
629    ///
630    /// [`create_transfers`](https://docs.tigerbeetle.com/reference/requests/create_transfers).
631    pub fn create_transfers(
632        &self,
633        events: &[Transfer],
634    ) -> impl Future<Output = Result<Vec<CreateTransfersResult>, PacketStatus>> {
635        let (packet, rx) =
636            create_packet::<Transfer>(tbc::TB_OPERATION_TB_OPERATION_CREATE_TRANSFERS, events);
637
638        unsafe {
639            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
640            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
641        }
642
643        async {
644            let msg = rx.await.expect("channel");
645
646            let responses: &[tbc::tb_create_transfers_result_t] = handle_message(&msg)?;
647
648            Ok(responses
649                .iter()
650                .map(|result| CreateTransfersResult {
651                    index: usize::try_from(result.index).expect("usize"),
652                    result: CreateTransferResult::from(result.result),
653                })
654                .collect())
655        }
656    }
657
658    /// Query individual accounts.
659    ///
660    /// The request is queued for submission prior to return of this function;
661    /// dropping the returned future will not cancel the request.
662    ///
663    /// # Interpreting the return value
664    ///
665    /// This function has two levels of errors: if the entire request fails then
666    /// the future returns [`Err`] of [`PacketStatus`] and the caller should assume
667    /// that none of the submitted events were processed.
668    ///
669    /// This request returns the found accounts, in the order requested. The
670    /// return value does not indicate which accounts were not found. Those can
671    /// be determined by comparing the output results to the input events,
672    /// example provided below.
673    ///
674    /// # Example
675    ///
676    /// ```no_run
677    /// use tigerbeetle as tb;
678    ///
679    /// async fn make_lookup_accounts_request(
680    ///     client: &tb::Client,
681    ///     accounts: &[u128],
682    /// ) -> Result<(), Box<dyn std::error::Error>> {
683    ///     let lookup_accounts_results = client.lookup_accounts(accounts).await?;
684    ///     let lookup_accounts_results_merged = merge_lookup_accounts_results(accounts, lookup_accounts_results);
685    ///     for (account_id, maybe_account) in lookup_accounts_results_merged {
686    ///         match maybe_account {
687    ///             Some(account) => {
688    ///                 handle_lookup_accounts_success(account).await?;
689    ///             }
690    ///             None => {
691    ///                 handle_lookup_accounts_failure(account_id).await?;
692    ///             }
693    ///         }
694    ///     }
695    ///     Ok(())
696    /// }
697    ///
698    /// fn merge_lookup_accounts_results(
699    ///     accounts: &[u128],
700    ///     results: Vec<tb::Account>,
701    /// ) -> impl Iterator<Item = (u128, Option<tb::Account>)> + '_ {
702    ///     let mut results = results.into_iter().peekable();
703    ///     accounts.iter().map(move |&id| match results.peek() {
704    ///         Some(acc) if acc.id == id => (id, results.next()),
705    ///         _ => (id, None),
706    ///     })
707    /// }
708    ///
709    /// # async fn handle_lookup_accounts_success(
710    /// #     _account: tb::Account,
711    /// # ) -> Result<(), Box<dyn std::error::Error>> {
712    /// #     Ok(())
713    /// # }
714    /// #
715    /// # async fn handle_lookup_accounts_failure(
716    /// #     _account_id: u128,
717    /// # ) -> Result<(), Box<dyn std::error::Error>> {
718    /// #     Ok(())
719    /// # }
720    /// ```
721    ///
722    /// # Maximum batch size
723    ///
724    /// If the length of the `events` argument exceeds the maximum batch size
725    /// the future will return [`Err`] of [`PacketStatus::TooMuchData`]. In
726    /// TigerBeetle's standard build-time configuration the maximum batch size
727    /// is 8189.
728    ///
729    /// # Errors
730    ///
    /// This request has two levels of errors: if the entire request fails then
    /// the future returns [`Err`] of [`PacketStatus`] and the caller can assume
    /// that none of the submitted events were processed; if the request was
    /// processed, then accounts that were not found are simply omitted from the
    /// returned vector rather than being reported as errors.
735    ///
736    /// # Protocol reference
737    ///
738    /// [`lookup_accounts`](https://docs.tigerbeetle.com/reference/requests/lookup_accounts).
739    pub fn lookup_accounts(
740        &self,
741        events: &[u128],
742    ) -> impl Future<Output = Result<Vec<Account>, PacketStatus>> {
743        let (packet, rx) =
744            create_packet::<u128>(tbc::TB_OPERATION_TB_OPERATION_LOOKUP_ACCOUNTS, events);
745
746        unsafe {
747            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
748            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
749        }
750
751        async {
752            let msg = rx.await.expect("channel");
753            let responses: &[Account] = handle_message(&msg)?;
754            Ok(Vec::from(responses))
755        }
756    }
757
758    /// Query individual transfers.
759    ///
760    /// The request is queued for submission prior to return of this function;
761    /// dropping the returned future will not cancel the request.
762    ///
763    /// # Maximum batch size
764    ///
765    /// If the length of the `events` argument exceeds the maximum batch size
766    /// the future will return [`Err`] of [`PacketStatus::TooMuchData`]. In
767    /// TigerBeetle's standard build-time configuration the maximum batch size
768    /// is 8189.
769    ///
770    /// # Errors
771    ///
772    /// This request has two levels of errors: if the entire request fails then
773    /// the future returns [`Err`] of [`PacketStatus`] and the caller can assume
774    /// that none of the submitted events were processed; if the request was
775    /// processed, then each event may possibly be [`NotFound`].
776    ///
777    /// # Example
778    ///
779    /// ```
780    /// use tigerbeetle as tb;
781    ///
782    /// async fn make_lookup_transfers_request(
783    ///     client: &tb::Client,
784    ///     transfers: &[u128],
785    /// ) -> Result<(), Box<dyn std::error::Error>> {
786    ///     let lookup_transfers_results = client.lookup_transfers(transfers).await?;
787    ///     let lookup_transfers_results_merged = merge_lookup_transfers_results(transfers, lookup_transfers_results);
788    ///     for (transfer_id, maybe_transfer) in lookup_transfers_results_merged {
789    ///         match maybe_transfer {
790    ///             Some(transfer) => {
791    ///                 handle_lookup_transfers_success(transfer).await?;
792    ///             }
793    ///             None => {
794    ///                 handle_lookup_transfers_failure(transfer_id).await?;
795    ///             }
796    ///         }
797    ///     }
798    ///     Ok(())
799    /// }
800    ///
801    /// fn merge_lookup_transfers_results(
802    ///     transfers: &[u128],
803    ///     results: Vec<tb::Transfer>,
804    /// ) -> impl Iterator<Item = (u128, Option<tb::Transfer>)> + '_ {
805    ///     let mut results = results.into_iter().peekable();
806    ///     transfers.iter().map(move |&id| match results.peek() {
807    ///         Some(transfer) if transfer.id == id => (id, results.next()),
808    ///         _ => (id, None),
809    ///     })
810    /// }
811    ///
812    /// # async fn handle_lookup_transfers_success(
813    /// #     _transfer: tb::Transfer,
814    /// # ) -> Result<(), Box<dyn std::error::Error>> {
815    /// #     Ok(())
816    /// # }
817    /// #
818    /// # async fn handle_lookup_transfers_failure(
819    /// #     _transfer_id: u128,
820    /// # ) -> Result<(), Box<dyn std::error::Error>> {
821    /// #     Ok(())
822    /// # }
823    /// ```
824    ///
825    /// # Protocol reference
826    ///
827    /// [`lookup_transfers`](https://docs.tigerbeetle.com/reference/requests/lookup_transfers).
828    pub fn lookup_transfers(
829        &self,
830        events: &[u128],
831    ) -> impl Future<Output = Result<Vec<Transfer>, PacketStatus>> {
832        let (packet, rx) =
833            create_packet::<u128>(tbc::TB_OPERATION_TB_OPERATION_LOOKUP_TRANSFERS, events);
834
835        unsafe {
836            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
837            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
838        }
839
840        async {
841            let msg = rx.await.expect("channel");
842            let responses: &[Transfer] = handle_message(&msg)?;
843            Ok(Vec::from(responses))
844        }
845    }
846
847    /// Query multiple transfers for a single account.
848    ///
849    /// The request is queued for submission prior to return of this function;
850    /// dropping the returned future will not cancel the request.
851    ///
852    /// # Errors
853    ///
854    /// If the entire request fails then the future returns [`Err`] of [`PacketStatus`].
855    ///
856    /// # Protocol reference
857    ///
858    /// [`get_account_transfers`](https://docs.tigerbeetle.com/reference/requests/get_account_transfers).
859    pub fn get_account_transfers(
860        &self,
861        event: AccountFilter,
862    ) -> impl Future<Output = Result<Vec<Transfer>, PacketStatus>> {
863        let (packet, rx) = create_packet::<AccountFilter>(
864            tbc::TB_OPERATION_TB_OPERATION_GET_ACCOUNT_TRANSFERS,
865            &[event],
866        );
867
868        unsafe {
869            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
870            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
871        }
872
873        async {
874            let msg = rx.await.expect("channel");
875            let result: &[Transfer] = handle_message(&msg)?;
876
877            Ok(result.to_vec())
878        }
879    }
880
881    /// Query historical account balances for a single account.
882    ///
883    /// The request is queued for submission prior to return of this function;
884    /// dropping the returned future will not cancel the request.
885    ///
886    /// # Errors
887    ///
888    /// If the entire request fails then the future returns [`Err`] of [`PacketStatus`].
889    ///
890    /// # Protocol reference
891    ///
892    /// [`get_account_balances`](https://docs.tigerbeetle.com/reference/requests/get_account_balances).
893    pub fn get_account_balances(
894        &self,
895        event: AccountFilter,
896    ) -> impl Future<Output = Result<Vec<AccountBalance>, PacketStatus>> {
897        let (packet, rx) = create_packet::<AccountFilter>(
898            tbc::TB_OPERATION_TB_OPERATION_GET_ACCOUNT_BALANCES,
899            &[event],
900        );
901
902        unsafe {
903            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
904            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
905        }
906
907        async {
908            let msg = rx.await.expect("channel");
909            let result: &[AccountBalance] = handle_message(&msg)?;
910
911            Ok(result.to_vec())
912        }
913    }
914
915    /// Query multiple accounts related by fields and timestamps.
916    ///
917    /// The request is queued for submission prior to return of this function;
918    /// dropping the returned future will not cancel the request.
919    ///
920    /// # Errors
921    ///
922    /// If the entire request fails then the future returns [`Err`] of [`PacketStatus`].
923    ///
924    /// # Protocol reference
925    ///
926    /// [`query_accounts`](https://docs.tigerbeetle.com/reference/requests/query_accounts).
927    pub fn query_accounts(
928        &self,
929        event: QueryFilter,
930    ) -> impl Future<Output = Result<Vec<Account>, PacketStatus>> {
931        let (packet, rx) =
932            create_packet::<QueryFilter>(tbc::TB_OPERATION_TB_OPERATION_QUERY_ACCOUNTS, &[event]);
933
934        unsafe {
935            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
936            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
937        }
938
939        async {
940            let msg = rx.await.expect("channel");
941            let result: &[Account] = handle_message(&msg)?;
942
943            Ok(result.to_vec())
944        }
945    }
946
947    /// Query multiple transfers related by fields and timestamps.
948    ///
949    /// The request is queued for submission prior to return of this function;
950    /// dropping the returned future will not cancel the request.
951    ///
952    /// # Errors
953    ///
954    /// If the entire request fails then the future returns [`Err`] of [`PacketStatus`].
955    ///
956    /// # Protocol reference
957    ///
958    /// [`query_transfers`](https://docs.tigerbeetle.com/reference/requests/query_transfers).
959    pub fn query_transfers(
960        &self,
961        event: QueryFilter,
962    ) -> impl Future<Output = Result<Vec<Transfer>, PacketStatus>> {
963        let (packet, rx) =
964            create_packet::<QueryFilter>(tbc::TB_OPERATION_TB_OPERATION_QUERY_TRANSFERS, &[event]);
965
966        unsafe {
967            let status = tbc::tb_client_submit(self.client, Box::into_raw(packet));
968            assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
969        }
970
971        async {
972            let msg = rx.await.expect("channel");
973            let result: &[Transfer] = handle_message(&msg)?;
974
975            Ok(result.to_vec())
976        }
977    }
978
979    /// Close the client and asynchronously wait for completion.
980    ///
981    /// Note that it is not required for correctness to call this method &mdash;
982    /// `Client`'s destructor will correctly shut down the client, though
983    /// without providing the ability to wait for shutdown.
984    ///
985    /// Calling `close` will cancel any pending requests. This is only possible
986    /// if the futures for those requests were dropped without awaiting them.
987    pub fn close(mut self) -> impl Future<Output = ()> {
988        struct SendClient(*mut tbc::tb_client_t);
989        unsafe impl Send for SendClient {}
990
991        let client = std::mem::replace(&mut self.client, std::ptr::null_mut());
992        let client = SendClient(client);
993
994        let (tx, rx) = channel::<Infallible>();
995
996        std::thread::spawn(move || {
997            let client = client;
998            unsafe {
999                // This is a blocking function so we're calling it offthread.
1000                let status = tbc::tb_client_deinit(client.0);
1001                assert_eq!(status, tbc::TB_CLIENT_STATUS_TB_CLIENT_OK);
1002                std::mem::drop(Box::from_raw(client.0));
1003            }
1004            drop(tx);
1005        });
1006
1007        async {
1008            // wait for the channel to close
1009            let _ = rx.await;
1010        }
1011    }
1012}
1013
1014impl Drop for Client {
1015    fn drop(&mut self) {
1016        if !self.client.is_null() {
1017            let close_future = Client {
1018                client: self.client,
1019            }
1020            .close();
1021            // NB: Rust 1.68 clippy - specifically - want's an explicit drop for this future.
1022            drop(close_future);
1023        }
1024    }
1025}
1026
1027impl fmt::Debug for Client {
1028    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1029        f.write_str("Client")
1030    }
1031}
1032
1033/// Make basic assertions about the ABI of our types.
1034///
1035/// We don't actually use some of the C types at all,
1036/// instead casting directly to hand-written Rust types.
1037///
1038/// These assertions give us some confidence those types
1039/// might be possibly correct.
1040fn assert_abi_compatibility() {
1041    assert_eq!(
1042        std::mem::size_of::<Account>(),
1043        std::mem::size_of::<tbc::tb_account_t>()
1044    );
1045    assert_eq!(
1046        std::mem::align_of::<Account>(),
1047        std::mem::align_of::<tbc::tb_account_t>()
1048    );
1049    assert_eq!(
1050        std::mem::size_of::<AccountFlags>(),
1051        std::mem::size_of::<tbc::TB_ACCOUNT_FLAGS>()
1052    );
1053    assert_eq!(
1054        std::mem::align_of::<AccountFlags>(),
1055        std::mem::align_of::<tbc::TB_ACCOUNT_FLAGS>()
1056    );
1057    assert_eq!(
1058        std::mem::size_of::<Transfer>(),
1059        std::mem::size_of::<tbc::tb_transfer_t>()
1060    );
1061    assert_eq!(
1062        std::mem::align_of::<Transfer>(),
1063        std::mem::align_of::<tbc::tb_transfer_t>()
1064    );
1065    assert_eq!(
1066        std::mem::size_of::<TransferFlags>(),
1067        std::mem::size_of::<tbc::TB_TRANSFER_FLAGS>()
1068    );
1069    assert_eq!(
1070        std::mem::align_of::<TransferFlags>(),
1071        std::mem::align_of::<tbc::TB_TRANSFER_FLAGS>()
1072    );
1073    assert_eq!(
1074        std::mem::size_of::<AccountFilter>(),
1075        std::mem::size_of::<tbc::tb_account_filter_t>()
1076    );
1077    assert_eq!(
1078        std::mem::align_of::<AccountFilter>(),
1079        std::mem::align_of::<tbc::tb_account_filter_t>()
1080    );
1081    assert_eq!(
1082        std::mem::size_of::<AccountFilterFlags>(),
1083        std::mem::size_of::<tbc::TB_ACCOUNT_FILTER_FLAGS>()
1084    );
1085    assert_eq!(
1086        std::mem::align_of::<AccountFilterFlags>(),
1087        std::mem::align_of::<tbc::TB_ACCOUNT_FILTER_FLAGS>()
1088    );
1089    assert_eq!(
1090        std::mem::size_of::<AccountBalance>(),
1091        std::mem::size_of::<tbc::tb_account_balance_t>()
1092    );
1093    assert_eq!(
1094        std::mem::align_of::<AccountBalance>(),
1095        std::mem::align_of::<tbc::tb_account_balance_t>()
1096    );
1097    assert_eq!(
1098        std::mem::size_of::<QueryFilter>(),
1099        std::mem::size_of::<tbc::tb_query_filter_t>()
1100    );
1101    assert_eq!(
1102        std::mem::align_of::<QueryFilter>(),
1103        std::mem::align_of::<tbc::tb_query_filter_t>()
1104    );
1105    assert_eq!(
1106        std::mem::size_of::<QueryFilterFlags>(),
1107        std::mem::size_of::<tbc::TB_QUERY_FILTER_FLAGS>()
1108    );
1109    assert_eq!(
1110        std::mem::align_of::<QueryFilterFlags>(),
1111        std::mem::align_of::<tbc::TB_QUERY_FILTER_FLAGS>()
1112    );
1113}
1114
/// A TigerBeetle account.
///
/// The struct is `#[repr(C)]`, and `assert_abi_compatibility` asserts that its
/// size and alignment equal those of the C type `tbc::tb_account_t`. Fields
/// must therefore not be reordered, resized, added, or removed independently
/// of the C definition.
///
/// The type implements [`Default`], which enables the `..Default::default()`
/// struct-update idiom shown in the crate-level example.
///
/// # Protocol reference
///
/// [`Account`](https://docs.tigerbeetle.com/reference/account/).
#[repr(C)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Account {
    pub id: u128,
    pub debits_pending: u128,
    pub debits_posted: u128,
    pub credits_pending: u128,
    pub credits_posted: u128,
    pub user_data_128: u128,
    pub user_data_64: u64,
    pub user_data_32: u32,
    // NOTE(review): presumably protocol-reserved space that callers leave at
    // its default value - confirm against the `Reserved` type.
    pub reserved: Reserved<4>,
    pub ledger: u32,
    pub code: u16,
    /// Bitflags for this account; see [`AccountFlags`].
    pub flags: AccountFlags,
    pub timestamp: u64,
}
1137
bitflags! {
    /// Bitflags for the `flags` field of [`Account`].
    ///
    /// See the [`bitflags` crate](https://docs.rs/bitflags) for an explanation of Rust bitflags.
    ///
    /// Every flag except `None` takes its bit value from the matching constant
    /// in the `tbc` module. The type is `#[repr(transparent)]` over `u16`, and
    /// `assert_abi_compatibility` asserts that its size and alignment equal
    /// those of `tbc::TB_ACCOUNT_FLAGS`.
    ///
    /// # Protocol reference
    ///
    /// [`Account.flags`](https://docs.tigerbeetle.com/reference/account/#flags).
    #[repr(transparent)]
    #[derive(Copy, Clone, Debug, Default)]
    #[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct AccountFlags: u16 {
        // NOTE(review): a zero-valued flag. Per the `bitflags` documentation a
        // zero-bit flag is contained in every value and intersects none -
        // confirm callers do not rely on `contains(AccountFlags::None)`.
        const None = 0;
        const Linked = tbc::TB_ACCOUNT_FLAGS_TB_ACCOUNT_LINKED;
        const DebitsMustNotExceedCredits = tbc::TB_ACCOUNT_FLAGS_TB_ACCOUNT_DEBITS_MUST_NOT_EXCEED_CREDITS;
        const CreditsMustNotExceedDebits = tbc::TB_ACCOUNT_FLAGS_TB_ACCOUNT_CREDITS_MUST_NOT_EXCEED_DEBITS;
        const History = tbc::TB_ACCOUNT_FLAGS_TB_ACCOUNT_HISTORY;
        const Imported = tbc::TB_ACCOUNT_FLAGS_TB_ACCOUNT_IMPORTED;
        const Closed = tbc::TB_ACCOUNT_FLAGS_TB_ACCOUNT_CLOSED;
    }
}
1159
/// A transfer between accounts.
///
/// The struct is `#[repr(C)]`, and `assert_abi_compatibility` asserts that its
/// size and alignment equal those of the C type `tbc::tb_transfer_t`. Fields
/// must therefore not be reordered, resized, added, or removed independently
/// of the C definition.
///
/// The type implements [`Default`], so values can be built with the
/// `..Default::default()` struct-update idiom.
///
/// # Protocol reference
///
/// [`Transfer`](https://docs.tigerbeetle.com/reference/transfer).
#[repr(C)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Transfer {
    pub id: u128,
    pub debit_account_id: u128,
    pub credit_account_id: u128,
    pub amount: u128,
    pub pending_id: u128,
    pub user_data_128: u128,
    pub user_data_64: u64,
    pub user_data_32: u32,
    pub timeout: u32,
    pub ledger: u32,
    pub code: u16,
    /// Bitflags for this transfer; see [`TransferFlags`].
    pub flags: TransferFlags,
    pub timestamp: u64,
}
1182
bitflags! {
    /// Bitflags for the `flags` field of [`Transfer`].
    ///
    /// See the [`bitflags` crate](https://docs.rs/bitflags) for an explanation of Rust bitflags.
    ///
    /// Every flag takes its bit value from the matching constant in the `tbc`
    /// module. The type is `#[repr(transparent)]` over `u16`, and
    /// `assert_abi_compatibility` asserts that its size and alignment equal
    /// those of `tbc::TB_TRANSFER_FLAGS`.
    ///
    /// # Protocol reference
    ///
    /// [`Transfer.flags`](https://docs.tigerbeetle.com/reference/transfer/#flags).
    #[repr(transparent)]
    #[derive(Copy, Clone, Debug, Default)]
    #[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct TransferFlags: u16 {
        const Linked = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_LINKED;
        const Pending = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_PENDING;
        const PostPendingTransfer = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_POST_PENDING_TRANSFER;
        const VoidPendingTransfer = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_VOID_PENDING_TRANSFER;
        const BalancingDebit = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_BALANCING_DEBIT;
        const BalancingCredit = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_BALANCING_CREDIT;
        const ClosingDebit = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_CLOSING_DEBIT;
        const ClosingCredit = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_CLOSING_CREDIT;
        const Imported = tbc::TB_TRANSFER_FLAGS_TB_TRANSFER_IMPORTED;
    }
}
1206
/// Filter for querying transfers and historical balances.
///
/// This is the event type passed to `Client::get_account_transfers` and
/// `Client::get_account_balances`.
///
/// The struct is `#[repr(C)]`, and `assert_abi_compatibility` asserts that its
/// size and alignment equal those of the C type `tbc::tb_account_filter_t`.
/// Fields must therefore not be reordered, resized, added, or removed
/// independently of the C definition.
///
/// # Protocol reference
///
/// [`AccountFilter`](https://docs.tigerbeetle.com/reference/account-filter).
#[repr(C)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AccountFilter {
    pub account_id: u128,
    pub user_data_128: u128,
    pub user_data_64: u64,
    pub user_data_32: u32,
    pub code: u16,
    // NOTE(review): presumably protocol-reserved space that callers leave at
    // its default value - confirm against the `Reserved` type.
    pub reserved: Reserved<58>,
    pub timestamp_min: u64,
    pub timestamp_max: u64,
    pub limit: u32,
    /// Bitflags for this filter; see [`AccountFilterFlags`].
    pub flags: AccountFilterFlags,
}
1226
bitflags! {
    /// Bitflags for the `flags` field of [`AccountFilter`].
    ///
    /// See the [`bitflags` crate](https://docs.rs/bitflags) for an explanation of Rust bitflags.
    ///
    /// Every flag takes its bit value from the matching constant in the `tbc`
    /// module. The type is `#[repr(transparent)]` over `u32`, and
    /// `assert_abi_compatibility` asserts that its size and alignment equal
    /// those of `tbc::TB_ACCOUNT_FILTER_FLAGS`.
    ///
    /// # Protocol reference
    ///
    /// [`AccountFilter.flags`](https://docs.tigerbeetle.com/reference/account-filter/#flags).
    #[repr(transparent)]
    #[derive(Copy, Clone, Debug, Default)]
    #[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct AccountFilterFlags: u32 {
        const Debits = tbc::TB_ACCOUNT_FILTER_FLAGS_TB_ACCOUNT_FILTER_DEBITS;
        const Credits = tbc::TB_ACCOUNT_FILTER_FLAGS_TB_ACCOUNT_FILTER_CREDITS;
        const Reversed = tbc::TB_ACCOUNT_FILTER_FLAGS_TB_ACCOUNT_FILTER_REVERSED;
    }
}
1244
/// An account balance at a point in time.
///
/// This is the element type of the vector returned by
/// `Client::get_account_balances`.
///
/// The struct is `#[repr(C)]`, and `assert_abi_compatibility` asserts that its
/// size and alignment equal those of the C type `tbc::tb_account_balance_t`.
/// Fields must therefore not be reordered, resized, added, or removed
/// independently of the C definition.
///
/// # Protocol reference
///
/// [`AccountBalance`](https://docs.tigerbeetle.com/reference/account-balance/).
#[repr(C)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AccountBalance {
    pub debits_pending: u128,
    pub debits_posted: u128,
    pub credits_pending: u128,
    pub credits_posted: u128,
    pub timestamp: u64,
    // NOTE(review): presumably protocol-reserved trailing space - confirm
    // against the `Reserved` type.
    pub reserved: Reserved<56>,
}
1260
/// Parameters for querying accounts and transfers.
///
/// This is the event type passed to `Client::query_accounts` and
/// `Client::query_transfers`.
///
/// The struct is `#[repr(C)]`, and `assert_abi_compatibility` asserts that its
/// size and alignment equal those of the C type `tbc::tb_query_filter_t`.
/// Fields must therefore not be reordered, resized, added, or removed
/// independently of the C definition.
///
/// # Protocol reference
///
/// [`QueryFilter`](https://docs.tigerbeetle.com/reference/query-filter/).
#[repr(C)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct QueryFilter {
    pub user_data_128: u128,
    pub user_data_64: u64,
    pub user_data_32: u32,
    pub ledger: u32,
    pub code: u16,
    // NOTE(review): presumably protocol-reserved space that callers leave at
    // its default value - confirm against the `Reserved` type.
    pub reserved: Reserved<6>,
    pub timestamp_min: u64,
    pub timestamp_max: u64,
    pub limit: u32,
    /// Bitflags for this filter; see [`QueryFilterFlags`].
    pub flags: QueryFilterFlags,
}
1280
bitflags! {
    /// Bitflags for the `flags` field of [`QueryFilter`].
    ///
    /// See the [`bitflags` crate](https://docs.rs/bitflags) for an explanation of Rust bitflags.
    ///
    /// The single flag takes its bit value from the matching constant in the
    /// `tbc` module. The type is `#[repr(transparent)]` over `u32`, and
    /// `assert_abi_compatibility` asserts that its size and alignment equal
    /// those of `tbc::TB_QUERY_FILTER_FLAGS`.
    ///
    /// # Protocol reference
    ///
    /// [`QueryFilter.flags`](https://docs.tigerbeetle.com/reference/query-filter/#flags).
    #[repr(transparent)]
    #[derive(Copy, Clone, Debug, Default)]
    #[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
    pub struct QueryFilterFlags: u32 {
        const Reversed = tbc::TB_QUERY_FILTER_FLAGS_TB_QUERY_FILTER_REVERSED;
    }
}
1296
/// The result of a single [`create_accounts`] event.
///
/// For the meaning of individual enum variants see the linked protocol reference.
///
/// See also [`CreateAccountsResult`] (note the plural), the type directly
/// returned by `create_accounts`, and which contains an additional index for
/// relating results with input events.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions outside this crate
/// need a wildcard arm. The derived `Ord`/`PartialOrd` follow the declaration
/// order of the variants below.
///
/// [`create_accounts`]: `Client::create_accounts`
///
/// # Protocol reference
///
/// [`CreateAccountResult`](https://docs.tigerbeetle.com/reference/requests/create_accounts/#result).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
pub enum CreateAccountResult {
    Ok,
    LinkedEventFailed,
    LinkedEventChainOpen,
    ImportedEventExpected,
    ImportedEventNotExpected,
    TimestampMustBeZero,
    ImportedEventTimestampOutOfRange,
    ImportedEventTimestampMustNotAdvance,
    ReservedField,
    ReservedFlag,
    IdMustNotBeZero,
    IdMustNotBeIntMax,
    ExistsWithDifferentFlags,
    ExistsWithDifferentUserData128,
    ExistsWithDifferentUserData64,
    ExistsWithDifferentUserData32,
    ExistsWithDifferentLedger,
    ExistsWithDifferentCode,
    Exists,
    FlagsAreMutuallyExclusive,
    DebitsPendingMustBeZero,
    DebitsPostedMustBeZero,
    CreditsPendingMustBeZero,
    CreditsPostedMustBeZero,
    LedgerMustNotBeZero,
    CodeMustNotBeZero,
    ImportedEventTimestampMustNotRegress,
}
1341
/// The result of a single [`create_accounts`] event, with index.
///
/// Pairs a [`CreateAccountResult`] with the index that relates it to one of
/// the input events of the request.
///
/// [`create_accounts`]: `Client::create_accounts`
///
/// # Protocol reference
///
/// [`CreateAccountResult`](https://docs.tigerbeetle.com/reference/requests/create_accounts/#result).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CreateAccountsResult {
    /// Index relating this result to an input event.
    // NOTE(review): presumably the zero-based position in the submitted
    // `events` slice - confirm against `Client::create_accounts`.
    pub index: usize,
    /// Outcome reported for that event.
    pub result: CreateAccountResult,
}
1354
1355impl core::fmt::Display for CreateAccountResult {
1356    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1357        match self {
1358            Self::Ok => f.write_str("ok"),
1359            Self::LinkedEventFailed => f.write_str("linked event failed"),
1360            Self::LinkedEventChainOpen => f.write_str("linked event chain open"),
1361            Self::ImportedEventExpected => f.write_str("imported event expected"),
1362            Self::ImportedEventNotExpected => f.write_str("imported event not expected"),
1363            Self::TimestampMustBeZero => f.write_str("timestamp must be zero"),
1364            Self::ImportedEventTimestampOutOfRange => {
1365                f.write_str("imported event timestamp out of range")
1366            }
1367            Self::ImportedEventTimestampMustNotAdvance => {
1368                f.write_str("imported event timestamp must not advance")
1369            }
1370            Self::ReservedField => f.write_str("reserved field"),
1371            Self::ReservedFlag => f.write_str("reserved flag"),
1372            Self::IdMustNotBeZero => f.write_str("id must not be zero"),
1373            Self::IdMustNotBeIntMax => f.write_str("id must not be int max"),
1374            Self::ExistsWithDifferentFlags => f.write_str("exists with different flags"),
1375            Self::ExistsWithDifferentUserData128 => {
1376                f.write_str("exists with different user_data_128")
1377            }
1378            Self::ExistsWithDifferentUserData64 => {
1379                f.write_str("exists with different user_data_64")
1380            }
1381            Self::ExistsWithDifferentUserData32 => {
1382                f.write_str("exists with different user_data_32")
1383            }
1384            Self::ExistsWithDifferentLedger => f.write_str("exists with different ledger"),
1385            Self::ExistsWithDifferentCode => f.write_str("exists with different code"),
1386            Self::Exists => f.write_str("exists"),
1387            Self::FlagsAreMutuallyExclusive => f.write_str("flags are mutually exclusive"),
1388            Self::DebitsPendingMustBeZero => f.write_str("debits_pending must be zero"),
1389            Self::DebitsPostedMustBeZero => f.write_str("debits_posted must be zero"),
1390            Self::CreditsPendingMustBeZero => f.write_str("credits_pending must be zero"),
1391            Self::CreditsPostedMustBeZero => f.write_str("credits_posted must be zero"),
1392            Self::LedgerMustNotBeZero => f.write_str("ledger must not be zero"),
1393            Self::CodeMustNotBeZero => f.write_str("code must not be zero"),
1394            Self::ImportedEventTimestampMustNotRegress => {
1395                f.write_str("imported event timestamp must not regress")
1396            }
1397        }
1398    }
1399}
1400
/// The result of a single [`create_transfers`] event.
///
/// For the meaning of individual enum variants see the linked protocol reference.
///
/// See also [`CreateTransfersResult`] (note the plural), the type directly
/// returned by `create_transfers`, and which contains an additional index for
/// relating results with input events.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions outside this crate
/// need a wildcard arm. The derived `Ord`/`PartialOrd` follow the declaration
/// order of the variants below.
///
/// [`create_transfers`]: `Client::create_transfers`
///
/// # Protocol reference
///
/// [`CreateTransferResult`](https://docs.tigerbeetle.com/reference/requests/create_transfers/#result).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
pub enum CreateTransferResult {
    Ok,
    LinkedEventFailed,
    LinkedEventChainOpen,
    ImportedEventExpected,
    ImportedEventNotExpected,
    TimestampMustBeZero,
    ImportedEventTimestampOutOfRange,
    ImportedEventTimestampMustNotAdvance,
    ReservedFlag,
    IdMustNotBeZero,
    IdMustNotBeIntMax,
    ExistsWithDifferentFlags,
    ExistsWithDifferentPendingId,
    ExistsWithDifferentTimeout,
    ExistsWithDifferentDebitAccountId,
    ExistsWithDifferentCreditAccountId,
    ExistsWithDifferentAmount,
    ExistsWithDifferentUserData128,
    ExistsWithDifferentUserData64,
    ExistsWithDifferentUserData32,
    ExistsWithDifferentLedger,
    ExistsWithDifferentCode,
    Exists,
    IdAlreadyFailed,
    FlagsAreMutuallyExclusive,
    DebitAccountIdMustNotBeZero,
    DebitAccountIdMustNotBeIntMax,
    CreditAccountIdMustNotBeZero,
    CreditAccountIdMustNotBeIntMax,
    AccountsMustBeDifferent,
    PendingIdMustBeZero,
    PendingIdMustNotBeZero,
    PendingIdMustNotBeIntMax,
    PendingIdMustBeDifferent,
    TimeoutReservedForPendingTransfer,
    ClosingTransferMustBePending,
    LedgerMustNotBeZero,
    CodeMustNotBeZero,
    DebitAccountNotFound,
    CreditAccountNotFound,
    AccountsMustHaveTheSameLedger,
    TransferMustHaveTheSameLedgerAsAccounts,
    PendingTransferNotFound,
    PendingTransferNotPending,
    PendingTransferHasDifferentDebitAccountId,
    PendingTransferHasDifferentCreditAccountId,
    PendingTransferHasDifferentLedger,
    PendingTransferHasDifferentCode,
    ExceedsPendingTransferAmount,
    PendingTransferHasDifferentAmount,
    PendingTransferAlreadyPosted,
    PendingTransferAlreadyVoided,
    PendingTransferExpired,
    ImportedEventTimestampMustNotRegress,
    ImportedEventTimestampMustPostdateDebitAccount,
    ImportedEventTimestampMustPostdateCreditAccount,
    ImportedEventTimeoutMustBeZero,
    DebitAccountAlreadyClosed,
    CreditAccountAlreadyClosed,
    OverflowsDebitsPending,
    OverflowsCreditsPending,
    OverflowsDebitsPosted,
    OverflowsCreditsPosted,
    OverflowsDebits,
    OverflowsCredits,
    OverflowsTimeout,
    ExceedsCredits,
    ExceedsDebits,
}
1486
/// The result of a single [`create_transfers`] event, with index.
///
/// Pairs a [`CreateTransferResult`] with the index that relates it to one of
/// the input events of the request.
///
/// [`create_transfers`]: `Client::create_transfers`
///
/// # Protocol reference
///
/// [`CreateTransferResult`](https://docs.tigerbeetle.com/reference/requests/create_transfers/#result).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CreateTransfersResult {
    /// Index relating this result to an input event.
    // NOTE(review): presumably the zero-based position in the submitted
    // `events` slice - confirm against `Client::create_transfers`.
    pub index: usize,
    /// Outcome reported for that event.
    pub result: CreateTransferResult,
}
1499
1500impl core::fmt::Display for CreateTransferResult {
1501    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1502        match self {
1503            Self::Ok => f.write_str("ok"),
1504            Self::LinkedEventFailed => f.write_str("linked event failed"),
1505            Self::LinkedEventChainOpen => f.write_str("linked event chain open"),
1506            Self::ImportedEventExpected => f.write_str("imported event expected"),
1507            Self::ImportedEventNotExpected => f.write_str("imported event not expected"),
1508            Self::TimestampMustBeZero => f.write_str("timestamp must be zero"),
1509            Self::ImportedEventTimestampOutOfRange => {
1510                f.write_str("imported event timestamp out of range")
1511            }
1512            Self::ImportedEventTimestampMustNotAdvance => {
1513                f.write_str("imported event timestamp must not advance")
1514            }
1515            Self::ReservedFlag => f.write_str("reserved flag"),
1516            Self::IdMustNotBeZero => f.write_str("id must not be zero"),
1517            Self::IdMustNotBeIntMax => f.write_str("id must not be int max"),
1518            Self::ExistsWithDifferentFlags => f.write_str("exists with different flags"),
1519            Self::ExistsWithDifferentPendingId => f.write_str("exists with different pending_id"),
1520            Self::ExistsWithDifferentTimeout => f.write_str("exists with different timeout"),
1521            Self::ExistsWithDifferentDebitAccountId => {
1522                f.write_str("exists with different debit_account_id")
1523            }
1524            Self::ExistsWithDifferentCreditAccountId => {
1525                f.write_str("exists with different credit_account_id")
1526            }
1527            Self::ExistsWithDifferentAmount => f.write_str("exists with different amount"),
1528            Self::ExistsWithDifferentUserData128 => {
1529                f.write_str("exists with different user_data_128")
1530            }
1531            Self::ExistsWithDifferentUserData64 => {
1532                f.write_str("exists with different user_data_64")
1533            }
1534            Self::ExistsWithDifferentUserData32 => {
1535                f.write_str("exists with different user_data_32")
1536            }
1537            Self::ExistsWithDifferentLedger => f.write_str("exists with different ledger"),
1538            Self::ExistsWithDifferentCode => f.write_str("exists with different code"),
1539            Self::Exists => f.write_str("exists"),
1540            Self::IdAlreadyFailed => f.write_str("id already failed"),
1541            Self::FlagsAreMutuallyExclusive => f.write_str("flags are mutually exclusive"),
1542            Self::DebitAccountIdMustNotBeZero => f.write_str("debit_account_id must not be zero"),
1543            Self::DebitAccountIdMustNotBeIntMax => {
1544                f.write_str("debit_account_id must not be int max")
1545            }
1546            Self::CreditAccountIdMustNotBeZero => f.write_str("credit_account_id must not be zero"),
1547            Self::CreditAccountIdMustNotBeIntMax => {
1548                f.write_str("credit_account_id must not be int max")
1549            }
1550            Self::AccountsMustBeDifferent => f.write_str("accounts must be different"),
1551            Self::PendingIdMustBeZero => f.write_str("pending_id must be zero"),
1552            Self::PendingIdMustNotBeZero => f.write_str("pending_id must not be zero"),
1553            Self::PendingIdMustNotBeIntMax => f.write_str("pending_id must not be int max"),
1554            Self::PendingIdMustBeDifferent => f.write_str("pending_id must be different"),
1555            Self::TimeoutReservedForPendingTransfer => {
1556                f.write_str("timeout reserved for pending transfer")
1557            }
1558            Self::ClosingTransferMustBePending => f.write_str("closing transfer must be pending"),
1559            Self::LedgerMustNotBeZero => f.write_str("ledger must not be zero"),
1560            Self::CodeMustNotBeZero => f.write_str("code must not be zero"),
1561            Self::DebitAccountNotFound => f.write_str("debit account not found"),
1562            Self::CreditAccountNotFound => f.write_str("credit account not found"),
1563            Self::AccountsMustHaveTheSameLedger => {
1564                f.write_str("accounts must have the same ledger")
1565            }
1566            Self::TransferMustHaveTheSameLedgerAsAccounts => {
1567                f.write_str("transfer must have the same ledger as accounts")
1568            }
1569            Self::PendingTransferNotFound => f.write_str("pending transfer not found"),
1570            Self::PendingTransferNotPending => f.write_str("pending transfer not pending"),
1571            Self::PendingTransferHasDifferentDebitAccountId => {
1572                f.write_str("pending transfer has different debit_account_id")
1573            }
1574            Self::PendingTransferHasDifferentCreditAccountId => {
1575                f.write_str("pending transfer has different credit_account_id")
1576            }
1577            Self::PendingTransferHasDifferentLedger => {
1578                f.write_str("pending transfer has different ledger")
1579            }
1580            Self::PendingTransferHasDifferentCode => {
1581                f.write_str("pending transfer has different code")
1582            }
1583            Self::ExceedsPendingTransferAmount => f.write_str("exceeds pending transfer amount"),
1584            Self::PendingTransferHasDifferentAmount => {
1585                f.write_str("pending transfer has different amount")
1586            }
1587            Self::PendingTransferAlreadyPosted => f.write_str("pending transfer already posted"),
1588            Self::PendingTransferAlreadyVoided => f.write_str("pending transfer already voided"),
1589            Self::PendingTransferExpired => f.write_str("pending transfer expired"),
1590            Self::ImportedEventTimestampMustNotRegress => {
1591                f.write_str("imported event timestamp must not regress")
1592            }
1593            Self::ImportedEventTimestampMustPostdateDebitAccount => {
1594                f.write_str("imported event timestamp must postdate debit account")
1595            }
1596            Self::ImportedEventTimestampMustPostdateCreditAccount => {
1597                f.write_str("imported event timestamp must postdate credit account")
1598            }
1599            Self::ImportedEventTimeoutMustBeZero => {
1600                f.write_str("imported event timeout must be zero")
1601            }
1602            Self::DebitAccountAlreadyClosed => f.write_str("debit account already closed"),
1603            Self::CreditAccountAlreadyClosed => f.write_str("credit account already closed"),
1604            Self::OverflowsDebitsPending => f.write_str("overflows debits_pending"),
1605            Self::OverflowsCreditsPending => f.write_str("overflows credits_pending"),
1606            Self::OverflowsDebitsPosted => f.write_str("overflows debits_posted"),
1607            Self::OverflowsCreditsPosted => f.write_str("overflows credits_posted"),
1608            Self::OverflowsDebits => f.write_str("overflows debits"),
1609            Self::OverflowsCredits => f.write_str("overflows credits"),
1610            Self::OverflowsTimeout => f.write_str("overflows timeout"),
1611            Self::ExceedsCredits => f.write_str("exceeds credits"),
1612            Self::ExceedsDebits => f.write_str("exceeds debits"),
1613        }
1614    }
1615}
1616
/// The ways in which constructing a [`Client`] can fail.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
pub enum InitStatus {
    /// An unexpected error that fits none of the other variants.
    Unexpected,
    /// Memory could not be allocated.
    OutOfMemory,
    /// The provided addresses could not be parsed.
    AddressInvalid,
    /// More addresses were provided than are supported.
    AddressLimitExceeded,
    /// A system resource was exhausted.
    ///
    /// This covers file descriptors, threads, and lockable memory.
    SystemResources,
    /// The network was unavailable, or network initialization otherwise failed.
    NetworkSubsystem,
}

impl core::fmt::Display for InitStatus {
    /// Writes a short lowercase description of the status.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        // Choose the text first so there is a single write call.
        let description = match *self {
            Self::Unexpected => "unexpected",
            Self::OutOfMemory => "out of memory",
            Self::AddressInvalid => "address invalid",
            Self::AddressLimitExceeded => "address limit exceeded",
            Self::SystemResources => "system resources",
            Self::NetworkSubsystem => "network subsystem",
        };
        f.write_str(description)
    }
}

impl std::error::Error for InitStatus {}
1650
/// Failures that happen before the server processes a batch of operations.
///
/// If a transaction request yields one of these, every operation in that
/// request can be assumed not to have been processed.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[non_exhaustive]
pub enum PacketStatus {
    /// A multi-event request contained too many events.
    TooMuchData,
    /// The server evicted this client.
    ClientEvicted,
    /// The client's version is too low.
    ClientReleaseTooLow,
    /// The client's version is too high.
    ClientReleaseTooHigh,
    /// The client had already been destructed.
    ClientShutdown,
    /// The submitted operation was invalid.
    ///
    /// The Rust client should never be able to trigger this.
    InvalidOperation,
    /// The operation's payload had an incorrect size.
    ///
    /// The Rust client should never be able to trigger this.
    InvalidDataSize,
}

impl core::fmt::Display for PacketStatus {
    /// Writes a short lowercase description of the status.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        // Choose the text first so there is a single write call.
        let description = match *self {
            Self::TooMuchData => "too much data",
            Self::ClientEvicted => "client evicted",
            Self::ClientReleaseTooLow => "client release too low",
            Self::ClientReleaseTooHigh => "client release too high",
            Self::ClientShutdown => "client shutdown",
            Self::InvalidOperation => "invalid operation",
            Self::InvalidDataSize => "invalid data size",
        };
        f.write_str(description)
    }
}

impl std::error::Error for PacketStatus {}
1692
/// The error produced by point queries that find nothing.
///
/// [`Client::lookup_accounts`] and [`Client::lookup_transfers`] return this
/// when the requested account or transfer does not exist.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct NotFound;

impl core::fmt::Display for NotFound {
    /// Writes the fixed description `not found`.
    fn fmt(&self, out: &mut core::fmt::Formatter) -> core::fmt::Result {
        const DESCRIPTION: &str = "not found";
        out.write_str(DESCRIPTION)
    }
}

impl std::error::Error for NotFound {}
1706
/// A block of `N` reserved bytes for use as a struct field.
///
/// Values are created through [`Default::default`], which zero-fills the
/// bytes; the type typically does not need to be used directly.
#[repr(transparent)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Reserved<const N: usize>([u8; N]);

impl<const N: usize> Default for Reserved<N> {
    /// Returns `N` zero bytes.
    fn default() -> Self {
        Self([0u8; N])
    }
}
1720
1721fn create_packet<Event>(
1722    op: u8, // TB_OPERATION
1723    events: &[Event],
1724) -> (Box<tbc::tb_packet_t>, Receiver<CompletionMessage<Event>>)
1725where
1726    Event: Copy + 'static,
1727{
1728    let (tx, rx) = channel::<CompletionMessage<Event>>();
1729    let callback: Box<OnCompletion> = Box::new(Box::new(
1730        |context, packet, timestamp, result_ptr, result_len| unsafe {
1731            let events_len = (*packet).data_size as usize / mem::size_of::<Event>();
1732            let events = Vec::from_raw_parts((*packet).data as *mut Event, events_len, events_len);
1733            (*packet).data = ptr::null_mut();
1734
1735            let packet = Packet(Box::from_raw(packet));
1736
1737            let result = if result_len != 0 {
1738                std::slice::from_raw_parts(result_ptr, result_len as usize)
1739            } else {
1740                &[]
1741            };
1742            let result = Vec::from(result);
1743
1744            let _ = tx.send(CompletionMessage {
1745                _context: context,
1746                packet,
1747                _timestamp: timestamp,
1748                result,
1749                _events: events,
1750            });
1751        },
1752    ));
1753
1754    let mut events: Vec<Event> = events.to_vec();
1755    assert_eq!(events.len(), events.capacity());
1756
1757    let events_len = events.len();
1758    let events_ptr = events.as_mut_ptr();
1759    mem::forget(events);
1760
1761    let packet = Box::new(tbc::tb_packet_t {
1762        user_data: Box::into_raw(callback) as *mut c_void,
1763        data: events_ptr as *mut c_void,
1764        data_size: (mem::size_of::<Event>() * events_len) as u32,
1765        user_tag: 0xABCD,
1766        operation: op,
1767        status: tbc::TB_PACKET_STATUS_TB_PACKET_OK,
1768        opaque: [0; 64],
1769    });
1770
1771    (packet, rx)
1772}
1773
1774fn handle_message<CEvent, CResult>(
1775    msg: &CompletionMessage<CEvent>,
1776) -> Result<&[CResult], PacketStatus> {
1777    let packet = &msg.packet.0;
1778    let result = &msg.result;
1779
1780    if packet.status != tbc::TB_PACKET_STATUS_TB_PACKET_OK {
1781        return Err(packet.status.into());
1782    }
1783
1784    let result = unsafe {
1785        if !result.is_empty() {
1786            std::slice::from_raw_parts(
1787                result.as_ptr() as *const CResult,
1788                result
1789                    .len()
1790                    .checked_div(mem::size_of::<CResult>())
1791                    .expect("div"),
1792            )
1793        } else {
1794            &[]
1795        }
1796    };
1797
1798    Ok(result)
1799}
1800
// Thread-sendable wrapper for the owned packet.
//
// `create_packet` stores raw pointers in the packet's `user_data` and `data`
// fields. This newtype lets the boxed packet travel inside a
// `CompletionMessage` by opting in to `Send` explicitly.
struct Packet(Box<tbc::tb_packet_t>);

// SAFETY: after completion, Zig no longer touches the packet; we own it exclusively.
unsafe impl Send for Packet {}
1806
// Everything the completion closure hands back to the code awaiting a request.
//
// Built inside the closure created by `create_packet` and sent through the
// channel whose receiving end `create_packet` returns.
struct CompletionMessage<E> {
    // Context value the completion closure was invoked with (not read by `handle_message`).
    _context: usize,
    // The request's packet, reclaimed from the raw pointer given to the closure.
    // `handle_message` reads its `status`.
    packet: Packet,
    // Timestamp the completion closure was invoked with (not read by `handle_message`).
    _timestamp: u64,
    // Owned copy of the raw result bytes; empty when the reported length was 0.
    result: Vec<u8>,
    // The event buffer that was attached to the packet, rebuilt as a `Vec` so
    // that it is freed when the message is dropped.
    _events: Vec<E>,
}
1814
// Type-erased, one-shot completion handler for a single packet.
//
// `create_packet` boxes a value of this type once more and stores the
// resulting thin pointer in the packet's `user_data`; `on_completion` recovers
// it and calls it. The arguments are the ones `on_completion` receives, in
// order: context, packet, timestamp, result pointer, result length.
type OnCompletion = Box<dyn FnOnce(usize, *mut tbc::tb_packet_t, u64, *const u8, u32)>;
1816
1817extern "C" fn on_completion(
1818    context: usize,
1819    packet: *mut tbc::tb_packet_t,
1820    timestamp: u64,
1821    result_ptr: *const u8,
1822    result_len: u32,
1823) {
1824    unsafe {
1825        let callback: Box<OnCompletion> = Box::from_raw((*packet).user_data as *mut OnCompletion);
1826        (*packet).user_data = ptr::null_mut();
1827        callback(context, packet, timestamp, result_ptr, result_len);
1828    }
1829}