Hakuban is a data-object sharing library. Check out the README file if you haven’t been there yet.
Hakuban is a simple mechanism which allows exposing data-objects in some processes, and observing them in other processes. Hakuban takes care of all the boring details of data transmission, synchronization, object lifetime management, load balancing etc.
At a high level the API can be divided into 6 parts:
- Network management: Exchange, tokio_runtime::WebsocketListener, tokio_runtime::WebsocketConnector
- Descriptors (“rich labels” for objects and tags): ObjectDescriptor, TagDescriptor
- Object and tag contracts: ObjectObserveContract, ObjectExposeContract, TagObserveContract, TagExposeContract
- Object state access: ObjectState, ObjectStateStream, ObjectStateSink
- FFI, WASM
§Concepts
A hakuban object represents a shared variable. It is atomically writable through a selected ExposeContract and atomically readable through ObserveContracts. Among existing ExposeContracts, the hakuban network elects a single[1] one, which becomes responsible for producing new states of the object. ObserveContracts can only access the most recent object state (that they’ve received so far).
Every object has a unique identifier - an ObjectDescriptor. Every object belongs to some set of tags, which are used for wildcard observing/exposing[2]. Every tag has a unique identifier - a TagDescriptor.
ExposeContracts and ObserveContracts can be obtained from an Exchange. The Exchange controls the lifetime of objects, selects a single exposer for every object needing to be exposed, handles load-balancing, and facilitates object state propagation. The exchange can be asked to emit:
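The "shared variable" semantics described above can be illustrated with a small standard-library model. This is only a sketch of the idea, not hakuban's API or implementation: `ModelObject`, `publish` and `observe` are hypothetical names, and the real library works across processes rather than through a local mutex.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a shared object: one slot holding the latest state.
/// (Illustration only; not a hakuban type.)
#[derive(Clone, Default)]
struct ModelObject {
    latest: Arc<Mutex<Option<String>>>,
}

impl ModelObject {
    /// Exposer side: replace the whole state in one step.
    fn publish(&self, state: &str) {
        *self.latest.lock().unwrap() = Some(state.to_string());
    }

    /// Observer side: read a snapshot of the most recent state, if any.
    fn observe(&self) -> Option<String> {
        self.latest.lock().unwrap().clone()
    }
}
```

In this model an observer that reads after two consecutive `publish` calls sees only the second value, which mirrors the statement that observers can access only the most recent state.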
- ObjectObserveContract - declaration of interest in observing values of a single object
- ObjectExposeContract - declaration of ability to expose (update, produce) new values of a single object
- TagObserveContract - declaration of interest in observing values of all[3] objects tagged with a specific tag
- TagExposeContract - declaration of ability to expose (update, produce) new values of all[3] objects tagged with a specific tag
Observe contracts are streams emitting streams of ObjectState structs. Expose contracts are streams emitting sinks of ObjectState structs.
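To make the "tags as wildcards" idea concrete, here is a minimal standard-library model of how a tag contract could cover objects. It is a sketch under assumptions, not the hakuban API: `ModelObjectDescriptor`, `tag_contract_covers` and `covered_objects` are hypothetical names, identifiers are plain strings, and object liveness is ignored.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for an object descriptor: a set of tags plus an identifier.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ModelObjectDescriptor {
    tags: BTreeSet<String>,
    id: String,
}

impl ModelObjectDescriptor {
    fn new(tags: &[&str], id: &str) -> Self {
        Self {
            tags: tags.iter().map(|t| t.to_string()).collect(),
            id: id.to_string(),
        }
    }
}

/// A tag contract acts as a wildcard: it covers every object whose tag set contains the tag.
fn tag_contract_covers(tag: &str, object: &ModelObjectDescriptor) -> bool {
    object.tags.contains(tag)
}

/// Returns, in input order, the objects that a contract on `tag` would cover.
fn covered_objects<'a>(
    tag: &str,
    objects: &'a [ModelObjectDescriptor],
) -> Vec<&'a ModelObjectDescriptor> {
    objects
        .iter()
        .filter(|object| tag_contract_covers(tag, object))
        .collect()
}
```

The point of the model is that the contract names only the tag; the objects it covers do not have to be known when the contract is declared.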
Exchanges in separate processes can be connected to each other in a tree topology. Processes (exchanges) at the leaf-end of the tree will only receive data relevant to them.
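The claim that leaf-end processes only receive relevant data can be sketched with a toy tree model. This is an illustration under simplifying assumptions, not hakuban's routing code: `ModelNode`, `subtree_interest` and `recipients` are hypothetical names, objects are identified by plain strings, and every update is assumed to originate at the root.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of one process (exchange) in the tree.
struct ModelNode {
    /// Indices of directly connected downstream nodes.
    children: Vec<usize>,
    /// Identifiers of objects observed locally by this node.
    observed: BTreeSet<String>,
}

impl ModelNode {
    fn new(children: &[usize], observed: &[&str]) -> Self {
        Self {
            children: children.to_vec(),
            observed: observed.iter().map(|o| o.to_string()).collect(),
        }
    }
}

/// Objects observed anywhere in the subtree rooted at `node`.
fn subtree_interest(nodes: &[ModelNode], node: usize) -> BTreeSet<String> {
    let mut interest = nodes[node].observed.clone();
    for &child in &nodes[node].children {
        interest.extend(subtree_interest(nodes, child));
    }
    interest
}

/// Sorted indices of nodes that receive an update for `object` entering at `root`.
/// The update is forwarded only into subtrees in which some node observes it.
fn recipients(nodes: &[ModelNode], root: usize, object: &str) -> Vec<usize> {
    let mut reached = Vec::new();
    let mut pending = vec![root];
    while let Some(node) = pending.pop() {
        reached.push(node);
        for &child in &nodes[node].children {
            if subtree_interest(nodes, child).contains(object) {
                pending.push(child);
            }
        }
    }
    reached.sort();
    reached
}
```

Intermediate nodes still relay updates that only their descendants observe, while subtrees with no interest in an object never see it.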
§Example
// Extension traits providing .next() on streams and .send() on sinks
use futures::{SinkExt, StreamExt};
use hakuban::{Exchange, ObjectState};

let exchange = Exchange::new();
let mut observe_contract = exchange.object_observe_contract((["some-tag"],"xxx")).build();
let mut expose_contract = exchange.object_expose_contract((["some-tag"],"xxx")).build();
tokio::spawn(async move {
    // ExposeContract emits an ObjectStateSink when the first observer appears
    let mut object_state_sink = expose_contract.next().await.unwrap();
    let object_state = ObjectState::new("my data").json_serialize();
    object_state_sink.send(object_state).await;
});
tokio::spawn(async move {
    // ObserveContract emits an ObjectStateStream
    let mut object_state_stream = observe_contract.next().await.unwrap();
    let object_state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
    println!("{:?}", object_state.data);
});
§Connecting somewhere
Start the hakuban-router (a binary built by this lib), and the following two processes. Order doesn’t matter, they will happily wait for each other ❤️.
The A and B processes will communicate through the hakuban-router instance.
You can also make them communicate directly, by making one of them create a tokio_runtime::WebsocketListener instead of the tokio_runtime::WebsocketConnector.
You cannot have more than one WebsocketConnector attached to a single Exchange; this keeps the network tree-shaped.
Process A:
let exchange = Exchange::new();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut observe_contract = exchange.object_observe_contract((["some-tag"],"xxx")).build();
let object_state = observe_contract.next().await.unwrap().next().await.unwrap();
println!("{:?}", object_state.data);
Process B:
let exchange = Exchange::new();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut expose_contract = exchange.object_expose_contract((["some-tag"],"xxx")).build();
let object_state = ObjectState::new("🦀").json_serialize();
let mut object_state_sink = expose_contract.next().await.unwrap();
object_state_sink.send(object_state).await;
//waiting for Process A to receive the goods and exit
while object_state_sink.next().await.is_some() {};
§Cargo feature flags
- default: ["tokio-runtime", "ffi", "downstream"]
- tokio-runtime: enables compilation of tokio_runtime.rs, the only runtime and network transport implemented so far. You’ll need it if you want to communicate with other processes.
- musl: allows building statically-linked binaries: cargo build --target x86_64-unknown-linux-musl --release --features musl
- downstream: enables processing of incoming connections. There is no corresponding “upstream” feature; outgoing connections are always enabled.
- wasm: used for building the browser-wasm
§FFI / .so
Check out documentation of the ffi module.
§Included binaries
There is one binary built by this crate - the hakuban-router.
It’s basically a WebsocketListener surrounded by “main”.
It accepts connections and will naturally route hakuban communication, like every other hakuban process with a WebsocketListener would.
[1] It’s usually a single ExposeContract, but the number is not guaranteed. More than one ExposeContract may be elected at a time, typically during load-balancing hand-over.
[2] Tags are not meant to represent data model level collections. They are merely wildcards for declaring observe/expose contracts for objects of yet-unknown ObjectDescriptors. TODO: explain with an example
[3] All “live” objects: those for which there exists at least one ObjectObserveContract, or both an ObjectExposeContract and a TagObserveContract.
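The definition of a “live” object given in these notes can be written as a small predicate. This is a sketch of that rule only, not code from the library: `ContractCounts` and `is_live` are hypothetical names, and `tag_observe` is assumed to count TagObserveContracts on any of the object's tags.

```rust
/// Hypothetical per-object tally of contracts currently declared in the network.
#[derive(Debug, Clone, Copy, Default)]
struct ContractCounts {
    /// ObjectObserveContracts naming this object.
    object_observe: usize,
    /// ObjectExposeContracts naming this object.
    object_expose: usize,
    /// TagObserveContracts on any tag of this object (assumption of this model).
    tag_observe: usize,
}

/// An object is live if someone observes it directly, or if it is both
/// directly exposed and covered by a tag observer.
fn is_live(counts: ContractCounts) -> bool {
    counts.object_observe > 0 || (counts.object_expose > 0 && counts.tag_observe > 0)
}
```

Under this rule a tag observer alone does not make an object live; some ObjectExposeContract has to name the object as well.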
Modules§
- ffi - Interface for inferior programming languages
- monitor - Debugging, introspection, statistics
- tokio_runtime - Provides Websocket transport running in Tokio runtime
Structs§
- Exchange - Main access point to the Hakuban network
- ObjectDescriptor - Unique object identifier
- ObjectExposeContract - Represents a wish, a contract to expose an object
- ObjectExposeContractBuilder
- ObjectObserveContract - Represents a wish, a contract to observe an object
- ObjectObserveContractBuilder
- ObjectState - Structure holding individual data-object’s state
- ObjectStateSink - A futures::sink::Sink, accepting ObjectStates
- ObjectStateSinkParams - Currently empty struct, to hold sink parameters in the future
- ObjectStateStream - A futures::stream::Stream, emitting ObjectStates
- TagDescriptor - Unique tag identifier
- TagExposeContract - Represents a wish, a contract to expose any object with specific tag
- TagExposeContractBuilder
- TagObserveContract - Represents a wish, a contract to observe all objects with specific tag
- TagObserveContractBuilder
Enums§
- DataSynchronized - Object synchronization state
- JsonDeserializeError
Traits§
Type Aliases§
- DataBytes = Arc<Vec<u8>>
- DataFormat = Vec<String>
- DataVersion = Vec<i64>