Crate hakuban[][src]

Expand description

Hakuban is a data-object sharing library. Check out the README file if you haven’t been there yet.

Hakuban is a simple mechanism which allows exposing data-objects in some processes, and observing them in other processes. Hakuban takes care of all the boring details of data transmission, synchronization, object lifetime management, load balancing etc.

At a high level the API can be divided into 5 parts:

Concepts

An object is a basic unit of share-able data. Objects can be observed and/or exposed by processes. Every object is uniquely identified by a set of tags and a single json object1. Such unique identifier is represented by the ObjectDescriptor struct.

There is no API to access objects directly. Instead, object state can be retrieved and asserted through ObjectObserve and ObjectExpose contracts.

A tag represents a set of objects. Process observing a tag declares interest in observing all existing objects tagged with that tag. Likewise, process exposing a tag declares capability to expose any and all existing objects tagged with that tag. Every tag is uniquely identified by a json object1. Represented by the TagDescriptor struct.

Tags are not meant to represent data model level collections. They are merely wildcards for declaring observe/expose contracts for objects of yet-unknown descriptors. TODO: explain with example

To exchange object versions processes can connect to each other in a tree topology. Processes (nodes) at the leaf-end of the tree will only receive data relevant to them. LocalNode struct represents local hakuban node’s state. It is the main access point to the hakuban network. It allows building contracts and access to node-wide events. (There is node::RemoteNode struct too, but it should not be needed outside of custom network transport implementations. Look at tokio.rs if you want to do that.)

TODO: explain how single exposer is picked and assigned for each object

Example

let hakuban = hakuban::LocalNode::builder().build();
let observe = hakuban.object((["some-tag"],"xxx")).observe::<String>();

std::thread::spawn(move || {
	let expose = hakuban.object((["some-tag"],"xxx")).expose();
	expose.set_object_data(&"aaaaaa");
});

for _event in observe.changes() {
	if let Some(data) = observe.object_data()? {
		println!("{}", data);
		break;
	}
}

Connecting somewhere

Start the hakuban-router (binary built by this lib), and following 2 processes. Order doesn’t matter. The A and B processes will communicate through the hakuban-router instance.

You can also make them communicate directly, by making one of them create tokio::WebsocketListener instead of tokio::WebsocketConnector.

At this moment you should not have more than one WebsocketConnector running at the same time in a single process, to keep the network tree-shaped2, other graphs are not supported.

Process A:

let hakuban = LocalNode::builder().build();
let _upstream = WebsocketConnector::new("ws://127.0.0.1:3001")?.start(hakuban.clone());
let observe = hakuban.object((["some-tag"],"xxx")).observe::<String>();

for _event in observe.changes() {
	if let Some(data) = observe.object_data()? {
		println!("{}", data);
		break;
	}
}

Process B:

let hakuban = LocalNode::builder().build();
let _upstream = WebsocketConnector::new("ws://127.0.0.1:3001")?.start(hakuban.clone());
let expose = hakuban.object((["some-tag"],"xxx")).expose();

for _event in expose.changes() {
	if expose.assigned() {
		expose.set_object_data(&"aaaaaa");
		break;
	}
}

Event handling

Async - with futures::Stream

use futures_util::StreamExt;

let mut events: EventStream<_> = observe.changes().into();
while let Some(event) = events.next().await {
	println!("{:?}", event);
}

Sync - with an Iterator

for event in observe.changes() {
	println!("{:?}", event);
}

Sync - with a callback

This method is discouraged and very limited. Callbacks get called synchronously while events are being processed in Hakuban core. So, calling any hakuban code from inside an event callback may result in a deadlock. It’s still useful for FFI etc. Just don’t do anything serious inside.

use hakuban::events::CallbackRegistry;

observe.changes().register(Box::new(|event|
	println!("{:?}", event)
)).forever();

Buffering events, Annotating events, Merging multiple event sources into one buffer (Stream/Iterator)

Look at events module documentation.

Cargo feature flags

  • default: ["tokio-runtime"]
  • async: makes EventStream implement futures::Stream
  • tokio-runtime: enables compilation of tokio.rs, the only runtime&network transport implemented so far. You’ll need it if you want to communicate with other processes.

FFI / so

Check out documentation of the ffi module.

Included binaries

There is one binary built by this crate - the hakuban-router. It’s basically a WebsocketListener surrounded by “main”. It accepts connections and will naturally route hakuban communication, like every other hakuban process with WebsocketListener would.


  1. any valid json really, string, array, number, etc. 

  2. unless you make sure there is no overlap in object and tag descriptors visible on the other side of both connections 

Modules

events

Event delivery pipeline

ffi

Interface for inferior programming languages

message

Communication primitives; only for tests and custom transports

node

High-level hakuban network interface

object

Object contracts, and supporting structures

tag

Tag contracts, and supporting structures

tokio

Provides Websocket transport running in Tokio runtime

Structs

LocalNode

Main access point to the Hakuban network

ObjectDescriptor

Unique object identifier

ObjectExpose

Represents a wish, a contract to expose an object

ObjectObserve

Represents a wish, a contract to observe an object

TagDescriptor

Unique tag identifier

TagExpose

Represents a wish, a contract to expose any object with specific tag

TagObserve

Represents a wish, a contract to observe all objects with specific tag

Enums

DefaultSerializerError

Error returned by object accessors if (de)serialization fails and default (de)serializer is used

Type Definitions

ObjectRawData

= Arc<Vec<u8>>

ObjectType

= Vec<String>

ObjectVersion

= Vec<i64>