Crate hakuban

Hakuban is a data-object sharing library. Check out the README file if you haven’t been there yet.

Hakuban is a simple mechanism which allows exposing data-objects in some processes, and observing them in other processes. Hakuban takes care of all the boring details of data transmission, synchronization, object lifetime management, load balancing etc.

At a high level the API can be divided into 6 parts:

§Concepts

A hakuban object represents a shared variable, atomically writable through a selected ExposeContract and atomically readable through ObserveContracts. Among existing ExposeContracts, the hakuban network elects a single[1] one, which becomes responsible for producing new states of the object. ObserveContracts can only access the most recent object state (that they’ve received so far).

Every object has a unique identifier - an ObjectDescriptor. Every object belongs to some set of tags, which are used for wildcard observing/exposing[2]. Every tag has a unique identifier - a TagDescriptor.

ExposeContracts and ObserveContracts can be obtained from an Exchange. The Exchange controls the lifetime of objects, selects a single exposer for every object needing to be exposed, handles load-balancing, and facilitates object state propagation. The exchange can be asked to emit:

  • ObjectObserveContract - declaration of interest in observing values of a single object
  • ObjectExposeContract - declaration of ability to expose (update, produce) new values of a single object
  • TagObserveContract - declaration of interest in observing values of all[3] objects tagged with a specific tag
  • TagExposeContract - declaration of ability to expose (update, produce) new values of all[3] objects tagged with a specific tag
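
The interplay of the four contract kinds can be sketched with a toy model. This is only an illustration of the rule from footnote 3 (an object is “live” if it has at least one ObjectObserveContract, or both an ObjectExposeContract and a TagObserveContract); `ToyDescriptor`, `ToyContracts`, `is_live` and `has_exposer` are hypothetical names and are not part of hakuban's API. The `has_exposer` rule is one reading of the list above, not a confirmed description of the implementation.

```rust
use std::collections::BTreeSet;

/// Toy stand-in for an object identifier: a set of tags plus an id.
/// Hypothetical type, not hakuban's ObjectDescriptor.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToyDescriptor {
    tags: BTreeSet<String>,
    id: String,
}

impl ToyDescriptor {
    fn new(tags: &[&str], id: &str) -> Self {
        ToyDescriptor {
            tags: tags.iter().map(|tag| tag.to_string()).collect(),
            id: id.to_string(),
        }
    }
}

/// Contracts currently registered with a toy exchange (hypothetical).
#[derive(Default)]
struct ToyContracts {
    object_observe: Vec<ToyDescriptor>,
    object_expose: Vec<ToyDescriptor>,
    tag_observe: Vec<String>,
    tag_expose: Vec<String>,
}

impl ToyContracts {
    /// Footnote 3: live means at least one object observe contract,
    /// or both an object expose contract and a matching tag observe contract.
    fn is_live(&self, object: &ToyDescriptor) -> bool {
        let observed_directly = self.object_observe.contains(object);
        let exposed_directly = self.object_expose.contains(object);
        let observed_by_tag = self.tag_observe.iter().any(|tag| object.tags.contains(tag));
        observed_directly || (exposed_directly && observed_by_tag)
    }

    /// Assumption: a tag expose contract only covers live objects carrying its tag.
    fn has_exposer(&self, object: &ToyDescriptor) -> bool {
        let exposed_directly = self.object_expose.contains(object);
        let exposed_by_tag = self.tag_expose.iter().any(|tag| object.tags.contains(tag));
        exposed_directly || (exposed_by_tag && self.is_live(object))
    }
}

fn main() {
    let a = ToyDescriptor::new(&["some-tag"], "a");
    let b = ToyDescriptor::new(&["some-tag"], "b");
    let mut contracts = ToyContracts::default();

    // A tag observer alone does not make any object live.
    contracts.tag_observe.push("some-tag".to_string());
    println!("a live with tag observer only: {}", contracts.is_live(&a));

    // Tag observer + object exposer: "a" becomes live, "b" stays unknown.
    contracts.object_expose.push(a.clone());
    println!("a live after object expose: {}", contracts.is_live(&a));
    println!("b live: {}", contracts.is_live(&b));

    // A direct object observer makes "b" live; a tag exposer can then serve it.
    contracts.object_observe.push(b.clone());
    contracts.tag_expose.push("some-tag".to_string());
    println!("b live: {}, b has exposer: {}", contracts.is_live(&b), contracts.has_exposer(&b));
}
```

The point of the model is that tags never enumerate objects by themselves: a tag contract only starts to matter once some object-level contract has named a concrete descriptor.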

Observe contracts are streams emitting streams of ObjectState structs. Expose contracts are streams emitting sinks of ObjectState structs.
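
The “stream of streams” shape can be easier to see in a synchronous analogy. The sketch below uses only `std::sync::mpsc` channels: the outer receiver plays the role of an observe contract, and each inner receiver plays the role of one object's state stream. It is an analogy under stated assumptions, not hakuban code; `State`, `drain_contract` and `demo` are hypothetical names, and hakuban's real types are asynchronous.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical stand-in for an object state.
type State = String;

/// Drain an "observe contract": for every inner stream it emits,
/// collect all states that inner stream yields before it ends.
fn drain_contract(contract: Receiver<Receiver<State>>) -> Vec<Vec<State>> {
    contract
        .iter()
        .map(|object_stream| object_stream.iter().collect())
        .collect()
}

fn demo() -> Vec<Vec<State>> {
    // Outer channel: emits one inner channel per object that becomes available.
    let (contract_tx, contract_rx) = channel::<Receiver<State>>();

    // First object appears and produces two states.
    let (first_tx, first_rx) = channel::<State>();
    contract_tx.send(first_rx).unwrap();
    first_tx.send("v1".to_string()).unwrap();
    first_tx.send("v2".to_string()).unwrap();
    drop(first_tx); // the inner stream ends

    // Second object appears and produces one state.
    let (second_tx, second_rx) = channel::<State>();
    contract_tx.send(second_rx).unwrap();
    second_tx.send("w1".to_string()).unwrap();
    drop(second_tx);

    drop(contract_tx); // the outer stream ends
    drain_contract(contract_rx)
}

fn main() {
    for (index, states) in demo().iter().enumerate() {
        println!("inner stream {index}: {states:?}");
    }
}
```

An expose contract mirrors this shape with the inner direction reversed: the outer stream hands out sinks, and the exposer writes states into them.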

Exchanges in separate processes can be connected to each other in a tree topology. Processes (exchanges) at the leaf-end of the tree will only receive data relevant to them.
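
A rough way to picture the tree topology is a recursive filter: an update travels down a branch only if some exchange in that branch has declared interest. The sketch below is a simplification that assumes every update enters at the root; `ToyNode`, `wants`, `route`, `sample_tree` and `reached_by` are hypothetical names, and the real propagation logic in hakuban is not shown in this document.

```rust
use std::collections::HashSet;

/// Toy exchange in a tree of exchanges (hypothetical type).
struct ToyNode {
    name: &'static str,
    interests: HashSet<&'static str>,
    children: Vec<ToyNode>,
}

impl ToyNode {
    fn new(name: &'static str, interests: &[&'static str], children: Vec<ToyNode>) -> Self {
        ToyNode { name, interests: interests.iter().copied().collect(), children }
    }

    /// A node wants an object if it, or anything below it, observes that object.
    fn wants(&self, object_id: &str) -> bool {
        self.interests.contains(object_id)
            || self.children.iter().any(|child| child.wants(object_id))
    }

    /// Push an update down from this node, recording every node it reaches.
    fn route(&self, object_id: &str, reached: &mut Vec<&'static str>) {
        if !self.wants(object_id) {
            return; // nothing in this subtree cares, so nothing is sent
        }
        reached.push(self.name);
        for child in &self.children {
            child.route(object_id, reached);
        }
    }
}

fn sample_tree() -> ToyNode {
    ToyNode::new("router", &[], vec![
        ToyNode::new("leaf-a", &["xxx"], vec![]),
        ToyNode::new("relay", &[], vec![ToyNode::new("leaf-b", &["yyy"], vec![])]),
        ToyNode::new("leaf-c", &[], vec![]),
    ])
}

fn reached_by(root: &ToyNode, object_id: &str) -> Vec<&'static str> {
    let mut reached = Vec::new();
    root.route(object_id, &mut reached);
    reached
}

fn main() {
    let tree = sample_tree();
    for object_id in ["xxx", "yyy", "zzz"] {
        println!("{object_id} reaches {:?}", reached_by(&tree, object_id));
    }
}
```

In this model the intermediate `relay` node carries traffic for `leaf-b` even though it observes nothing itself, while `leaf-c` never receives anything.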

§Example

let exchange = Exchange::new();

let mut observe_contract = exchange.object_observe_contract((["some-tag"],"xxx")).build();
let mut expose_contract = exchange.object_expose_contract((["some-tag"],"xxx")).build();

tokio::spawn(async move {
	//ExposeContract emits ObjectStateSink when first observer appears
	let mut object_state_sink = expose_contract.next().await.unwrap();
	let object_state = ObjectState::new("my data").json_serialize();
	object_state_sink.send(object_state).await;
});

tokio::spawn(async move {
	//ObserveContract emits ObjectStateStream
	let mut object_state_stream = observe_contract.next().await.unwrap();
	let object_state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
	println!("{:?}", object_state.data);
});

§Connecting somewhere

Start the hakuban-router (a binary built by this lib), and the following two processes. Order doesn’t matter, they will happily wait for each other ❤️. The A and B processes will communicate through the hakuban-router instance.

You can also make them communicate directly, by making one of them create a tokio_runtime::WebsocketListener instead of the tokio_runtime::WebsocketConnector.

You cannot attach more than one WebsocketConnector to a single Exchange; this restriction keeps the network tree-shaped.

Process A:

let exchange = Exchange::new();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut observe_contract = exchange.object_observe_contract((["some-tag"],"xxx")).build();

let object_state = observe_contract.next().await.unwrap().next().await.unwrap();
println!("{:?}", object_state.data);

Process B:

let exchange = Exchange::new();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut expose_contract = exchange.object_expose_contract((["some-tag"],"xxx")).build();

let object_state = ObjectState::new("🦀").json_serialize();
let mut object_state_sink = expose_contract.next().await.unwrap();
object_state_sink.send(object_state).await;

//waiting for Process A to receive the goods and exit
while object_state_sink.next().await.is_some() {};

§Cargo feature flags

  • default: ["tokio-runtime", "ffi", "downstream"]
  • tokio-runtime: enables compilation of tokio_runtime.rs, the only runtime and network transport implemented so far. You’ll need it if you want to communicate with other processes.
  • musl: allows building statically-linked binaries: cargo build --target x86_64-unknown-linux-musl --release --features musl
  • downstream: enables processing of incoming connections. There is no corresponding “upstream” feature; outgoing connections are always enabled.
  • wasm: used for building the browser-wasm

§FFI / .so

Check out documentation of the ffi module.

§Included binaries

There is one binary built by this crate - the hakuban-router. It’s basically a WebsocketListener surrounded by “main”. It accepts connections and routes hakuban communication, like any other hakuban process with a WebsocketListener would.


  1. It’s usually a single ExposeContract, but the number is not guaranteed. More than one ExposeContract may be elected at a time, typically during load-balancing hand-over.

  2. Tags are not meant to represent data model level collections. They are merely wildcards for declaring observe/expose contracts for objects of yet-unknown ObjectDescriptors. TODO: explain with an example

  3. All “live” objects: those for which there exists at least one ObjectObserveContract, or both an ObjectExposeContract and a TagObserveContract.

Modules§

ffi
Interface for inferior programming languages
monitor
Debugging, introspection, statistics
tokio_runtime
Provides Websocket transport running in Tokio runtime

Structs§

Exchange
Main access point to the Hakuban network
ObjectDescriptor
Unique object identifier
ObjectExposeContract
Represents a wish, a contract to expose an object
ObjectExposeContractBuilder
ObjectObserveContract
Represents a wish, a contract to observe an object
ObjectObserveContractBuilder
ObjectState
Structure holding individual data-object’s state
ObjectStateSink
A futures::sink::Sink, accepting ObjectStates
ObjectStateSinkParams
Currently empty struct, to hold sink parameters in the future
ObjectStateStream
A futures::stream::Stream, emitting ObjectStates
TagDescriptor
Unique tag identifier
TagExposeContract
Represents a wish, a contract to expose any object with specific tag
TagExposeContractBuilder
TagObserveContract
Represents a wish, a contract to observe all objects with specific tag
TagObserveContractBuilder

Enums§

DataSynchronized
Object synchronization state
JsonDeserializeError

Traits§

HakubanStreamExt
JsonDeserializeState
JsonSerializeState

Type Aliases§

DataBytes
= Arc<Vec<u8>>
DataFormat
= Vec<String>
DataVersion
= Vec<i64>