Crate hakuban


Hakuban is a data-object sharing library. Check out the README file if you haven’t been there yet.

Hakuban is a simple mechanism which allows exposing data-objects in some processes, and observing them in other processes. Hakuban takes care of all the boring details of data transmission, synchronization, object lifetime management, load balancing etc.

The API can be roughly divided into 6 parts:

Concepts

A hakuban object is akin to a mutable shared variable. Just as a mutable variable can be assigned a new value (by a writer), a hakuban object can be assigned a new object state (by an exposer). A reader of a mutable variable can retrieve its current value, and a hakuban observer can read the current state of a hakuban object.

Every hakuban object belongs to some set of tags. Tags can be used for “wildcard” observing/exposing of objects. To be an observer of a tag is to observe all objects tagged with that tag. To be an exposer of a tag is to be able to expose every object tagged with that tag. Tags are not meant to represent data model level collections.
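The "wildcard" behaviour of tags can be illustrated with a small standalone model. This is only a sketch: ObjectModel, object_model and observed_via_tag are hypothetical names invented for the illustration and are not part of the hakuban API; objects and tags are reduced to plain strings.

```rust
use std::collections::BTreeSet;

// Hypothetical model, not a hakuban type: an object is a name plus a set of tag names.
struct ObjectModel {
    name: &'static str,
    tags: BTreeSet<&'static str>,
}

// Hypothetical constructor for the model above.
fn object_model(name: &'static str, tags: &[&'static str]) -> ObjectModel {
    ObjectModel { name, tags: tags.iter().copied().collect() }
}

// Observing a tag means observing every object that carries that tag.
fn observed_via_tag(objects: &[ObjectModel], tag: &str) -> Vec<&'static str> {
    objects
        .iter()
        .filter(|object| object.tags.contains(tag))
        .map(|object| object.name)
        .collect()
}

fn main() {
    let objects = [
        object_model("kitchen-temp", &["sensors", "kitchen"]),
        object_model("garage-temp", &["sensors"]),
        object_model("build-status", &["ci"]),
    ];
    // prints ["kitchen-temp", "garage-temp"]
    println!("{:?}", observed_via_tag(&objects, "sensors"));
}
```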

A hakuban exchange is a central point where observers and exposers can meet and share their objects. A single process will typically build a single local exchange. Multiple local exchanges can be connected together over a network, forming what can be considered a single big exchange accessible by multiple processes.

A wish to observe an object (or a tag) can be declared to a local exchange, which will return an object observe contract (or a tag observe contract). Likewise, a wish to expose an object, or a tag, can be declared to a local exchange, resulting in, respectively, an object expose contract or a tag expose contract.

All four possible contracts ({object, tag} × {observe, expose}) are async Streams. Object observe contracts and tag observe contracts are streams which emit streams of object states. Object expose contracts and tag expose contracts are streams which emit sinks of object states.
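The "stream of streams" and "stream of sinks" shapes can be easier to picture with a blocking analogy. The sketch below uses std::sync::mpsc channels in place of async Streams and Sinks, which is an assumption made purely for illustration; ObserveContractModel, ExposeContractModel, link_object and roundtrip are hypothetical names, not hakuban API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Analogy only: blocking std channels stand in for hakuban's async Streams and Sinks.
// An observe contract yields "streams" (receivers) of object states.
type ObserveContractModel = Receiver<Receiver<String>>;
// An expose contract yields "sinks" (senders) of object states.
type ExposeContractModel = Receiver<Sender<String>>;

// Hypothetical helper: pairs one exposer with one observer for a single object.
fn link_object() -> (ObserveContractModel, ExposeContractModel) {
    let (state_tx, state_rx) = channel::<String>();
    let (observe_tx, observe_contract) = channel();
    let (expose_tx, expose_contract) = channel();
    observe_tx.send(state_rx).expect("observe contract receiver is alive");
    expose_tx.send(state_tx).expect("expose contract receiver is alive");
    (observe_contract, expose_contract)
}

// Sends one state through the sink and reads it back through the stream.
fn roundtrip(state: &str) -> String {
    let (observe_contract, expose_contract) = link_object();
    // The expose contract emits a sink; the exposer pushes a state into it.
    let sink = expose_contract.recv().expect("a sink was emitted");
    sink.send(state.to_string()).expect("observer side is alive");
    // The observe contract emits a stream; the observer pulls the state from it.
    let stream = observe_contract.recv().expect("a stream was emitted");
    stream.recv().expect("a state was sent")
}

fn main() {
    println!("{}", roundtrip("my data")); // prints my data
}
```

In this model each contract emits exactly one inner channel; the real contracts are long-lived and asynchronous.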

Every hakuban object is uniquely identified by an object descriptor. Every hakuban tag is uniquely identified by a tag descriptor. A tag descriptor consists of a single JSON object[1]. An object descriptor consists of a set of tag descriptors, and a single JSON object[1].

A tag descriptor created with json object {a: 1, b: 2} and a tag descriptor created with json object {b: 2, a: 1} are equal, and identify the same tag. Two object descriptors created with identical json objects but distinct sets of tags are NOT equal, and identify distinct objects.
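These equality rules can be modelled without the crate. The sketch below is an illustration under stated assumptions: TagDescriptorModel, ObjectDescriptorModel and the helper functions are hypothetical stand-ins, not the crate's descriptor types, and JSON objects are simplified to maps from string keys to integers.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for illustration only; these are NOT the crate's types.
// A JSON object is modelled as a sorted map, so the key order used at
// construction time cannot affect equality.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TagDescriptorModel {
    json: BTreeMap<String, i64>,
}

// An object descriptor is a set of tag descriptors plus its own JSON object.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ObjectDescriptorModel {
    tags: BTreeSet<TagDescriptorModel>,
    json: BTreeMap<String, i64>,
}

fn json_object(pairs: &[(&str, i64)]) -> BTreeMap<String, i64> {
    pairs.iter().map(|(key, value)| (key.to_string(), *value)).collect()
}

fn tag(pairs: &[(&str, i64)]) -> TagDescriptorModel {
    TagDescriptorModel { json: json_object(pairs) }
}

fn object(tags: &[TagDescriptorModel], pairs: &[(&str, i64)]) -> ObjectDescriptorModel {
    ObjectDescriptorModel { tags: tags.iter().cloned().collect(), json: json_object(pairs) }
}

fn main() {
    // Same keys and values in a different order: the same tag.
    let t1 = tag(&[("a", 1), ("b", 2)]);
    let t2 = tag(&[("b", 2), ("a", 1)]);
    // Identical JSON but different tag sets: distinct objects.
    let o1 = object(&[t1.clone()], &[("id", 7)]);
    let o2 = object(&[t1.clone(), tag(&[("c", 3)])], &[("id", 7)]);
    // prints tags equal: true, objects equal: false
    println!("tags equal: {}, objects equal: {}", t1 == t2, o1 == o2);
}
```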

TODO: explain how single exposer is picked and assigned for each object

Example

let exchange = LocalExchange::builder().build();
let mut observe_contract = exchange.object((["some-tag"],"xxx")).observe();
let mut expose_contract = exchange.object((["some-tag"],"xxx")).expose();

tokio::spawn(async move {
	//ExposeContract emits ObjectStateSink when first observer appears
	let mut object_state_sink = expose_contract.next().await.unwrap();
	let object_state = ObjectState::new("my data").json_serialize();
	object_state_sink.send(object_state).await;
});

tokio::spawn(async move {
	//ObserveContract emits ObjectStateStream when exposer uploads first ObjectState
	let mut object_state_stream = observe_contract.next().await.unwrap();
	let object_state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
	println!("{:?}", object_state.data);
});

Connecting somewhere

Start the hakuban-router (a binary built by this lib) and the following two processes. Order doesn’t matter; they will happily wait for each other ❤️. The A and B processes will communicate through the hakuban-router instance.

You can also make them communicate directly, by making one of them create a tokio_runtime::WebsocketListener instead of the tokio_runtime::WebsocketConnector.

You should not have more than one WebsocketConnector running at the same time in a single process. This keeps the network tree-shaped[2]; other graph shapes are not supported.
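To make the tree-shape constraint concrete, here is a minimal sketch that checks a planned topology before deployment. It is not part of hakuban: is_tree is a hypothetical helper, and the model assumes each process is identified by an index and has at most one upstream connection.

```rust
// Hypothetical model, not hakuban API: upstream[i] is the index of the process
// that process i connects to with its single connector, or None if it only listens.
fn is_tree(upstream: &[Option<usize>]) -> bool {
    // A tree has exactly one root, i.e. one process without an upstream.
    let roots = upstream.iter().filter(|link| link.is_none()).count();
    if roots != 1 {
        return false;
    }
    // Every process must reach the root by following upstream links.
    for start in 0..upstream.len() {
        let mut current = start;
        let mut hops = 0;
        while let Some(next) = upstream[current] {
            if next >= upstream.len() {
                return false; // link to an unknown process
            }
            current = next;
            hops += 1;
            if hops > upstream.len() {
                return false; // more hops than processes: a cycle
            }
        }
    }
    true
}

fn main() {
    // Process 0 is the router; processes 1 and 2 connect to it.
    let router_with_two_clients = [None, Some(0), Some(0)];
    // Processes 1 and 2 connect to each other and never reach the router.
    let cyclic = [None, Some(2), Some(1)];
    // prints true false
    println!("{} {}", is_tree(&router_with_two_clients), is_tree(&cyclic));
}
```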

Process A:

let exchange = LocalExchange::builder().build();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut observe_contract = exchange.object((["some-tag"],"xxx")).observe();

let object_state = observe_contract.next().await.unwrap().next().await.unwrap();
println!("{:?}", object_state.data);

Process B:

let exchange = LocalExchange::builder().build();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut expose_contract = exchange.object((["some-tag"],"xxx")).expose();

let object_state = ObjectState::new("🦀").json_serialize();
let mut object_state_sink = expose_contract.next().await.unwrap();
object_state_sink.send(object_state).await;

//waiting for Process A to receive the goods and exit
while object_state_sink.next().await.is_some() {};

Cargo feature flags

  • default: ["tokio-runtime"]
  • tokio-runtime: enables compilation of tokio.rs, the only runtime and network transport implemented so far. You’ll need it if you want to communicate with other processes.
  • musl: allows building statically-linked binaries: cargo build --target x86_64-unknown-linux-musl --release --features musl

FFI / so

Check out documentation of the ffi module.

Included binaries

There is one binary built by this crate: the hakuban-router. It’s basically a WebsocketListener wrapped in a “main” function. It accepts connections and routes hakuban communication, just as any other hakuban process with a WebsocketListener would.


  1. Really, any valid JSON value: string, array, number, etc.

  2. Unless you make sure there is no overlap in the object and tag descriptors visible on the other side of both connections.

Modules

  • High-level hakuban network interface
  • Interface for inferior programming languages
  • Convenience functions to easily serialize and deserialize ObjectStates
  • Communication primitives; only for tests and custom transports
  • Object contracts, and supporting structures
  • Prologue
  • Tag contracts, and supporting structures
  • Provides Websocket transport running in Tokio runtime
