hakuban 0.7.2

Data-object sharing library
Documentation
extern crate hakuban;
mod common;

use std::{error::Error, time::Instant};

use common::Router;
use futures::SinkExt;
use futures_util::stream::StreamExt;
use hakuban::{format::JsonSerializeState, tokio_runtime::WebsocketConnector, LocalExchange, ObjectState};

#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn ping() -> Result<(), Box<dyn Error + Sync + Send>> {
	let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();

	let router_common = Router::spawn("router", "nodelay=true", false).await;

	let router = router_common.clone();
	let a: tokio::task::JoinHandle<()> = tokio::spawn(async move {
		let hakuban = LocalExchange::builder().with_name("a").build();
		let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone()).unwrap();
		let mut expose_contract = hakuban.object((vec!["-"], "a")).expose();
		let mut observe_contract = hakuban.object((vec!["-"], "b")).observe();
		let mut object_state_sink = expose_contract.next().await.unwrap();

		let started_at = Instant::now();
		let mut object_state_stream = None;
		let mut last_received_version = -1;
		for i in 0..100000 {
			//info!("{}",i);
			object_state_sink.send(ObjectState::new("aaa").with_version(vec![i]).json_serialize()).await.unwrap();
			if object_state_stream.is_none() {
				object_state_stream = Some(observe_contract.next().await.unwrap());
			}
			while last_received_version < i && object_state_sink.current().is_some() {
				last_received_version = object_state_stream.as_mut().unwrap().next().await.unwrap().version[0];
			}
		}
		let duration = (Instant::now() - started_at).as_nanos();
		println!(
			"Duration: {}ns,  Time per loop iteration: {}ns,  Loops per second: {}",
			duration,
			duration as f64 / 100000f64,
			100000f64 * 1000000000f64 / (duration as f64)
		);
	});

	let router = router_common.clone();
	let b: tokio::task::JoinHandle<()> = tokio::spawn(async move {
		let hakuban = LocalExchange::builder().with_name("a").build();
		let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone()).unwrap();
		let mut expose_contract = hakuban.object((vec!["-"], "b")).expose();
		let mut observe_contract = hakuban.object((vec!["-"], "a")).observe();
		let mut object_state_sink = expose_contract.next().await.unwrap();

		let mut object_state_stream = None;
		let mut last_received_version = -1;
		for i in 0..100000 {
			object_state_sink.send(ObjectState::new("aaa").with_version(vec![i]).json_serialize()).await.unwrap();
			if object_state_stream.is_none() {
				object_state_stream = Some(observe_contract.next().await.unwrap());
			}
			while last_received_version < i && object_state_sink.current().is_some() {
				last_received_version = object_state_stream.as_mut().unwrap().next().await.unwrap().version[0];
			}
		}
	});

	a.await.unwrap();
	b.await.unwrap();

	Ok(())
}