//because "cargo fmt" keeps replacing comment spaces with tabs, despite "format_code_in_doc_comments=false"
/*!
Hakuban is a data-object sharing library.

Check out the [README](https://gitlab.com/yunta/hakuban/-/blob/main/README.md) file if you haven't been there yet.

Hakuban is a simple mechanism that lets some processes expose data-objects and other processes observe them.
It takes care of all the boring details of data transmission, synchronization, object lifetime management, load balancing, etc.
At a high level, the API can be divided into 6 parts:
* Network management: [LocalExchange], [WebsocketListener](tokio_runtime::WebsocketListener), [WebsocketConnector](tokio_runtime::WebsocketConnector)
* Descriptors ("rich labels" for objects and tags): [ObjectDescriptor], [TagDescriptor]
* Object and tag contracts: [ObjectObserveContract], [ObjectExposeContract], [TagObserveContract], [TagExposeContract]
* Object state access: [ObjectState], [ObjectStateStream], [ObjectStateSink]
* [FFI](ffi), WASM
* Other modules and structures, exposed to enable tests and custom transport implementations: [message], [RemoteExchange](exchange::RemoteExchange)
# Concepts
An __object__ is the basic unit of shareable data.
Objects can be [observed](ObjectObserveContract) and/or [exposed](ObjectExposeContract) by processes.
Every object is uniquely identified by a set of __tags__ and a single JSON value[^1]. This unique identifier is represented by the [ObjectDescriptor] struct.
There is no API to access objects directly.
Instead, object state can be retrieved and asserted through [ObjectStateStream]s and [ObjectStateSink]s obtained from observe/expose contracts.
A __tag__ represents a set of __objects__.
A process holding a [TagObserveContract] declares interest in observing all existing objects tagged with that tag.
Likewise, a process holding a [TagExposeContract] declares the capability to expose any and all existing objects tagged with that tag.
Every tag is uniquely identified by a JSON value[^1], represented by the [TagDescriptor] struct.
Tags are not meant to represent data-model-level collections.
They are merely wildcards for declaring observe/expose contracts for objects with yet-unknown [ObjectDescriptor]s. `TODO: explain with an example`
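
The wildcard role of tags can be pictured with a toy model. The sketch below does not use the hakuban API at all: `ToyTag`, `ToyObject` and `covered_by_tag` are hypothetical names invented for this illustration, and JSON is kept as plain text to avoid extra dependencies. It only assumes what is stated above, namely that an object descriptor is a set of tags plus one JSON value, and that a tag contract covers every object carrying that tag.

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for a tag descriptor: the tag's JSON, kept as text.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ToyTag(String);

// Hypothetical stand-in for an object descriptor: a set of tags plus the object's own JSON.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToyObject {
    tags: BTreeSet<ToyTag>,
    json: String,
}

impl ToyObject {
    fn new(tags: &[&str], json: &str) -> Self {
        ToyObject {
            tags: tags.iter().map(|tag| ToyTag(tag.to_string())).collect(),
            json: json.to_string(),
        }
    }
}

// Returns every object that carries the given tag. The tag holder never names
// individual object descriptors; membership in the tag is the only criterion.
fn covered_by_tag<'a>(tag: &ToyTag, objects: &'a [ToyObject]) -> Vec<&'a ToyObject> {
    objects.iter().filter(|object| object.tags.contains(tag)).collect()
}

fn main() {
    // The contract holder only knows the tag, not the objects that will show up.
    let sensors = ToyTag("sensors".to_string());

    // Objects with previously unknown descriptors appear later.
    let objects = vec![
        ToyObject::new(&["sensors"], "\"kitchen-thermometer\""),
        ToyObject::new(&["sensors", "outdoor"], "\"garden-thermometer\""),
        ToyObject::new(&["cameras"], "\"front-door\""),
    ];

    for object in covered_by_tag(&sensors, &objects) {
        println!("covered by the sensors tag: {}", object.json);
    }
}
```

In this model the two thermometer objects are covered by the tag and the camera object is not, even though none of their descriptors were known when the tag was chosen.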
To exchange object data, processes can connect to each other in a tree topology.
Processes (exchanges) at the leaf end of the tree only receive data relevant to them.

The [LocalExchange] struct represents the state of the local hakuban exchange.
It is the main access point to the hakuban network: it lets you build contracts and access exchange-wide events.
(There is also the [exchange::RemoteExchange] struct, but it should not be needed outside of custom network transport implementations. Look at tokio.rs if you want to write one.)
`TODO: explain how single exposer is picked and assigned for each object`
[^1]: Any valid JSON really: a string, array, number, etc.
# Example
```rust
# use hakuban::prelude::*;
# use futures_util::stream::StreamExt;
# use futures_util::sink::SinkExt;
# tokio_test::block_on(async {
let exchange = LocalExchange::builder().build();
let mut observe_contract = exchange.object((["some-tag"],"xxx")).observe();
let mut expose_contract = exchange.object((["some-tag"],"xxx")).expose();
tokio::spawn(async move {
    // ExposeContract emits an ObjectStateSink when the first observer appears
    let mut object_state_sink = expose_contract.next().await.unwrap();
    let object_state = ObjectState::new("my data").json_serialize();
    object_state_sink.send(object_state).await;
});
tokio::spawn(async move {
    // ObserveContract emits an ObjectStateStream when the exposer uploads the first ObjectState
    let mut object_state_stream = observe_contract.next().await.unwrap();
    let object_state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
    println!("{:?}", object_state.data);
});
# });
```
# Connecting somewhere
Start the `hakuban-router` (a binary built by this crate) and the following two processes. The order doesn't matter; they will happily wait for each other ❤️.
The A and B processes will communicate through the `hakuban-router` instance.
You can also make them communicate directly, by having one of them create a [tokio_runtime::WebsocketListener] instead of the [tokio_runtime::WebsocketConnector].
To keep the network tree-shaped, you should not have more than one WebsocketConnector running at the same time in a single process[^2]. Other graph shapes are not supported.

[^2]: Unless you make sure there is no overlap in the object and tag descriptors visible on the other side of both connections.

Process A:
```rust
# use hakuban::prelude::*;
# use futures_util::stream::StreamExt;
# use futures_util::sink::SinkExt;
# tokio_test::block_on(async {
let exchange = LocalExchange::builder().build();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut observe_contract = exchange.object((["some-tag"],"xxx")).observe();
let object_state = observe_contract.next().await.unwrap().next().await.unwrap();
println!("{:?}", object_state.data);
# Ok::<(),Box<dyn std::error::Error>>(())
# });
```
Process B:
```rust
# use hakuban::prelude::*;
# use futures_util::stream::StreamExt;
# use futures_util::sink::SinkExt;
# tokio_test::block_on(async {
let exchange = LocalExchange::builder().build();
let _upstream = WebsocketConnector::new(exchange.clone(), "ws://127.0.0.1:3001")?;
let mut expose_contract = exchange.object((["some-tag"],"xxx")).expose();
let object_state = ObjectState::new("🦀").json_serialize();
let mut object_state_sink = expose_contract.next().await.unwrap();
object_state_sink.send(object_state).await;
//waiting for Process A to receive the goods and exit
while object_state_sink.next().await.is_some() {};
# Ok::<(),Box<dyn std::error::Error>>(())
# });
```
# Cargo feature flags
* `default`: `["tokio-runtime"]`
* `tokio-runtime`: enables compilation of tokio.rs, the only runtime and network transport implemented so far. You'll need it if you want to communicate with other processes.
* `musl`: allows building statically-linked binaries: `cargo build --target x86_64-unknown-linux-musl --release --features musl`
# FFI / so
Check out the documentation of the [ffi] module.
# Included binaries
This crate builds one binary: `hakuban-router`.
It's basically a WebsocketListener surrounded by "main".
It accepts connections and naturally routes hakuban communication, just as any other hakuban process with a WebsocketListener would.
*/
/*
* bugs to fix
* LockCheck declarations are missing drop calls
* features
* everything listed in the README/roadmap
* gap-less expose handover
* avoid resending descriptors
* error!() in ffi functions, before better error handling gets implemented
* clean-up
* check if error returns in ffi don't cut through allocations
* de arc-ify selfs where possible
* merge *contract.rs into a single file
* ensure all structs hashable by pointers are Pin - or rather stop hashing by pointers
* C-NEWTYPE: use struct DataVersion(Vec<i64>); instead of type =, etc.
* C-DEBUG: implement debug on all public types
* make diff caching smarter
* dbg # format should be replaced with introspection mechanism
* add wake(), and all FFI calls to LockCheck, to figure out which ffi calls can deadlock on GILs
* consider
* can expose contract and observe contract be structs instead of traits? enums instead of dyns?
* maybe get rid of million hashbrown monomorphizations
* all "cores" could be wrappers with Pin<Arc<*Internal>> inside, to ensure id() method stability
* batch-lock
* split "links" into multiple locks
* rate limiting
* periodic rebalance
* drop multiple-tags-per-object support in favour of exactly-one-tag-per-object (groups, possibly tree-structured)
* rewrite protocol to explicitly control remote side interface lifetimes
* json_(de)serialize on stream
* checksum data
* data signing
* bindings
*/
pub use *;
pub use LocalExchange;
pub use ;
pub use ;
pub use ;
pub use WebsocketConnector;