Transistor
A Rust Crux Client crate/lib. For now, this crate intends to support 2 ways to interact with Crux:
- Via Docker with a crux-standalone version from docker-hub. Current Docker image: juxt/crux-standalone:20.09-1.11.0.
- Via HTTP using the HTTP API.
- Async support.
- Clojure.api. (To be evaluated.)
- FFI. (To be evaluated.)
Other solutions may be added after the first release.
- Crux Getting Started
- Crux FAQs
- For usage examples, please refer to the examples directory, or to ATM Crux for a more complete and interactive example.
Bitemporal Crux
Crux is optimised for efficient and globally consistent point-in-time queries using a pair of transaction-time and valid-time timestamps.
Ad-hoc systems for bitemporal recordkeeping typically rely on explicitly tracking either valid-from and valid-to timestamps or range types directly within relations. The bitemporal document model that Crux provides is very simple to reason about and it is universal across the entire database, therefore it does not require you to consider which historical information is worth storing in special "bitemporal tables" upfront.
One or more documents may be inserted into Crux via a put transaction at a specific valid-time, defaulting to the transaction time (i.e. now), and each document remains valid until explicitly updated with a new version via put or deleted via delete.
Why?
| Time | Purpose |
|---|---|
| transaction-time | Used for audit purposes, technical requirements such as event sourcing. |
| valid-time | Used for querying data across time, historical analysis. |
transaction-time represents the point at which data arrives into the database. This gives us an audit trail and we can see what the state of the database was at a particular point in time. You cannot write a new transaction with a transaction-time that is in the past.
valid-time is an arbitrary time that can originate from an upstream system, or by default is set to transaction-time. Valid time is what users will typically use for query purposes.
References: Crux bitemporality and value of bitemporality.
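To make the interplay of the two timelines concrete, here is a minimal sketch of a bitemporal lookup for a single entity. It is an illustrative model only, not how Crux stores or indexes data: the names `Version`, `BitemporalEntity` and `as_of` are hypothetical, and plain integers stand in for real timestamps.

```rust
// Illustrative model only: NOT Crux's internal representation.
// Integers stand in for timestamps.
#[derive(Debug, Clone, PartialEq)]
struct Version {
    valid_time: u64,
    tx_time: u64,
    doc: Option<String>, // None models a delete
}

#[derive(Debug, Default)]
struct BitemporalEntity {
    versions: Vec<Version>,
}

impl BitemporalEntity {
    /// Record a put; valid-time defaults to the transaction time.
    fn put(&mut self, tx_time: u64, valid_time: Option<u64>, doc: &str) {
        self.versions.push(Version {
            valid_time: valid_time.unwrap_or(tx_time),
            tx_time,
            doc: Some(doc.to_string()),
        });
    }

    /// Record a delete; valid-time defaults to the transaction time.
    fn delete(&mut self, tx_time: u64, valid_time: Option<u64>) {
        self.versions.push(Version {
            valid_time: valid_time.unwrap_or(tx_time),
            tx_time,
            doc: None,
        });
    }

    /// What did the database, as known at `tx_time`, say was true at `valid_time`?
    fn as_of(&self, valid_time: u64, tx_time: u64) -> Option<&str> {
        self.versions
            .iter()
            // Only versions already written and already valid are visible.
            .filter(|v| v.tx_time <= tx_time && v.valid_time <= valid_time)
            // The latest valid-time wins; later transactions break ties.
            .max_by_key(|v| (v.valid_time, v.tx_time))
            .and_then(|v| v.doc.as_deref())
    }
}
```

With this model, a put written at transaction time 20 with valid-time 5 is invisible to a query as of transaction time 15, but visible as of transaction time 20. This is the audit-trail property the table above attributes to transaction-time.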
Usage
To add this crate to your project, add the following line to the dependencies field in Cargo.toml:
[dependencies]
transistor = "2.1.1"
Creating a Crux Client
All operations with Transistor start in the module client with Crux::new("localhost", "3000"). The struct Crux is responsible for defining the request HeaderMap and the request URL. The URL definition is required and is done by the static function new, which receives a host and a port as arguments and returns a Crux instance. To add an AUTHORIZATION header to the HeaderMap, use the function with_authorization, which receives the authorization token as argument and mutates the Crux instance.
HeaderMap already contains the header Content-Type: application/edn.
Finally, to create a Crux client, call the function <type>_client, for example http_client. This function returns a struct that contains all possible implementations to query Crux Docker and Standalone HTTP Server.
use transistor::client::Crux;
// HttpClient with AUTHORIZATION
let auth_client = Crux::new("localhost", "3000").with_authorization("my-auth-token").http_client();
// HttpClient without AUTHORIZATION
let client = Crux::new("localhost", "3000").http_client();
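The builder flow described above can be pictured with a dependency-free sketch. Everything here is an assumption made for illustration: `CruxSketch` is a hypothetical stand-in for the crate's `Crux` struct, a plain `HashMap` replaces the real header map, and the `http://host:port` URL format is a guess.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `Crux`; field names and URL format are assumptions.
#[derive(Debug)]
struct CruxSketch {
    url: String,
    headers: HashMap<String, String>,
}

impl CruxSketch {
    /// Build the base URL from host and port and preset the EDN content type.
    fn new(host: &str, port: &str) -> Self {
        let mut headers = HashMap::new();
        headers.insert("Content-Type".to_string(), "application/edn".to_string());
        CruxSketch {
            url: format!("http://{}:{}", host, port),
            headers,
        }
    }

    /// Add an authorization header and return the instance so calls can chain.
    fn with_authorization(mut self, token: &str) -> Self {
        self.headers
            .insert("Authorization".to_string(), token.to_string());
        self
    }
}
```

Calling `CruxSketch::new("localhost", "3000").with_authorization("my-auth-token")` yields an instance carrying both headers, while skipping `with_authorization` leaves only the content type set.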
Http Client
Once you have called http_client you will have an instance of the HttpClient struct which has a bunch of functions to query Crux on Docker and Standalone HTTP Server:
- state queries endpoint / with a GET. No args. Returns various details about the state of the database.
let body = client.state().unwrap();
// StateResponse {
// index___index_version: 5,
// doc_log___consumer_state: None,
// tx_log___consumer_state: None,
// kv___kv_store: "crux.kv.rocksdb.RocksKv",
// kv___estimate_num_keys: 56,
// kv___size: 2271042
// }
- tx_log requests endpoint /tx-log via POST. Actions is expected as argument. This is the "write" endpoint, used to post transactions.
use ;
use transistor::client::Crux;
use ;
let person1 = Person ;
let person2 = Person ;
let actions = Actions::new()
    .append_put(person1)
    .append_put(person2);
let body = client.tx_log(actions).unwrap();
// {:crux.tx/tx-id 7, :crux.tx/tx-time #inst \"2020-07-16T21:50:39.309-00:00\"}
- tx_logs requests endpoint /tx-log via GET. No args. Returns a list of all transactions.
use transistor::client::Crux;
let body = client.tx_logs().unwrap();
// TxLogsResponse {
// tx_events: [
// TxLogResponse {
// tx___tx_id: 0,
// tx___tx_time: 2020-07-09T23:38:06.465-00:00,
// tx__event___tx_events: Some(
// [
// [
// ":crux.tx/put",
// "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e",
// "125d29eb3bed1bf51d64194601ad4ff93defe0e2",
// ],
// ],
// ),
// },
// TxLogResponse {
// tx___tx_id: 1,
// tx___tx_time: 2020-07-09T23:39:33.815-00:00,
// tx__event___tx_events: Some(
// [
// [
// ":crux.tx/put",
// "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e",
// "1b42e0d5137e3833423f7bb958622bee29f91eee",
// ],
// ],
// ),
// },
// ...
// ]
// }
- entity requests endpoint /entity via POST. A serialized CruxId, a serialized Edn::Key or a String containing a keyword must be passed as argument. Returns an entity for a given ID and optional valid-time/transaction-time co-ordinates.
let person = Person ;
let client = Crux::new("localhost", "3000").http_client();
// entity expects a CruxId
let edn_body = client.entity(person.crux__db___id).unwrap();
// Map(
// Map(
// {
// ":crux.db/id": Key(
// ":hello-entity",
// ),
// ":first-name": Str(
// "Hello",
// ),
// ":last-name": Str(
// "World",
// ),
// },
// ),
// )
- entity_timed is similar to entity as it requests the same endpoint; the difference is that it can send transaction-time and valid-time as query params. This is done via the extra arguments transaction_time: Option<DateTime<FixedOffset>> and valid_time: Option<DateTime<FixedOffset>>.
- entity_tx requests endpoint /entity-tx via POST. A serialized CruxId, a serialized Edn::Key or a String containing a keyword must be passed as argument. Returns the transaction that most recently set a key.
use ;
use transistor::client::Crux;
use ;
let person = Person ;
let client = Crux::new("localhost", "3000").http_client();
let tx_body = client.entity_tx(person.crux__db___id).unwrap();
// EntityTxResponse {
// db___id: "d72ccae848ce3a371bd313865cedc3d20b1478ca",
// db___content_hash: "1828ebf4466f98ea3f5252a58734208cd0414376",
// db___valid_time: 2020-07-20T20:38:27.515-00:00,
// tx___tx_id: 31,
// tx___tx_time: 2020-07-20T20:38:27.515-00:00,
// }
- entity_tx_timed is similar to entity_tx as it requests the same endpoint; the difference is that it can send transaction-time and valid-time as query params. This is done via the extra arguments transaction_time: Option<DateTime<FixedOffset>> and valid_time: Option<DateTime<FixedOffset>>.
- entity_history requests endpoint /entity-history via GET. Arguments are the crux.db/id as a String, an ordering argument defined by the enum http::Order (Asc or Desc) and a boolean for the with-docs? flag. The response is a Vector containing EntityHistoryElement. If with-docs? is true, then the field db__doc (:crux.db/doc) will return an Option<Edn> containing the inserted struct.
use transistor::client::Crux;
use transistor::http::Order;
use CruxId;
let person = Person
- entity_history_timed is similar to entity_history as it requests the same endpoint; the difference is that it can send start-transaction-time, end-transaction-time, start-valid-time and end-valid-time as query params. This is done by adding a Vec<TimeHistory> containing one TimeHistory::TransactionTime and/or one TimeHistory::ValidTime, both of which receive two Option<DateTime<Utc>>. The first DateTime is the start-<type>-time and the second is the end-<type>-time.
- query requests endpoint /query via POST. Argument is a query of the type Query. Retrieves a Set containing a vector of the values defined by the function Query::find. Available functions are find, find_by_aggregates, where_clause, args, order_by, limit and offset; the examples complex_query and limit_offset_query show how to use them.
Simple find
use transistor::client::Crux;
use ;
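let client = Crux::new("localhost", "3000").http_client();
let query_is_sql = Query::find(vec!["?p1", "?n"])
    .where_clause(vec!["?p1 :name ?n", "?p1 :is-sql true"])
    .build();
// Query:
// {:query
// {:find [?p1 ?n]
// :where [[?p1 :name ?n]
// [?p1 :is-sql true]]}}
let is_sql = client.query(query_is_sql.unwrap()).unwrap();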
// {[":mysql", "MySQL"], [":postgres", "Postgres"]} BTreeSet
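As a rough picture of how the find variables and where clauses end up in the EDN query shown in the comment above, the sketch below assembles that string by hand. The helper `edn_query` is hypothetical and not part of the crate; the emptiness check is only a stand-in for whatever validation the real build step performs.

```rust
/// Assemble an EDN query string from `:find` variables and `:where` clauses.
/// Hypothetical helper for illustration; not the crate's implementation.
fn edn_query(find: &[&str], where_clauses: &[&str]) -> Result<String, String> {
    // Simplified validation: both parts must be present.
    if find.is_empty() || where_clauses.is_empty() {
        return Err("both :find and :where must be non-empty".to_string());
    }
    // Each where clause becomes its own EDN vector.
    let clauses: Vec<String> = where_clauses
        .iter()
        .map(|clause| format!("[{}]", clause))
        .collect();
    Ok(format!(
        "{{:query {{:find [{}] :where [{}]}}}}",
        find.join(" "),
        clauses.join(" ")
    ))
}
```

Passing the variables `?p1` and `?n` with the clauses `?p1 :name ?n` and `?p1 :is-sql true` produces the same query as the comment above, on a single line.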
Find by aggregates
- Available aggregates are defined in Aggregate.
use transistor::client::Crux;
use ;
let client = Crux::new("localhost", "3000").http_client();
let q = find_by_aggregates?
    .where_clause(vec!["?e :type :burger"])?
    .build()?;
// Query:
// {:query
// {:find [(min ?e) (max ?e) (count ?e) (min 5 ?e) (count-distinct ?e)]
// :where [[?e :type :burger]]
// }}
let _ = client.query(q)?;
Transistor's Structs and Enums
Actions is a builder struct to help you create a Vec<Action> for tx_log. Available functions are:
- new static method to instantiate struct Actions.
- append_put<T: Serialize>(action: T) appends a Put to Actions with no valid-time. Put writes a document.
- append_put_timed<T: Serialize>(action: T, date: DateTime<FixedOffset>) appends a Put to Actions with valid-time.
- append_delete(id: CruxId) appends a Delete to Actions with no valid-time. Deletes the specific document at the last valid-time.
- append_delete_timed(id: CruxId, date: DateTime<FixedOffset>) appends a Delete to Actions with valid-time. Deletes the specific document at the given valid-time.
- append_evict(id: CruxId) appends an Evict to Actions. Evicts a document entirely, including all historical versions (receives only the ID to evict).
- append_match_doc<T: Serialize>(id: CruxId, action: T) appends a Match to Actions with no valid-time. Matches the current state of an entity; if the state doesn't match the provided document, the transaction will not continue.
- append_match_doc_timed<T: Serialize>(id: CruxId, action: T, date: DateTime<FixedOffset>) appends a Match to Actions with valid-time.
- build generates the Vec<Action> from Actions.
use transistor::client::Crux;
use Actions;
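The chaining style of the Actions builder can be sketched without the crate. The types below are hypothetical miniatures: documents and IDs are plain strings, valid-times are integers, and only a few of the append functions are mirrored.

```rust
// Hypothetical miniature of the builder; not the crate's types.
#[derive(Debug, Clone, PartialEq)]
enum ActionSketch {
    Put { doc: String, valid_time: Option<u64> },
    Delete { id: String, valid_time: Option<u64> },
    Evict { id: String },
}

#[derive(Debug, Default)]
struct ActionsSketch {
    actions: Vec<ActionSketch>,
}

impl ActionsSketch {
    fn new() -> Self {
        Self::default()
    }

    /// Append a put with no valid-time.
    fn append_put(mut self, doc: &str) -> Self {
        self.actions.push(ActionSketch::Put { doc: doc.to_string(), valid_time: None });
        self
    }

    /// Append a put at an explicit valid-time.
    fn append_put_timed(mut self, doc: &str, valid_time: u64) -> Self {
        self.actions.push(ActionSketch::Put {
            doc: doc.to_string(),
            valid_time: Some(valid_time),
        });
        self
    }

    /// Append a delete with no valid-time.
    fn append_delete(mut self, id: &str) -> Self {
        self.actions.push(ActionSketch::Delete { id: id.to_string(), valid_time: None });
        self
    }

    /// Append an evict, which only needs the ID.
    fn append_evict(mut self, id: &str) -> Self {
        self.actions.push(ActionSketch::Evict { id: id.to_string() });
        self
    }

    /// Consume the builder and hand back the accumulated actions in order.
    fn build(self) -> Vec<ActionSketch> {
        self.actions
    }
}
```

Each append function takes the builder by value and returns it, which is what allows calls such as `ActionsSketch::new().append_put("doc").append_evict("id").build()` to chain.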
Query is a struct responsible for creating the fields and serializing them into the correct query format. It has a function for each field and a build function to help check whether it is correctly formatted.
- find is a static builder function to define the elements inside the :find clause.
- where_clause is a builder function that defines the vector of elements inside the :where [] array.
- order_by is a builder function to define the elements inside the :order-by clause.
- args is a builder function to define the elements inside the :args clause.
- limit is a builder function to define the elements inside the :limit clause.
- offset is a builder function to define the elements inside the :offset clause.
- with_full_results is a builder function to define the flag full-results? as true. This allows your query response to return the whole document instead of only the searched keys. The result of the Query {:query {:find [?user ?a] :where [[?user :first-name ?a]] :full-results? true}} will be a BTreeSet<Vec<String>> like ([{:crux.db/id :fafilda, :first-name "Jorge", :last-name "Klaus"} "Jorge"]), so the document will need further EDN parsing to become the document's struct.
Errors are defined in the CruxError enum.
- EdnError is a wrapper over edn_rs::EdnError.
- RequestError originates from the reqwest crate. Failed to make HTTP request.
- QueryFormatError originates when the provided Query struct does not match the schema.
- QueryError is responsible for encapsulating the Stacktrace error from the Crux response:
use transistor::client::Crux;
use ;
let client = Crux::new("localhost", "3000").http_client();
// field `n` doesn't exist
let _query_error_response = find
.where_clause
.build;
let error = client.query?;
println!("Stacktrace \n{:?}", error);
// Stacktrace
// QueryError("{:via
// [{:type java.lang.IllegalArgumentException,
// :message \"Find refers to unknown variable: n\",
// :at [crux.query$q invokeStatic \"query.clj\" 1152]}],
// :trace
// [[crux.query$q invokeStatic \"query.clj\" 1152]
// [crux.query$q invoke \"query.clj\" 1099]
// [crux.query$q$fn__10850 invoke \"query.clj\" 1107]
// [clojure.core$binding_conveyor_fn$fn__5754 invoke \"core.clj\" 2030]
// [clojure.lang.AFn call \"AFn.java\" 18]
// [java.util.concurrent.FutureTask run \"FutureTask.java\" 264]
// [java.util.concurrent.ThreadPoolExecutor
// runWorker
// \"ThreadPoolExecutor.java\"
// 1128]
// [java.util.concurrent.ThreadPoolExecutor$Worker
// run
// \"ThreadPoolExecutor.java\"
// 628]
// [java.lang.Thread run \"Thread.java\" 834]],
// :cause \"Find refers to unknown variable: n\"}
// ")
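Since QueryError carries the whole stacktrace as text, callers may only want the `:cause` entry. The sketch below shows one way to pull it out. `CruxErrorSketch` is a hypothetical mirror of the variants listed above with string payloads, and the `cause` helper is an assumption, not a function the crate provides. It uses naive string search rather than EDN parsing, so it does not handle escaped quotes inside the cause.

```rust
use std::fmt;

// Hypothetical mirror of the error variants; payloads simplified to strings.
#[derive(Debug)]
enum CruxErrorSketch {
    EdnError(String),
    RequestError(String),
    QueryFormatError(String),
    QueryError(String),
}

impl fmt::Display for CruxErrorSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CruxErrorSketch::EdnError(m) => write!(f, "EDN error: {}", m),
            CruxErrorSketch::RequestError(m) => write!(f, "request error: {}", m),
            CruxErrorSketch::QueryFormatError(m) => write!(f, "query format error: {}", m),
            CruxErrorSketch::QueryError(m) => write!(f, "query error: {}", m),
        }
    }
}

impl std::error::Error for CruxErrorSketch {}

impl CruxErrorSketch {
    /// Extract the text after `:cause "` from a QueryError stacktrace, if any.
    fn cause(&self) -> Option<&str> {
        if let CruxErrorSketch::QueryError(trace) = self {
            let marker = ":cause \"";
            let start = trace.find(marker)? + marker.len();
            let end = start + trace[start..].find('"')?;
            Some(&trace[start..end])
        } else {
            None
        }
    }
}
```

For the stacktrace shown above, this helper would return the text `Find refers to unknown variable: n`.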
Testing the Crux Client
For testing purposes there is a feature called mock that enables the http_mock function, a replacement for the http_client function. To use it, run your commands with the flag --features "mock", as in cargo test --test lib --no-fail-fast --features "mock". The mocking feature uses the crate mockito = "0.26" as a Cargo dependency. An example usage with this feature enabled:
use transistor::client::Crux;
use Action;
use Serialize;
use ;
use mockito::mock;
Also, the struct Actions can be tested with feature mock by using the enum ActionMock, thanks to impl PartialEq<Vec<ActionMock>> for Actions. A demo example can be:
use ;
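The technique behind this comparison is a cross-type PartialEq implementation, which lets assert_eq! compare a builder directly against a plain vector. The sketch below shows the idea with hypothetical types (`BuilderSketch`, `ActionMockSketch`); it is not the crate's code.

```rust
// Hypothetical types illustrating `impl PartialEq<Vec<Mock>> for Builder`.
#[derive(Debug, PartialEq)]
enum ActionMockSketch {
    Put(String),
    Delete(String),
}

#[derive(Debug, Default)]
struct BuilderSketch {
    actions: Vec<ActionMockSketch>,
}

impl BuilderSketch {
    fn new() -> Self {
        Self::default()
    }

    fn put(mut self, doc: &str) -> Self {
        self.actions.push(ActionMockSketch::Put(doc.to_string()));
        self
    }

    fn delete(mut self, id: &str) -> Self {
        self.actions.push(ActionMockSketch::Delete(id.to_string()));
        self
    }
}

// Comparing the builder with a Vec checks the accumulated actions in order.
impl PartialEq<Vec<ActionMockSketch>> for BuilderSketch {
    fn eq(&self, other: &Vec<ActionMockSketch>) -> bool {
        &self.actions == other
    }
}
```

With this impl in place, a test can write `assert_eq!(builder, vec![...])` without first calling a build function or exposing the builder's internals.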
Async support
Async feature is still in BETA as it depends heavily on unwraps.
It is possible to use an async/await HTTP client. To do so, enable the feature async in transistor: transistor = { version = "2.1.1", features = ["async"] }. With this feature enabled the HttpClient will use reqwest::Client instead of reqwest::blocking::Client. The default async runtime for reqwest::Client is tokio, so it is good to have tokio with feature macros, as well as futures, in your Cargo.toml:
futures = { version = "0.3.5" }
tokio = { version = "0.2.22", features = ["macros"] }
An async query example can be found below:
use tokio::prelude::*;
use transistor::client::Crux;
use Serialize;
use Action;
use ;
async
Note the use of use tokio::prelude::*; and of #[tokio::main] on async fn main().
Enabling feature time_as_str
It is possible to receive the times in responses (TxLogResponse, EntityTxResponse, EntityHistoryElement) as Strings. To do so, enable the feature time_as_str:
transistor = { version = "2.1.1", features = ["time_as_str"] }
Possible Features
mock = ["mockito"] -> http_mock()
time_as_str = [] -> DateTime types become Strings
async = ["tokio", "futures"] -> async/await
Dependencies
A strong dependency of this crate is the edn-rs crate, as many of the return types are in the Edn format, along with edn-derive. The sync HTTP client is reqwest with the blocking feature enabled. Chrono is used for time values, which can be DateTime<Utc> for inserts and DateTime<FixedOffset> for reads, and mockito is used for the feature mock.
Licensing
This project is licensed under LGPL-3.0 (GNU Lesser General Public License v3.0).