// hakuban 0.8.5
//
// Data-object sharing library
// Documentation
extern crate hakuban;
mod common;

use std::{env, error::Error, time::Instant};

use common::router::Router;
use futures::{SinkExt, StreamExt};
use hakuban::{tokio_runtime::WebsocketConnector, Exchange, JsonSerializeState, ObjectState};

/// Ping-pong round-trip benchmark between two exchanges joined through one spawned router.
///
/// Two tasks are spawned, each with its own `Exchange` and a `WebsocketConnector` pointed at
/// the router's URL:
///
/// * side "a" exposes object `a` and observes object `b`. For every round it first publishes a
///   state whose version is `[round]`, then reads observed states until one carries a version
///   of at least `round`. It measures and prints the total time, time per round and rounds per
///   second.
/// * side "b" exposes object `b` and observes object `a`. For every round it first waits for an
///   observed version of at least `round`, then publishes its own state with version `[round]`.
///
/// Environment variables:
///
/// * `REPS`  - number of rounds (`i64`), defaults to `100000` when the variable cannot be read.
/// * `DEBUG` - `bool` forwarded to `Router::spawn`, defaults to `false`.
///
/// # Panics
///
/// Panics when `REPS` or `DEBUG` is set but does not parse, and on any failed `unwrap()` while
/// connecting, sending, receiving, or joining the tasks.
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn ping() -> Result<(), Box<dyn Error + Sync + Send>> {
	use std::time::Duration;

	// A variable that cannot be read falls back to the default; a readable but unparsable
	// value panics.
	let reps: i64 = match env::var("REPS") {
		Ok(raw) => raw.parse::<i64>().unwrap(),
		Err(_) => 100000,
	};
	let debug: bool = match env::var("DEBUG") {
		Ok(raw) => raw.parse::<bool>().unwrap(),
		Err(_) => false,
	};

	// The outcome of `try_init` is deliberately discarded
	// (presumably a logger may already be installed by another test - TODO confirm).
	let log_env = env_logger::Env::default().default_filter_or("hakuban=info");
	let _ = env_logger::Builder::from_env(log_env).format_timestamp_millis().try_init();

	let shared_router = Router::spawn("router", vec![], "nodelay=true", debug, 0).await;

	// Both sides pause this long after their last round, before their locals are dropped.
	// NOTE(review): looks like a grace period for in-flight traffic - confirm.
	let settle_delay = Duration::from_millis(100);

	// Side "a": sends first, then waits for the echo; owns the time measurement.
	let router_for_a = shared_router.clone();
	let side_a = tokio::spawn(async move {
		let exchange = Exchange::new();
		let _upstream = WebsocketConnector::new(exchange.clone(), router_for_a.url.clone()).unwrap();
		let mut exposing = exchange.object_expose_contract((vec!["-"], "a")).build();
		let mut observing = exchange.object_observe_contract((vec!["-"], "b")).build();
		let mut outgoing = exposing.next().await.unwrap();

		// The clock starts before the observed object's stream becomes available.
		let started_at = Instant::now();
		let mut incoming = observing.next().await.unwrap();
		let mut newest_seen = -1;
		for round in 0..reps {
			outgoing.send(ObjectState::new("aaa").with_version(vec![round]).json_serialize()).await.unwrap();
			// Keep reading until the peer's version has caught up with this round.
			while newest_seen < round {
				let received = incoming.next().await.unwrap();
				newest_seen = received.version[0];
			}
		}

		let elapsed_ns = started_at.elapsed().as_nanos();
		let ns_per_round = elapsed_ns as f64 / reps as f64;
		let rounds_per_second = reps as f64 * 1000000000f64 / (elapsed_ns as f64);
		println!(
			"Duration: {}ns,  Time per loop iteration: {}ns,  Loops per second: {}",
			elapsed_ns, ns_per_round, rounds_per_second
		);
		tokio::time::sleep(settle_delay).await;
	});

	// Side "b": waits for each round's state from "a", then answers with the same version.
	let router_for_b = shared_router.clone();
	let side_b = tokio::spawn(async move {
		let exchange = Exchange::new();
		let _upstream = WebsocketConnector::new(exchange.clone(), router_for_b.url.clone()).unwrap();
		let mut exposing = exchange.object_expose_contract((vec!["-"], "b")).build();
		let mut observing = exchange.object_observe_contract((vec!["-"], "a")).build();
		let mut outgoing = exposing.next().await.unwrap();

		let mut incoming = observing.next().await.unwrap();
		let mut newest_seen = -1;
		for round in 0..reps {
			while newest_seen < round {
				let received = incoming.next().await.unwrap();
				newest_seen = received.version[0];
			}
			outgoing.send(ObjectState::new("aaa").with_version(vec![round]).json_serialize()).await.unwrap();
		}
		tokio::time::sleep(settle_delay).await;
	});

	// A panic inside either task surfaces here as a join error and fails the test.
	side_a.await.unwrap();
	side_b.await.unwrap();

	Ok(())
}