extern crate hakuban;
#[cfg(not(target_family = "wasm"))]
mod common;
use std::{error::Error, sync::{Arc, Mutex}};
use futures::SinkExt;
use futures_util::stream::StreamExt;
#[cfg(not(target_family = "wasm"))]
use hakuban::{object::ObjectState, tokio_runtime::WebsocketConnector, DataSynchronized, LocalExchange, ObjectExposeContract};
use instant::Duration;
use serde_json::json;
#[cfg(not(target_family = "wasm"))]
use crate::{common::{Router, Shuffle}, hakuban::format::{JsonDeserializeState, JsonSerializeState}};
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_object_object_delivery() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(4)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut object_observe = hakuban_a.object(json!({})).observe();
task.fuzzy_barrier().await;
let mut object_state_stream = object_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut object_expose: ObjectExposeContract = hakuban_b.object(json!({})).expose();
task.fuzzy_barrier().await;
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_object_object_termination() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(2)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
let mut object_observe = hakuban_a.object(json!({})).observe();
let mut object_state_stream = object_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
drop(object_observe);
let state = object_state_stream.next().await;
assert!(matches!(state, None));
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
let mut object_expose = hakuban_b.object(json!({})).expose();
let mut object_state_sinks = object_expose.next().await.unwrap();
object_state_sinks.next().await.unwrap();
object_state_sinks.send(ObjectState::new(&"xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
task.fuzzy_barrier().await;
drop(object_expose);
let object_state_sink = object_state_sinks.next().await;
assert!(matches!(object_state_sink, None));
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_object_object_reassignment() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(4)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
let mut object_observe = hakuban_a.object(json!({})).observe();
task.fuzzy_barrier().await;
let mut object_state_stream = object_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
drop(state);
drop(object_state_stream);
drop(object_observe);
task.fuzzy_barrier().await;
let mut object_observe = hakuban_a.object(json!({})).observe();
let mut object_state_stream = object_observe.next().await.unwrap();
let mut state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
while state.version == [1] {
state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
}
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
let mut object_expose = hakuban_b.object(json!({})).expose();
task.fuzzy_barrier().await;
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
let object_state_sink_state = object_state_sink.next().await;
assert!(matches!(object_state_sink_state, None));
drop(object_state_sink);
task.fuzzy_barrier().await;
let mut object_state_sink = object_expose.next().await.unwrap();
let object_state_sink_state = object_state_sink.next().await;
assert!(matches!(object_state_sink_state, Some(_)));
object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_object_tag_delivery() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(4)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut tag_observe = hakuban_a.tag(json!({})).observe();
task.fuzzy_barrier().await;
let mut object_state_stream = tag_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut object_expose = hakuban_b.object(([json!({})], json!({}))).expose();
task.fuzzy_barrier().await;
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_tag_object_delivery() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(4)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut object_observe = hakuban_a.object(([json!({})], json!({}))).observe();
task.fuzzy_barrier().await;
let mut object_state_stream = object_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut tag_expose = hakuban_b.tag(json!({})).expose();
task.fuzzy_barrier().await;
let mut object_state_sink = tag_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn object_observe_contract_termination_stops_object_state_stream_stream() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(2)
.setup(|| async {
let hakuban_a = LocalExchange::builder().with_name("common").build();
let object_observe_contract = Arc::new(Mutex::new(hakuban_a.object(json!({})).observe()));
Ok(object_observe_contract)
})
.task("regu", |mut task, object_observe_contract| async move {
let mut object_observe_contract = object_observe_contract.lock().unwrap().clone();
task.fuzzy_barrier().await;
let object_state_stream = object_observe_contract.next().await;
assert!(matches!(object_state_stream, None));
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, object_observe_contract| async move {
let mut object_observe_contract = object_observe_contract.lock().unwrap().clone();
task.fuzzy_barrier().await;
object_observe_contract.terminate();
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn object_observe_contract_termination_causes_unassign() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(2)
.setup(|| async {
let hakuban = LocalExchange::builder().with_name("common").build();
Ok(hakuban)
})
.task("regu", |mut task, hakuban| async move {
let mut object_observe_contract = hakuban.object(json!({})).observe();
task.fuzzy_barrier().await;
object_observe_contract.terminate();
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, hakuban| async move {
let mut object_expose_contract = hakuban.object(json!({})).expose();
let mut object_state_sink = object_expose_contract.next().await.unwrap();
let object_state_sink_state = object_state_sink.next().await;
assert!(matches!(object_state_sink_state, Some(_)));
task.fuzzy_barrier().await;
let object_state_sink_state = object_state_sink.next().await;
assert!(matches!(object_state_sink_state, None));
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_load_balancing_on_new_exposer_appearance() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(0)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
let _object_observe_contracts: Vec<_> = (0..10).map(|i| hakuban_a.object((vec!["x"], json!(i))).observe()).collect();
task.barrier().await;
task.barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
let mut tag_expose = hakuban_b.tag(json!("x")).expose();
while tag_expose.ready().len() != 10 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
task.barrier().await;
while tag_expose.ready().len() != 5 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
task.barrier().await;
Ok(())
})
.task("nanachi", |mut task, router| async move {
let hakuban_c = LocalExchange::builder().with_name(task.name).build();
let _upstream_c = WebsocketConnector::new(hakuban_c.clone(), router.url.clone())?;
task.barrier().await;
let mut tag_expose = hakuban_c.tag(json!("x")).expose();
while tag_expose.ready().len() != 5 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
task.barrier().await;
Ok(())
})
.run(Duration::from_millis(2000000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn local_instant_object_state_sink_drop_panic() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(0)
.setup(|| async { Ok(()) })
.task("regu", |task, _| async move {
let hakuban = LocalExchange::builder().with_name(task.name).build();
let mut _object_observe = hakuban.object(json!({})).observe();
let mut object_expose = hakuban.object(json!({})).expose();
let mut _object_state_sink = object_expose.next().await.unwrap();
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn local_desynchronize_on_object_state_sink_drop() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(0)
.setup(|| async {
let hakuban = LocalExchange::builder().with_name("shared").build();
let barrier = Arc::new(tokio::sync::Barrier::new(2));
Ok((barrier, hakuban))
})
.task("regu", |_task, (barrier, hakuban)| async move {
let mut object_observe = hakuban.object(json!({})).observe();
let mut object_state_stream = object_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
barrier.wait().await;
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert!(matches!(state.synchronized, DataSynchronized::LastAt(_)));
barrier.wait().await;
Ok(())
})
.task("riko", |_task, (barrier, hakuban)| async move {
let mut object_expose = hakuban.object(json!({})).expose();
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
barrier.wait().await;
drop(object_state_sink);
drop(object_expose);
barrier.wait().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_object_tag_delivery_with_reconnection() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(5)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut tag_observe = hakuban_a.tag(json!({})).observe();
task.fuzzy_barrier().await;
let mut object_state_stream = tag_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
drop(object_state_stream);
drop(tag_observe);
drop(upstream_a);
drop(hakuban_a);
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
let mut tag_observe = hakuban_a.tag(json!({})).observe();
let mut object_state_stream = tag_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
task.fuzzy_barrier().await;
loop {
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
if state.version[0] == 3 {
assert_eq!(state.version, &[3]);
assert_eq!(state.data, "xxxz");
break;
}
}
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut object_expose = hakuban_b.object(([json!({})], json!({}))).expose();
task.fuzzy_barrier().await;
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
object_state_sink.send(ObjectState::new("xxxz").with_version(vec![3]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn routed_object_tag_delivery_with_late_connection() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(3)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban_a = LocalExchange::builder().with_name(task.name).build();
let mut tag_observe = hakuban_a.tag(json!({})).observe();
let mut object_expose = hakuban_a.object(([json!({})], json!(1))).expose();
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.send(ObjectState::new("1").with_version(vec![1]).json_serialize()).await.unwrap();
let mut _regu_object_state_stream = tag_observe.next().await.unwrap();
task.fuzzy_barrier().await;
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut riko_object_state_stream = tag_observe.next().await.unwrap();
let state = riko_object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "2");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban_b = LocalExchange::builder().with_name(task.name).build();
let mut tag_observe = hakuban_b.tag(json!({})).observe();
let mut object_expose = hakuban_b.object(([json!({})], json!(2))).expose();
let mut object_state_sink = object_expose.next().await.unwrap();
object_state_sink.send(ObjectState::new("2").with_version(vec![1]).json_serialize()).await.unwrap();
let mut _riko_object_state_stream = tag_observe.next().await.unwrap();
task.fuzzy_barrier().await;
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), router.url.clone())?;
task.fuzzy_barrier().await;
let mut regu_object_state_stream = tag_observe.next().await.unwrap();
let state = regu_object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "1");
assert_eq!(state.version, &[1]);
task.fuzzy_barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn leader_election_pattern_object_object() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(0)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban = LocalExchange::builder().with_name(task.name).build();
let upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone())?;
let object_observe = hakuban.object(([json!({})], json!({}))).observe();
let mut object_expose = hakuban.object(([json!({})], json!({}))).expose();
tokio::time::sleep(Duration::from_millis(100)).await;
task.barrier().await;
task.barrier().await;
while object_expose.ready().is_none() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
task.barrier().await;
router.print_all().await;
drop(upstream);
drop(object_observe);
drop(object_expose);
drop(hakuban);
tokio::time::sleep(Duration::from_millis(100)).await;
router.print_all().await;
task.barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban = LocalExchange::builder().with_name(task.name).build();
let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone())?;
task.barrier().await;
let _object_observe = hakuban.object(([json!({})], json!({}))).observe();
let mut object_expose = hakuban.object(([json!({})], json!({}))).expose();
tokio::time::sleep(Duration::from_millis(100)).await;
task.barrier().await;
while object_expose.ready().is_some() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
task.barrier().await;
while object_expose.ready().is_none() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
task.barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}
#[cfg(not(target_family = "wasm"))]
#[tokio::test]
async fn leader_election_pattern_tag_object() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(0)
.setup(|| async {
let router = Router::spawn("router", "", true).await;
Ok(router)
})
.task("regu", |mut task, router| async move {
let hakuban = LocalExchange::builder().with_name(task.name).build();
let upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone())?;
let object_observe = hakuban.object(([json!({})], json!({}))).observe();
let mut tag_expose = hakuban.tag(json!({})).expose();
tokio::time::sleep(Duration::from_millis(100)).await;
task.barrier().await;
task.barrier().await;
while tag_expose.ready().is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
task.barrier().await;
router.print_all().await;
drop(upstream);
drop(object_observe);
drop(tag_expose);
drop(hakuban);
tokio::time::sleep(Duration::from_millis(100)).await;
router.print_all().await;
task.barrier().await;
Ok(())
})
.task("riko", |mut task, router| async move {
let hakuban = LocalExchange::builder().with_name(task.name).build();
let _upstream = WebsocketConnector::new(hakuban.clone(), router.url.clone())?;
task.barrier().await;
let _object_observe = hakuban.object(([json!({})], json!({}))).observe();
let mut tag_expose = hakuban.tag(json!({})).expose();
tokio::time::sleep(Duration::from_millis(100)).await;
task.barrier().await;
while !tag_expose.ready().is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
task.barrier().await;
while tag_expose.ready().is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
}
task.barrier().await;
Ok(())
})
.run(Duration::from_millis(2000))
.await
}