extern crate hakuban;
mod common;
use std::{env, error::Error, time::Instant};
use common::router::Router;
use futures::{SinkExt, StreamExt};
use hakuban::{tokio_runtime::WebsocketConnector, Exchange, JsonSerializeState, ObjectState};
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn ping() -> Result<(), Box<dyn Error + Sync + Send>> {
use std::time::Duration;
let reps: i64 = env::var("REPS").map(|string| string.parse().unwrap()).unwrap_or(100000);
let debug: bool = env::var("DEBUG").map(|string| string.parse().unwrap()).unwrap_or(false);
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
let router_common = Router::spawn("router", vec![], "nodelay=true", debug, 0).await;
let router = router_common.clone();
let a: tokio::task::JoinHandle<()> = tokio::spawn(async move {
let hakuban = Exchange::new();
let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone()).unwrap();
let mut expose_contract = hakuban.object_expose_contract((vec!["-"], "a")).build();
let mut observe_contract = hakuban.object_observe_contract((vec!["-"], "b")).build();
let mut object_state_sink = expose_contract.next().await.unwrap();
let started_at = Instant::now();
let mut object_state_stream = observe_contract.next().await.unwrap();
let mut last_received_version = -1;
for i in 0..reps {
object_state_sink.send(ObjectState::new("aaa").with_version(vec![i]).json_serialize()).await.unwrap();
while last_received_version < i {
last_received_version = object_state_stream.next().await.unwrap().version[0];
}
}
let duration = (Instant::now() - started_at).as_nanos();
println!(
"Duration: {}ns, Time per loop iteration: {}ns, Loops per second: {}",
duration,
duration as f64 / reps as f64,
reps as f64 * 1000000000f64 / (duration as f64)
);
tokio::time::sleep(Duration::from_millis(100)).await;
});
let router = router_common.clone();
let b: tokio::task::JoinHandle<()> = tokio::spawn(async move {
let hakuban = Exchange::new();
let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone()).unwrap();
let mut expose_contract = hakuban.object_expose_contract((vec!["-"], "b")).build();
let mut observe_contract = hakuban.object_observe_contract((vec!["-"], "a")).build();
let mut object_state_sink = expose_contract.next().await.unwrap();
let mut object_state_stream = observe_contract.next().await.unwrap();
let mut last_received_version = -1;
for i in 0..reps {
while last_received_version < i {
last_received_version = object_state_stream.next().await.unwrap().version[0];
}
object_state_sink.send(ObjectState::new("aaa").with_version(vec![i]).json_serialize()).await.unwrap();
}
tokio::time::sleep(Duration::from_millis(100)).await;
});
a.await.unwrap();
b.await.unwrap();
Ok(())
}