extern crate hakuban;
mod common;
use std::{error::Error, time::Instant};
use common::Router;
use futures::SinkExt;
use futures_util::stream::StreamExt;
use hakuban::{format::JsonSerializeState, tokio_runtime::WebsocketConnector, LocalExchange, ObjectState};
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn ping() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
let router_common = Router::spawn("router", "nodelay=true", false).await;
let router = router_common.clone();
let a: tokio::task::JoinHandle<()> = tokio::spawn(async move {
let hakuban = LocalExchange::builder().with_name("a").build();
let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone()).unwrap();
let mut expose_contract = hakuban.object((vec!["-"], "a")).expose();
let mut observe_contract = hakuban.object((vec!["-"], "b")).observe();
let mut object_state_sink = expose_contract.next().await.unwrap();
let started_at = Instant::now();
let mut object_state_stream = None;
let mut last_received_version = -1;
for i in 0..100000 {
object_state_sink.send(ObjectState::new("aaa").with_version(vec![i]).json_serialize()).await.unwrap();
if object_state_stream.is_none() {
object_state_stream = Some(observe_contract.next().await.unwrap());
}
while last_received_version < i && object_state_sink.current().is_some() {
last_received_version = object_state_stream.as_mut().unwrap().next().await.unwrap().version[0];
}
}
let duration = (Instant::now() - started_at).as_nanos();
println!(
"Duration: {}ns, Time per loop iteration: {}ns, Loops per second: {}",
duration,
duration as f64 / 100000f64,
100000f64 * 1000000000f64 / (duration as f64)
);
});
let router = router_common.clone();
let b: tokio::task::JoinHandle<()> = tokio::spawn(async move {
let hakuban = LocalExchange::builder().with_name("a").build();
let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone()).unwrap();
let mut expose_contract = hakuban.object((vec!["-"], "b")).expose();
let mut observe_contract = hakuban.object((vec!["-"], "a")).observe();
let mut object_state_sink = expose_contract.next().await.unwrap();
let mut object_state_stream = None;
let mut last_received_version = -1;
for i in 0..100000 {
object_state_sink.send(ObjectState::new("aaa").with_version(vec![i]).json_serialize()).await.unwrap();
if object_state_stream.is_none() {
object_state_stream = Some(observe_contract.next().await.unwrap());
}
while last_received_version < i && object_state_sink.current().is_some() {
last_received_version = object_state_stream.as_mut().unwrap().next().await.unwrap().version[0];
}
}
});
a.await.unwrap();
b.await.unwrap();
Ok(())
}