Crate cn_tigerbeetle

Crate cn_tigerbeetle 

Source
Expand description

The official TigerBeetle client for Rust.

This is a client library for the TigerBeetle financial database. To use, create a Client and call its methods to make requests.

The client presents an async interface, but does not depend on a specific Rust async runtime. Instead it contains its own off-thread event loop, shared by all official TigerBeetle clients. Thus it should integrate seamlessly into any Rust codebase.

The cost of this though is that it does link to a non-Rust static library (called tb_client), and it does need to context switch between threads for every request. The native linking should be handled seamlessly on all supported platforms, and the context switching overhead is expected to be low compared to the cost of networking and disk I/O.

§Example

use tigerbeetle as tb;

// Connect to TigerBeetle
let client = tb::Client::new(0, "127.0.0.1:3000")?;

// Create accounts
let account_id1 = tb::id();
let account_id2 = tb::id();

let accounts = [
    tb::Account {
        id: account_id1,
        ledger: 1,
        code: 1,
        flags: tb::AccountFlags::History,
        ..Default::default()
    },
    tb::Account {
        id: account_id2,
        ledger: 1,
        code: 1,
        flags: tb::AccountFlags::History,
        ..Default::default()
    },
];

let account_results = client.create_accounts(&accounts).await?;

// If no results are returned, then all input events were successful -
// to save resources only unsuccessful inputs return results.
assert_eq!(account_results.len(), 0);

// Create a transfer between accounts
let transfer_id = tb::id();
let transfers = [tb::Transfer {
    id: transfer_id,
    debit_account_id: account_id1,
    credit_account_id: account_id2,
    amount: 100,
    ledger: 1,
    code: 1,
    ..Default::default()
}];

let transfer_results = client.create_transfers(&transfers).await?;
assert_eq!(transfer_results.len(), 0);

// Look up the accounts to see the transfer result
let accounts = client.lookup_accounts(&[account_id1, account_id2]).await?;
let account1 = accounts[0];
let account2 = accounts[1];

assert_eq!(account1.id, account_id1);
assert_eq!(account2.id, account_id2);
assert_eq!(account1.debits_posted, 100);
assert_eq!(account2.credits_posted, 100);

§Request batching

Most transaction and query operations support multiple events of the same type at once (this can be seen in the request method signatures accepting slices of their input types) and it is strongly recommended to submit many events in a single request at once as TigerBeetle will only reach its performance limits when events are received in large batches. The client does implement its own internal batching and will attempt to create them efficiently, but it is more efficient for applications to create their own batches based on understanding of their own architectural needs and limitations.

In TigerBeetle’s standard build-time configuration the maximum number of events per batch is 8189. If the events in a request exceed this number its future will return PacketStatus::TooMuchData.

§Range query limits

TigerBeetle’s range queries, get_account_transfers, get_account_balances, query_accounts and query_transfers, also have a limit to how many results they return.

In TigerBeetle’s standard build-time configuration the maximum number of results returned is 8189.

If the server returns a full batch for a range query, then further results can be paged by incrementing timeout_max to one greater than the highest timeout returned in the previous batch, and issuing a new query with otherwise the same filter. This process can be repeated until the server returns an unfull batch.

Here is an example of paging to get started with:

use tigerbeetle as tb;
use futures::{stream, Stream};

fn get_account_transfers_paged(
    client: &tb::Client,
    event: tb::AccountFilter,
) -> impl Stream<Item = Result<Vec<tb::Transfer>, tb::PacketStatus>> + '_ {
    assert!(
        event.limit > 1,
        "paged queries should use an explicit limit"
    );

    enum State {
        Start,
        Continue(u64),
        End,
    }

    let is_reverse = event.flags.contains(tb::AccountFilterFlags::Reversed);

    futures::stream::unfold(State::Start, move |state| async move {
        let event = match state {
            State::Start => event,
            State::Continue(timestamp_begin) => {
                if !is_reverse {
                    tb::AccountFilter {
                        timestamp_min: timestamp_begin,
                        ..event
                    }
                } else {
                    tb::AccountFilter {
                        timestamp_max: timestamp_begin,
                        ..event
                    }
                }
            }
            State::End => return None,
        };
        let result_next = client.get_account_transfers(event).await;
        match result_next {
            Ok(result_next) => {
                let result_len = u32::try_from(result_next.len()).expect("u32");
                let must_page = result_len == event.limit;
                if must_page {
                    let timestamp_first = result_next.first().expect("item").timestamp;
                    let timestamp_last = result_next.last().expect("item").timestamp;
                    let (timestamp_begin_next, should_continue) = if !is_reverse {
                        assert!(timestamp_first < timestamp_last);
                        let timestamp_begin_next = timestamp_last.checked_add(1).expect("overflow");
                        assert_ne!(timestamp_begin_next, u64::MAX);
                        let should_continue =
                            timestamp_begin_next <= event.timestamp_max || event.timestamp_max == 0;
                        (timestamp_begin_next, should_continue)
                    } else {
                        assert!(timestamp_first > timestamp_last);
                        let timestamp_begin_next = timestamp_last.checked_sub(1).expect("overflow");
                        assert_ne!(timestamp_begin_next, 0);
                        let should_continue =
                            timestamp_begin_next >= event.timestamp_min || event.timestamp_min == 0;
                        (timestamp_begin_next, should_continue)
                    };
                    if should_continue {
                        Some((Ok(result_next), State::Continue(timestamp_begin_next)))
                    } else {
                        Some((Ok(result_next), State::End))
                    }
                } else {
                    Some((Ok(result_next), State::End))
                }
            }
            Err(result_next) => Some((Err(result_next), State::End)),
        }
    })
}

§Response futures and client lifetime considerations

Responses to requests are returned as Futures. It is not strictly necessary for applications to await these futures — requests are enqueued as soon as the request method is called and will be executed even if the future is dropped.

It is possible to drop a Client while request futures are still outstanding. In this case any pending requests will be completed with PacketStatus::ClientShutdown. Request futures may resolve to successful results even after the client is closed.

When Client is dropped without calling close, it will shutdown correctly, but some of that work happens off-thread after the drop completes.

For orderly shutdown, it is recommended to await all request futures prior to destroying the client, and to destroy the client by calling close and awaiting its return value.

§Concurrency and multithreading

Multiple requests may be submitted concurrently from a single client; the results of which are returned as futures whose Rust lifetimes are tied to the Client. The server only supports one in-flight request per client though, so the client will internally buffer concurrent requests. To truly have multiple requests in flight concurrently, multiple clients can be created, though note that there is a hard-coded limit on how many clients can be connected to the server simultaneously.

The Client type implements Send and Sync and may be used in parallel across multiple threads or async tasks, e.g. by placing it into an Arc. In some cases this may be useful because it allows the client to leverage its internal request batching to batch events from multiple threads (or tasks), but otherwise it provides no performance advantage.

§TigerBeetle time-based identifiers

Accounts and transfers must have globally unique identifiers. The generation of these is application-specific, and any scheme that guarantees unique IDs will work. Barring other constraints, TigerBeetle recommends using TigerBeetle time-based identifiers. This crate provides an implementation in the id function.

For additional considerations when choosing an ID scheme see the TigerBeetle documentation on data modeling.

§Use in non-async codebases

The TigerBeetle client is async-only, but if you’re working in a synchronous codebase, you can use futures::executor::block_on to run async operations to completion.

use futures::executor::block_on;
use tigerbeetle as tb;

fn synchronous_function() -> Result<(), Box<dyn std::error::Error>> {
    block_on(async {
        let client = tb::Client::new(0, "127.0.0.1:3000")?;

        let accounts = [tb::Account {
            id: tb::id(),
            ledger: 1,
            code: 1,
            ..Default::default()
        }];

        let results = client.create_accounts(&accounts).await?;

        Ok(())
    })
}

Note that block_on will block the current thread until the async operation completes, so this approach works best for simple use cases or when you need to integrate TigerBeetle into an existing synchronous application.

§Rust structure binary representation and the TigerBeetle protocol

Many types in this library are ABI-compatible with the underlying protocol definition and can be cast (unsafely) directly to and from byte buffers on all supported platforms, though this should not be required for typical application purposes.

The protocol-compatible types are:

Note that status enums are not ABI-compatible with the protocol’s status codes and must be converted with TryFrom.

§References

The TigerBeetle Reference.

Structs§

Account
A TigerBeetle account.
AccountBalance
An account balance at a point in time.
AccountFilter
Filter for querying transfers and historical balances.
AccountFilterFlags
Bitflags for the flags field of AccountFilter.
AccountFlags
Bitflags for the flags field of Account.
Client
The TigerBeetle client.
CreateAccountsResult
The result of a single create_accounts event, with index.
CreateTransfersResult
The result of a single create_transfers event, with index.
NotFound
An error type returned by point queries.
QueryFilter
Parameters for querying accounts and transfers.
QueryFilterFlags
Bitflags for the flags field of QueryFilter.
Reserved
A utility type for representing reserved bytes in structs.
Transfer
A transfer between accounts.
TransferFlags
Bitflags for the flags field of Transfer.

Enums§

CreateAccountResult
The result of a single create_accounts event.
CreateTransferResult
The result of a single create_transfers event.
InitStatus
Errors resulting from constructing a Client.
PacketStatus
Errors that occur prior to the server processing a batch of operations.

Functions§

id
Generate a TigerBeetle time-based identifier.