extern crate hakuban;
mod common;
use std::{
error::Error,
future::ready,
sync::{
atomic::{AtomicI64, AtomicU64},
Arc, Mutex,
},
time::{Duration, Instant},
};
use common::{network::Topology, shuffler::Shuffle};
use futures::{
channel::oneshot,
stream::{once, SelectAll},
SinkExt, StreamExt,
};
use hakuban::{
monitor::ExchangeSnapshot, tokio_runtime::WebsocketConnector, DataSynchronized, Exchange, JsonDeserializeError, JsonDeserializeState, JsonSerializeState,
ObjectExposeContract, ObjectState,
};
use serde_json::json;
use tokio::time::timeout;
/// An observer must be told when an object stops being synchronized because the
/// exposer dropped its state sink.
///
/// "riko" exposes object `{}` and publishes versions `[1]` and `[2]`, then drops
/// the sink. "regu" observes the same object through a different `routers` entry
/// and expects both versions (the second flagged `DataSynchronized::Now`) followed
/// by a state that is no longer `DataSynchronized::Now`.
// NOTE(review): `barrier`/`fuzzy_barrier` come from the `common` test module, which
// is not visible here; they are treated as cross-task synchronization points.
#[tokio::test]
async fn routed_object_object_desynchronize_on_sink_drop() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(5, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            let mut observe_contract = exchange.object_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut states = observe_contract.next().await.unwrap();

            let first = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(first.data, "xxx");
            assert_eq!(first.version, &[1]);
            task.fuzzy_barrier().await;

            let second = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(second.data, "xxxy");
            assert_eq!(second.version, &[2]);
            assert_eq!(second.synchronized, DataSynchronized::Now);
            task.fuzzy_barrier().await;

            // The exposer drops its sink before this point is passed on its side.
            let after_sink_drop = states.next().await.unwrap().json_deserialize::<String>();
            assert_ne!(after_sink_drop.synchronized, DataSynchronized::Now);
            task.fuzzy_barrier().await;

            drop(observe_contract);
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            let mut expose_contract: ObjectExposeContract = exchange.object_expose_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut sink = expose_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            drop(sink);
            task.fuzzy_barrier().await;
            // Keep polling until the contract stops yielding sinks within 1 ms.
            while timeout(Duration::from_millis(1), expose_contract.next()).await.is_ok() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// A change of the data format made by the exposer must reach the observer.
///
/// "riko" first publishes a JSON-serialized state, then a second one whose format
/// is overridden to `["abc"]`. "regu" expects the second state to carry that format
/// and `try_json_deserialize` to fail with `JsonDeserializeError::WrongFormat`.
#[tokio::test]
async fn routed_object_data_format_change_gets_propagated() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(0, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let mut observe_contract = exchange.object_observe_contract(json!({})).build();
            let mut states = observe_contract.next().await.unwrap();

            let initial = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(initial.data, "xxx");
            assert_eq!(initial.version, &[1]);
            task.barrier().await;

            // Second state arrives with a non-JSON format label.
            let reformatted = states.next().await.unwrap();
            assert_eq!(reformatted.format, vec!["abc".to_string()]);
            let decoded = reformatted.try_json_deserialize::<String>();
            assert!(matches!(decoded, Err(JsonDeserializeError::WrongFormat)));
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut expose_contract: ObjectExposeContract = exchange.object_expose_contract(json!({})).build();
            let mut sink = expose_contract.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.barrier().await;
            let relabelled = ObjectState::new("xxxy").with_version(vec![2]).json_serialize().with_format(["abc".to_string()]);
            sink.send(relabelled).await.unwrap();
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Dropping the only object observer must make the object exposer stop receiving
/// state sinks.
///
/// "regu" builds and then drops an object-observe contract for `{}`. "riko" polls
/// its object-expose contract: first until a sink shows up, then until no sink is
/// produced within the 1 ms timeout.
#[tokio::test]
async fn routed_object_object_terminate_by_object_observer_drop() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(2, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            let observe_contract = exchange.object_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            drop(observe_contract);
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            let mut expose_contract: ObjectExposeContract = exchange.object_expose_contract(json!({})).build();
            // Wait for the first sink to appear.
            while timeout(Duration::from_millis(1), expose_contract.next()).await.is_err() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.fuzzy_barrier().await;
            // Wait until sinks stop appearing.
            while timeout(Duration::from_millis(1), expose_contract.next()).await.is_ok() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Dropping the only tag observer must make an object exposer (whose object
/// carries that tag) stop receiving state sinks.
///
/// "regu" builds and then drops a tag-observe contract for tag `{}`. "riko"
/// exposes object `{}` tagged with `{}` and polls its contract: first until a sink
/// shows up, then until no sink is produced within the 1 ms timeout.
#[tokio::test]
async fn routed_tag_object_terminate_by_tag_observer_drop() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(2, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Tag observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            let tag_contract = exchange.tag_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            drop(tag_contract);
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Object exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            let mut expose_contract: ObjectExposeContract = exchange.object_expose_contract(([json!({})], json!({}))).build();
            // Wait for the first sink to appear.
            while timeout(Duration::from_millis(1), expose_contract.next()).await.is_err() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.fuzzy_barrier().await;
            // Wait until sinks stop appearing.
            while timeout(Duration::from_millis(1), expose_contract.next()).await.is_ok() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Dropping a contract must end the stream obtained from it, on both sides.
///
/// "regu" drops its object-observe contract and expects its state stream to end
/// (`None`). "riko" drops its object-expose contract and expects the stream half
/// of its sink to end (`None`).
#[tokio::test]
async fn routed_object_object_termination() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(2, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let mut observe_contract = exchange.object_observe_contract(json!({})).build();
            let mut states = observe_contract.next().await.unwrap();
            let received = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(received.data, "xxx");
            assert_eq!(received.version, &[1]);
            task.fuzzy_barrier().await;
            drop(observe_contract);
            // With the contract gone the state stream must terminate.
            let after_drop = states.next().await;
            assert!(after_drop.is_none());
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut expose_contract = exchange.object_expose_contract(json!({})).build();
            let mut sink = expose_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new(&"xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            task.fuzzy_barrier().await;
            drop(expose_contract);
            // With the contract gone the sink's stream half must terminate.
            let after_drop = sink.next().await;
            assert!(after_drop.is_none());
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// An exposer must be handed a fresh sink when the object is observed again
/// after the previous observer went away.
///
/// "regu" observes `{}`, reads version `[1]`, drops everything, then observes
/// again and waits for version `[2]`. "riko" sees its first sink terminate, takes
/// a new sink from the same expose contract and publishes version `[2]` through it.
#[tokio::test]
async fn routed_object_object_reassignment_on_reobserve() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(4, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer side, first observation.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let mut observe_contract = exchange.object_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut states = observe_contract.next().await.unwrap();
            let first = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(first.data, "xxx");
            assert_eq!(first.version, &[1]);
            task.fuzzy_barrier().await;
            drop(first);
            drop(states);
            drop(observe_contract);
            task.fuzzy_barrier().await;

            // Second observation of the same object.
            let mut observe_contract = exchange.object_observe_contract(json!({})).build();
            let mut states = observe_contract.next().await.unwrap();
            let mut latest = states.next().await.unwrap().json_deserialize::<String>();
            // The old version may be delivered first; skip it.
            while latest.version == [1] {
                latest = states.next().await.unwrap().json_deserialize::<String>();
            }
            assert_eq!(latest.data, "xxxy");
            assert_eq!(latest.version, &[2]);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut expose_contract = exchange.object_expose_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut sink = expose_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            // The first sink must terminate once the observer is gone.
            let first_sink_end = sink.next().await;
            assert!(first_sink_end.is_none());
            drop(sink);
            task.fuzzy_barrier().await;

            // A new sink must be offered for the re-observed object.
            let mut sink = expose_contract.next().await.unwrap();
            let second_sink_start = sink.next().await;
            assert!(second_sink_start.is_some());
            sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// An observed object must be assigned to a tag exposer again after that exposer
/// dropped its contract and built a new one.
///
/// "regu" observes object `{}` tagged `{}` and expects version `[1]`, later
/// version `[2]`. "riko" publishes `[1]` through a tag-expose contract, drops the
/// sink and contract, builds a new tag-expose contract and publishes `[2]`.
#[tokio::test]
async fn routed_object_tag_reassignment_on_reexpose() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(2, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let mut observe_contract = exchange.object_observe_contract(([json!({})], json!({}))).build();
            let mut states = observe_contract.next().await.unwrap();
            let first = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(first.data, "xxx");
            assert_eq!(first.version, &[1]);
            task.barrier().await;
            task.fuzzy_barrier().await;
            let mut latest = states.next().await.unwrap().json_deserialize::<String>();
            // Skip any repeated delivery of the old version.
            while latest.version == [1] {
                latest = states.next().await.unwrap().json_deserialize::<String>();
            }
            assert_eq!(latest.data, "xxxy");
            assert_eq!(latest.version, &[2]);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Tag exposer side, first contract.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut tag_contract = exchange.tag_expose_contract(json!({})).build();
            let mut sink = tag_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.barrier().await;
            drop(sink);
            drop(tag_contract);
            task.fuzzy_barrier().await;

            // Second contract for the same tag.
            let mut tag_contract = exchange.tag_expose_contract(json!({})).build();
            let mut sink = tag_contract.next().await.unwrap();
            let sink_start = sink.next().await;
            assert!(sink_start.is_some());
            sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
#[tokio::test]
async fn routed_object_tag_reassignment_on_unexpose() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(1, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
.setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
.task("nanachi", |mut task, routers| async move {
let hakuban_a = Exchange::new();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), routers[1].url.clone())?;
let mut object_observe = hakuban_a.object_observe_contract(([json!({})], json!({}))).build();
let mut object_state_stream = object_observe.next().await.unwrap();
let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
assert_eq!(state.data, "xxx");
assert_eq!(state.version, &[1]);
task.barrier().await;
task.barrier().await;
let mut state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
while state.version == [1] {
state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
}
assert_eq!(state.data, "xxxy");
assert_eq!(state.version, &[2]);
task.fuzzy_barrier().await;
Ok(())
})
.task("regu", |mut task, routers| async move {
let hakuban_b = Exchange::new();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), routers[2].url.clone())?;
let mut tag_expose = hakuban_b.tag_expose_contract(json!({})).build();
let mut object_state_sink = tag_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
task.barrier().await;
task.barrier().await;
drop(object_state_sink);
drop(tag_expose);
task.fuzzy_barrier().await;
Ok(())
})
.task("riko", |mut task, routers| async move {
let hakuban_b = Exchange::new();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), routers[3].url.clone())?;
task.barrier().await;
let mut tag_expose = hakuban_b.tag_expose_contract(json!({})).build();
tokio::time::sleep(Duration::from_millis(100)).await;
{
let hakuban_main = Exchange::new();
let _upstream_main = WebsocketConnector::new(hakuban_main.clone(), routers[0].url.clone())?;
let mut exchange_monitor_contract = hakuban_main.object_observe_contract((vec![json!("monitor")], json!("exchange"))).build();
let mut exchange_snapshots = exchange_monitor_contract.next().await.unwrap();
while exchange_snapshots
.next()
.await
.unwrap()
.json_deserialize::<ExchangeSnapshot>()
.data
.objects
.iter()
.find(|object_snapshot| object_snapshot.descriptor.json == json!({}))
.map(|object_snapshot| object_snapshot.object_expose_contract_count == 2)
.unwrap_or(false)
{
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
task.barrier().await;
let mut object_state_sink = tag_expose.next().await.unwrap();
object_state_sink.next().await.unwrap();
object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
task.fuzzy_barrier().await;
Ok(())
})
.teardown(|routers| Topology::teardown_network(routers))
.run(Duration::from_millis(2000))
.await
}
/// States published by an object exposer must reach a tag observer, and the
/// observer's state stream must end once the exposer drops its contract.
///
/// "riko" exposes object `{}` tagged `{}` and publishes versions `[1]` and `[2]`.
/// "regu" observes tag `{}`, checks both versions, then polls until the state
/// stream yields `None`.
#[tokio::test]
async fn routed_object_tag_delivery() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(4, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Tag observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            let mut tag_contract = exchange.tag_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut states = tag_contract.next().await.unwrap();

            let first = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(first.data, "xxx");
            assert_eq!(first.version, &[1]);
            task.fuzzy_barrier().await;

            let second = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(second.data, "xxxy");
            assert_eq!(second.version, &[2]);
            task.fuzzy_barrier().await;

            // Poll until the stream reports its end (`Ok(None)`); timeouts and further
            // items keep the loop going.
            while !matches!(timeout(Duration::from_millis(1), states.next()).await, Ok(None)) {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Object exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            let mut expose_contract = exchange.object_expose_contract(([json!({})], json!({}))).build();
            task.fuzzy_barrier().await;
            let mut sink = expose_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            drop(expose_contract);
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// States published by a tag exposer must reach an object observer.
///
/// "riko" exposes tag `{}` and publishes versions `[1]` and `[2]` through the sink
/// it is handed. "regu" observes object `{}` tagged `{}` and checks both versions
/// in order.
#[tokio::test]
async fn routed_tag_object_delivery() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(4, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Object observer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            let mut observe_contract = exchange.object_observe_contract(([json!({})], json!({}))).build();
            task.fuzzy_barrier().await;
            let mut states = observe_contract.next().await.unwrap();

            let first = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(first.data, "xxx");
            assert_eq!(first.version, &[1]);
            task.fuzzy_barrier().await;

            let second = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(second.data, "xxxy");
            assert_eq!(second.version, &[2]);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Tag exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            let mut tag_contract = exchange.tag_expose_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut sink = tag_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// A state that is already known must still be delivered to a late observer after
/// the exposer dropped its sink, but not flagged as `DataSynchronized::Now`.
///
/// * "nanachi" exposes object `{}` tagged `{}`, publishes version `[1]`, then
///   drops its sink.
/// * "regu" observes from the start: it sees the state as `DataSynchronized::Now`
///   and then waits for a state that is not.
/// * "riko" starts observing only afterwards and expects version `[1]` with a
///   `synchronized` value other than `DataSynchronized::Now`.
#[tokio::test]
async fn routed_known_state_gets_delivered_even_if_exposer_doesnt_exist() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(1, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("nanachi", |mut task, routers| async move {
            // Exposer side.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let mut expose_contract = exchange.object_expose_contract(([json!({})], json!({}))).build();
            let mut sink = expose_contract.next().await.unwrap();
            sink.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.barrier().await;
            drop(sink);
            task.fuzzy_barrier().await;
            task.barrier().await;
            Ok(())
        })
        .task("regu", |mut task, routers| async move {
            // Early observer.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut observe_contract = exchange.object_observe_contract(([json!({})], json!({}))).build();
            let mut states = observe_contract.next().await.unwrap();
            let live = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(live.synchronized, DataSynchronized::Now);
            assert_eq!(live.data, "xxx");
            assert_eq!(live.version, &[1]);
            task.barrier().await;
            // Consume states until one arrives that is no longer synchronized.
            while states.next().await.unwrap().json_deserialize::<String>().synchronized == DataSynchronized::Now {}
            task.fuzzy_barrier().await;
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Late observer.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[3].url.clone())?;
            task.barrier().await;
            task.fuzzy_barrier().await;
            let mut observe_contract = exchange.object_observe_contract(([json!({})], json!({}))).build();
            let mut states = observe_contract.next().await.unwrap();
            let stale = states.next().await.unwrap().json_deserialize::<String>();
            assert_ne!(stale.synchronized, DataSynchronized::Now);
            assert_eq!(stale.data, "xxx");
            assert_eq!(stale.version, &[1]);
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Dropping an object-observe contract must end a state stream that was taken
/// from it earlier (single local exchange, no routers).
///
/// Setup builds the contract and pulls one stream out of it; both are handed to
/// the tasks through `Arc<Mutex<Option<_>>>` slots. "riko" takes and drops the
/// contract, "regu" takes the stream and expects it to yield `None`.
#[tokio::test]
async fn object_observe_contract_drop_stops_object_state_stream() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(2, vec![0])
        .setup(|_task_count, _| async {
            let exchange = Exchange::new();
            let mut contract = exchange.object_observe_contract(json!({})).build();
            let state_stream = contract.next().await.unwrap();
            let contract_slot = Arc::new(Mutex::new(Some(contract)));
            let stream_slot = Arc::new(Mutex::new(Some(state_stream)));
            Ok((contract_slot, stream_slot))
        })
        .task("regu", |mut task, (_contract_slot, stream_slot)| async move {
            // Owner of the state stream.
            let mut state_stream = stream_slot.lock().unwrap().take().unwrap();
            task.fuzzy_barrier().await;
            let next_state = state_stream.next().await;
            assert!(next_state.is_none());
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, (contract_slot, _stream_slot)| async move {
            // Owner of the contract.
            let contract = contract_slot.lock().unwrap().take().unwrap();
            task.fuzzy_barrier().await;
            drop(contract);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .run(Duration::from_millis(2000))
        .await
}
/// Dropping the object-observe contract must terminate the sink that was handed
/// to the exposer (single shared exchange, no routers).
///
/// "regu" builds and later drops an object-observe contract for `{}`. "riko"
/// takes a sink from its object-expose contract, expects the sink's stream half
/// to yield an item first and `None` after the observer is gone.
#[tokio::test]
async fn object_observe_contract_termination_causes_unassign() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(2, vec![0])
        .setup(|_task_count, _| async { Ok(Exchange::new()) })
        .task("regu", |mut task, hakuban| async move {
            // Observer side.
            let observe_contract = hakuban.object_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            drop(observe_contract);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, hakuban| async move {
            // Exposer side.
            let mut expose_contract = hakuban.object_expose_contract(json!({})).build();
            let mut sink = expose_contract.next().await.unwrap();
            let while_observed = sink.next().await;
            assert!(while_observed.is_some());
            task.fuzzy_barrier().await;
            let after_observer_drop = sink.next().await;
            assert!(after_observer_drop.is_none());
            task.fuzzy_barrier().await;
            Ok(())
        })
        .run(Duration::from_millis(2000))
        .await
}
/// When a second tag exposer appears, half of the ten observed objects must move
/// from the first exposer to it.
///
/// * "regu" observes ten objects (`0..10`) tagged `"x"`.
/// * "riko" exposes tag `"x"`, collects ten sinks, and after the barrier waits for
///   the fifth of them to terminate.
/// * "nanachi" starts exposing tag `"x"` after the barrier and collects five
///   results from its contract.
#[tokio::test]
async fn routed_load_balancing_on_new_exposer_appearance() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(0, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer of all ten objects; contracts are held until the task ends.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let _observe_contracts: Vec<_> =
                (0..10).map(|index| exchange.object_observe_contract((vec!["x"], json!(index))).build()).collect();
            task.barrier().await;
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // First exposer.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut tag_contract = exchange.tag_expose_contract(json!("x")).build();
            let mut terminations = SelectAll::new();
            while terminations.len() < 10 {
                let sink = tag_contract.next().await.unwrap();
                // Swallow every item of the sink's stream and emit a single `()` when
                // it ends, so each yielded item marks one terminated sink.
                let end_marker = once(async {});
                terminations.push(Box::pin(sink.filter_map(|_| ready(None)).chain(end_marker)));
            }
            task.barrier().await;
            // Skip four terminations and wait for the fifth.
            terminations.skip(4).next().await;
            task.barrier().await;
            Ok(())
        })
        .task("nanachi", |mut task, routers| async move {
            // Second exposer, appears after the first barrier.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[3].url.clone())?;
            task.barrier().await;
            let mut tag_contract = exchange.tag_expose_contract(json!("x")).build();
            let mut received = Vec::new();
            while received.len() < 5 {
                received.push(tag_contract.next().await);
            }
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Checks that objects are redistributed between two tag exposers after one of
/// them loses all of its assignments.
///
/// * "regu" observes ten objects (`0..10`) tagged `"x"`, receives from "riko" the
///   indices riko is serving, and drops exactly those observe contracts.
/// * "riko" and "nanachi" both expose tag `"x"`; each expects to hold five
///   objects after the initial 3000 ms wait.
/// * After regu's drops, "riko" expects two new sinks whose object indices are not
///   among the ones it held before.
// NOTE(review): the 3000 ms / 1000 ms / 500 ms sleeps presumably give the routers'
// periodic load balancing time to act — confirm against the router implementation.
#[tokio::test]
async fn routed_periodic_load_balancing() -> Result<(), Box<dyn Error + Sync + Send>> {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
Shuffle::new(0, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
.setup(|task_count, network_topology| async move {
// Oneshot channel used by "riko" to tell "regu" which object indices it serves.
// Both ends sit in `Arc<Mutex<Option<_>>>` so the task that needs one can `take()` it.
let (sender, receiver) = oneshot::channel::<Vec<usize>>();
Ok((network_topology.setup_network(task_count).await, Arc::new(Mutex::new(Some(sender))), Arc::new(Mutex::new(Some(receiver)))))
})
.task("regu", |mut task, (routers, _sender, receiver)| async move {
let receiver = receiver.lock().unwrap().take().unwrap();
let hakuban_a = Exchange::new();
let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), routers[1].url.clone())?;
// Contracts are wrapped in `Option` so individual ones can be dropped by index later.
let mut object_observe_contracts: Vec<_> = (0..10).map(|i| Some(hakuban_a.object_observe_contract((vec!["x"], json!(i))).build())).collect();
task.barrier().await;
// Indices of the objects currently served by "riko".
let rikos_streams = receiver.await.unwrap();
for i in rikos_streams {
drop(object_observe_contracts[i].take());
}
tokio::time::sleep(Duration::from_millis(1000)).await;
task.barrier().await;
Ok(())
})
.task("riko", |mut task, (routers, sender, _receiver)| async move {
let sender = sender.lock().unwrap().take().unwrap();
let hakuban_b = Exchange::new();
let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), routers[2].url.clone())?;
let mut tag_expose = hakuban_b.tag_expose_contract(json!("x")).build();
tokio::time::sleep(Duration::from_millis(3000)).await;
let mut assigned_objects = vec![];
// Drain every sink the contract offers within 100 ms. A sink whose stream ends
// (`Ok(None)`) is discarded; one that stays quiet for 100 ms is kept, together
// with its object index, as a current assignment.
while let Ok(Some(mut stream)) = timeout(Duration::from_millis(100), tag_expose.next()).await {
loop {
match timeout(Duration::from_millis(100), stream.next()).await {
Ok(Some(_)) => {}
Ok(None) => break,
Err(_elapsed) => {
assigned_objects.push((stream.descriptor().json.as_i64().unwrap() as usize, stream));
break;
}
}
}
}
assert!(assigned_objects.len() == 5);
// Hand the held indices to "regu", which will drop the matching observers.
sender.send(assigned_objects.iter().map(|(i, _)| *i).collect::<Vec<usize>>()).unwrap();
task.barrier().await;
let mut new_assignments = vec![];
for _ in 0..2 {
let new_sink = tag_expose.next().await.unwrap();
new_assignments.push((new_sink.descriptor().json.as_i64().unwrap() as usize, new_sink));
}
// Newly assigned objects must differ from the ones held before.
for (i, _stream) in new_assignments {
assert!(!assigned_objects.iter().any(|(j, _)| *j == i));
}
task.barrier().await;
Ok(())
})
.task("nanachi", |mut task, (routers, _sender, _receiver)| async move {
let hakuban_c = Exchange::new();
let _upstream_c = WebsocketConnector::new(hakuban_c.clone(), routers[3].url.clone())?;
let mut tag_expose = hakuban_c.tag_expose_contract(json!("x")).build();
tokio::time::sleep(Duration::from_millis(3000)).await;
let mut assigned_objects = vec![];
// Same draining procedure as in the "riko" task.
while let Ok(Some(mut stream)) = timeout(Duration::from_millis(100), tag_expose.next()).await {
loop {
match timeout(Duration::from_millis(100), stream.next()).await {
Ok(Some(_)) => {}
Ok(None) => break,
Err(_elapsed) => {
assigned_objects.push((stream.descriptor().json.as_i64().unwrap(), stream));
break;
}
}
}
}
assert!(assigned_objects.len() == 5);
task.barrier().await;
tokio::time::sleep(Duration::from_millis(500)).await;
task.barrier().await;
Ok(())
})
// Only the routers need tearing down; the channel slots are simply dropped.
.teardown(|(routers, _sender, _receiver)| Topology::teardown_network(routers))
.run(Duration::from_millis(6000))
.await
}
/// An exposer connected with `#load-limit=3` must be handed exactly three of the
/// ten observed objects.
///
/// "regu" observes ten objects (`0..10`) tagged `"x"`. "riko" exposes tag `"x"`
/// through a connector whose URL carries the `#load-limit=3` fragment, collects
/// sinks for six seconds, and expects three sinks in total, all received within
/// the first second.
#[tokio::test]
async fn routed_load_limit_simple() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    let topologies = vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters];
    Shuffle::new(0, topologies)
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            // Observer of all ten objects; contracts are held until the task ends.
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let _observe_contracts: Vec<_> =
                (0..10).map(|index| exchange.object_observe_contract((vec!["x"], json!(index))).build()).collect();
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            // Load-limited exposer; the contract is built before the connector.
            let exchange = Exchange::new();
            let mut tag_contract = exchange.tag_expose_contract(json!("x")).build();
            let limited_url = format!("{}#load-limit=3", routers[2].url.as_str());
            let _connector = WebsocketConnector::new(exchange.clone(), limited_url.as_str())?;
            let started_at = Instant::now();
            let mut received = Vec::new();
            while started_at.elapsed() < Duration::from_secs(6) {
                if let Ok(Some(sink)) = timeout(Duration::from_millis(100), tag_contract.next()).await {
                    received.push((Instant::now(), sink));
                }
            }
            let first_second_deadline = started_at + Duration::from_millis(1000);
            let early_count = received.iter().filter(|(arrived_at, _)| *arrived_at < first_second_deadline).count();
            assert_eq!(early_count, 3);
            assert_eq!(received.len(), 3);
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(12000))
        .await
}
/// Load-limit test with a staged limit.
///
/// `regu` builds ten object-observe contracts tagged `"x"`. `riko` exposes that tag through a connector
/// whose URL carries the `#load-limit=2:0-2:5-5:7` fragment (presumably a time/limit schedule — confirm
/// against the connector's documentation), polls for six seconds, and asserts the arrival profile:
/// nothing in the first second, five or six sinks by 3.5 s, and exactly seven by the end.
#[tokio::test]
async fn routed_load_limit_gradual() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(0, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
        .setup(|tasks, topology| async move { Ok(topology.setup_network(tasks).await) })
        .task("regu", |mut task, routers| async move {
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            // The contracts are only held, never polled; they stay alive until the barrier below is passed.
            let _observers: Vec<_> = (0..10)
                .map(|index| exchange.object_observe_contract((vec!["x"], json!(index))).build())
                .collect();
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            let exchange = Exchange::new();
            let mut tag_expose = exchange.tag_expose_contract(json!("x")).build();
            // Same URL as the router entry, with the load-limit fragment appended.
            let limited_url = format!("{}#load-limit=2:0-2:5-5:7", routers[2].url.as_str());
            let _connector = WebsocketConnector::new(exchange.clone(), limited_url.as_str())?;
            let start = Instant::now();
            let mut sinks = Vec::new();
            // Poll in 100 ms slices for six seconds, timestamping every sink that arrives.
            while start.elapsed() < Duration::from_secs(6) {
                let polled = timeout(Duration::from_millis(100), tag_expose.next()).await;
                if let Ok(Some(sink)) = polled {
                    sinks.push((Instant::now(), sink));
                }
            }
            // Arrival profile: 0 within 1 s, 5..=6 within 3.5 s, 7 within 6 s and in total.
            assert_eq!(sinks.iter().filter(|(timestamp, _)| *timestamp < start + Duration::from_millis(1000)).count(), 0);
            assert!(sinks.iter().filter(|(timestamp, _)| *timestamp < start + Duration::from_millis(3500)).count() > 4);
            assert!(sinks.iter().filter(|(timestamp, _)| *timestamp < start + Duration::from_millis(3500)).count() < 7);
            assert_eq!(sinks.iter().filter(|(timestamp, _)| *timestamp < start + Duration::from_millis(6000)).count(), 7);
            assert_eq!(sinks.len(), 7);
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(12000))
        .await
}
/// Builds an observe contract and an expose contract for the same `{}` descriptor on one local
/// exchange, takes the first sink from the expose contract, and returns immediately so that every
/// value is dropped at scope exit.
///
/// The test contains no explicit assertion; judging by its name it guards against a panic during
/// that drop sequence (assumption — confirm against the issue it was written for).
#[tokio::test]
async fn local_instant_object_state_sink_drop_panic() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(0, vec![0])
        .setup(|_, _| async { Ok(()) })
        .task("regu", |_task, _| async move {
            let exchange = Exchange::new();
            // Declaration order is significant: locals are dropped in reverse, sink first.
            let _observer = exchange.object_observe_contract(json!({})).build();
            let mut exposer = exchange.object_expose_contract(json!({})).build();
            let _sink = exposer.next().await.unwrap();
            Ok(())
        })
        .run(Duration::from_millis(2000))
        .await
}
/// Single-exchange test: once the exposer drops its sink and contract, the observer must receive a
/// state whose `synchronized` field is `DataSynchronized::LastAt(_)`.
///
/// Both tasks share one `Exchange` and a two-party `tokio::sync::Barrier` created in `setup`.
#[tokio::test]
async fn local_desynchronize_on_object_state_sink_drop() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(0, vec![0])
        .setup(|_, _| async {
            let exchange = Exchange::new();
            let rendezvous = Arc::new(tokio::sync::Barrier::new(2));
            Ok((rendezvous, exchange))
        })
        .task("regu", |_task, (rendezvous, exchange)| async move {
            let mut observer = exchange.object_observe_contract(json!({})).build();
            let mut states = observer.next().await.unwrap();
            // First state: the value published by riko.
            let initial = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(initial.data, "xxx");
            rendezvous.wait().await;
            // Second state: arrives after riko has dropped its sink and contract.
            let state = states.next().await.unwrap().json_deserialize::<String>();
            assert!(matches!(state.synchronized, DataSynchronized::LastAt(_)));
            rendezvous.wait().await;
            Ok(())
        })
        .task("riko", |_task, (rendezvous, exchange)| async move {
            let mut exposer = exchange.object_expose_contract(json!({})).build();
            let mut sink = exposer.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            rendezvous.wait().await;
            // Explicitly release the sink first, then the contract.
            drop(sink);
            drop(exposer);
            rendezvous.wait().await;
            Ok(())
        })
        .run(Duration::from_millis(2000))
        .await
}
/// Single-exchange test: a tag observer on `{}` receives the state of an object exposed with that
/// tag, and after the exposer drops its sink and contract the observer's stream either ends or
/// yields a state whose `synchronized` is not `DataSynchronized::Now`.
#[tokio::test]
async fn local_tag_exposer_object_observe_propagation() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(2, vec![0])
        .setup(|_, _| async {
            let exchange = Exchange::new();
            Ok(exchange)
        })
        .task("regu", |mut task, exchange| async move {
            task.fuzzy_barrier().await;
            let mut tag_observer = exchange.tag_observe_contract(json!({})).build();
            let mut states = tag_observer.next().await.unwrap();
            let initial = states.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(initial.data, "xxx");
            task.fuzzy_barrier().await;
            // Not deserialized: only the presence of an item and its synchronization flag matter here.
            let state = states.next().await;
            assert!(state.is_none() || state.unwrap().synchronized != DataSynchronized::Now);
            Ok(())
        })
        .task("riko", |mut task, exchange| async move {
            task.fuzzy_barrier().await;
            let mut exposer = exchange.object_expose_contract(([json!({})], json!({}))).build();
            let mut sink = exposer.next().await.unwrap();
            sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            // Explicitly release the sink first, then the contract.
            drop(sink);
            drop(exposer);
            Ok(())
        })
        .run(Duration::from_millis(2000))
        .await
}
/// Checks that a tag observer keeps receiving object state across a reconnect.
///
/// The scenario runs for every combination of the three topologies and a `drop_exchange` flag.
/// `regu` observes tag `{}`, reads versions 1 and 2, then drops its stream, contract and connector;
/// when `drop_exchange` is set it also swaps in a fresh `Exchange`. After a short pause it connects
/// again, expects to see version 2 first, and then reads until version 3 arrives.
/// `riko` exposes the object and publishes versions 1, 2 and 3 in step with the fuzzy barriers.
#[tokio::test]
async fn routed_object_tag_delivery_with_reconnection() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
    Shuffle::new(5, itertools::iproduct!(vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters], vec![false, true]).collect())
        .setup(|task_count, (network_topology, drop_exchange)| async move { Ok((network_topology.setup_network(task_count).await, drop_exchange)) })
        .task("regu", |mut task, (routers, drop_exchange)| async move {
            let mut hakuban_a = Exchange::new();
            let upstream_a = WebsocketConnector::new(hakuban_a.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            let mut tag_observe = hakuban_a.tag_observe_contract(json!({})).build();
            task.fuzzy_barrier().await;
            let mut object_state_stream = tag_observe.next().await.unwrap();
            let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(state.data, "xxx");
            assert_eq!(state.version, &[1]);
            task.fuzzy_barrier().await;
            let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(state.data, "xxxy");
            assert_eq!(state.version, &[2]);
            // Tear down everything that belongs to the first connection.
            drop(object_state_stream);
            drop(tag_observe);
            drop(upstream_a);
            if drop_exchange {
                // Replace the exchange itself, dropping the old one.
                let hakuban_new = Exchange::new();
                drop(std::mem::replace(&mut hakuban_a, hakuban_new));
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
            // Second connection: the last published state (version 2) is expected first.
            let _upstream_a = WebsocketConnector::new(hakuban_a.clone(), routers[1].url.clone())?;
            let mut tag_observe = hakuban_a.tag_observe_contract(json!({})).build();
            let mut object_state_stream = tag_observe.next().await.unwrap();
            let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(state.data, "xxxy");
            assert_eq!(state.version, &[2]);
            task.fuzzy_barrier().await;
            // Skip any states that are not yet version 3, then check the final value.
            loop {
                let state = object_state_stream.next().await.unwrap().json_deserialize::<String>();
                if state.version[0] == 3 {
                    assert_eq!(state.version, &[3]);
                    assert_eq!(state.data, "xxxz");
                    break;
                }
            }
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, (routers, _drop_exchange)| async move {
            let hakuban_b = Exchange::new();
            let _upstream_b = WebsocketConnector::new(hakuban_b.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            let mut object_expose = hakuban_b.object_expose_contract(([json!({})], json!({}))).build();
            task.fuzzy_barrier().await;
            let mut object_state_sink = object_expose.next().await.unwrap();
            // Wait for the sink's first item before publishing anything.
            object_state_sink.next().await.unwrap();
            object_state_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            object_state_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            object_state_sink.send(ObjectState::new("xxxz").with_version(vec![3]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            Ok(())
        })
        .teardown(|(routers, _drop_exchange)| Topology::teardown_network(routers))
        // Keep the limit in the seconds range, like the other routed tests in this file, so a
        // stalled run fails quickly instead of blocking the suite.
        .run(Duration::from_millis(4000))
        .await
}
/// Checks tag-based delivery when both peers connect only after their local state already exists.
///
/// Each task tag-observes `{}`, exposes its own object (`regu`: descriptor `1`, data `"1"`; `riko`:
/// descriptor `2`, data `"2"`), publishes version 1 and takes the first stream from its own tag
/// observer — all before creating a `WebsocketConnector`. After connecting, each task takes the next
/// stream from its tag observer and expects the other task's data at version 1.
#[tokio::test]
async fn routed_object_tag_delivery_with_late_connection() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(3, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
        .setup(|tasks, topology| async move { Ok(topology.setup_network(tasks).await) })
        .task("regu", |mut task, routers| async move {
            let exchange = Exchange::new();
            let mut tag_observer = exchange.tag_observe_contract(json!({})).build();
            let mut exposer = exchange.object_expose_contract(([json!({})], json!(1))).build();
            let mut sink = exposer.next().await.unwrap();
            sink.send(ObjectState::new("1").with_version(vec![1]).json_serialize()).await.unwrap();
            // First stream from the tag observer, taken while still offline; held but never read.
            let _local_stream = tag_observer.next().await.unwrap();
            task.fuzzy_barrier().await;
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            task.fuzzy_barrier().await;
            // Next stream is expected to carry riko's object.
            let mut remote_stream = tag_observer.next().await.unwrap();
            let remote = remote_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(remote.data, "2");
            assert_eq!(remote.version, &[1]);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            let exchange = Exchange::new();
            let mut tag_observer = exchange.tag_observe_contract(json!({})).build();
            let mut exposer = exchange.object_expose_contract(([json!({})], json!(2))).build();
            let mut sink = exposer.next().await.unwrap();
            sink.send(ObjectState::new("2").with_version(vec![1]).json_serialize()).await.unwrap();
            // First stream from the tag observer, taken while still offline; held but never read.
            let _local_stream = tag_observer.next().await.unwrap();
            task.fuzzy_barrier().await;
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.fuzzy_barrier().await;
            // Next stream is expected to carry regu's object.
            let mut remote_stream = tag_observer.next().await.unwrap();
            let remote = remote_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(remote.data, "1");
            assert_eq!(remote.version, &[1]);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Leader-election pattern built from object contracts: two peers both observe and expose the same
/// object (`{}` tagged `{}`).
///
/// `regu` sets up first and takes an item from its expose contract. `riko` sets up afterwards and
/// polls its own expose contract in 1 ms slices until a poll times out. Once `regu` has dropped its
/// connector, contracts and exchange, `riko` awaits its expose contract again.
#[tokio::test]
async fn leader_election_pattern_object_object() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(0, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
        .setup(|tasks, topology| async move { Ok(topology.setup_network(tasks).await) })
        .task("regu", |mut task, routers| async move {
            let exchange = Exchange::new();
            let connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let observer = exchange.object_observe_contract(([json!({})], json!({}))).build();
            let mut exposer = exchange.object_expose_contract(([json!({})], json!({}))).build();
            tokio::time::sleep(Duration::from_millis(100)).await;
            task.barrier().await;
            task.barrier().await;
            // The yielded item is discarded as soon as the statement ends.
            exposer.next().await;
            task.barrier().await;
            // Step down: release everything in this exact order.
            drop(connector);
            drop(observer);
            drop(exposer);
            drop(exchange);
            tokio::time::sleep(Duration::from_millis(100)).await;
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            task.barrier().await;
            let _observer = exchange.object_observe_contract(([json!({})], json!({}))).build();
            let mut exposer = exchange.object_expose_contract(([json!({})], json!({}))).build();
            tokio::time::sleep(Duration::from_millis(100)).await;
            task.barrier().await;
            // Consume whatever the expose contract yields until a 1 ms poll comes back empty-handed.
            loop {
                if timeout(Duration::from_millis(1), exposer.next()).await.is_err() {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.barrier().await;
            // Awaited without a timeout; the yielded item is discarded.
            exposer.next().await;
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Leader-election pattern built from a tag-expose contract: two peers both observe the object
/// (`{}` tagged `{}`) and both expose the tag `{}`.
///
/// `regu` sets up first and takes an item from its tag-expose contract. `riko` sets up afterwards
/// and polls its own tag-expose contract in 1 ms slices until a poll times out. Once `regu` has
/// dropped its connector, contracts and exchange, `riko` awaits its tag-expose contract again.
#[tokio::test]
async fn leader_election_pattern_tag_object() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).format_timestamp_millis().try_init();
    Shuffle::new(0, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
        .setup(|task_count, network_topology| async move { Ok(network_topology.setup_network(task_count).await) })
        .task("regu", |mut task, routers| async move {
            let hakuban = Exchange::new();
            let upstream = WebsocketConnector::new(hakuban.clone(), routers[1].url.clone())?;
            let object_observe = hakuban.object_observe_contract(([json!({})], json!({}))).build();
            let mut tag_expose = hakuban.tag_expose_contract(json!({})).build();
            tokio::time::sleep(Duration::from_millis(100)).await;
            task.barrier().await;
            task.barrier().await;
            // The yielded item is discarded as soon as the statement ends.
            tag_expose.next().await;
            task.barrier().await;
            // Step down: release everything in this exact order.
            drop(upstream);
            drop(object_observe);
            drop(tag_expose);
            drop(hakuban);
            tokio::time::sleep(Duration::from_millis(100)).await;
            task.barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            let hakuban = Exchange::new();
            // riko attaches through router entry 2, as in leader_election_pattern_object_object,
            // so that the two peers do not both go through routers[1].
            let _upstream = WebsocketConnector::new(hakuban.clone(), routers[2].url.clone())?;
            task.barrier().await;
            let _object_observe = hakuban.object_observe_contract(([json!({})], json!({}))).build();
            let mut tag_expose = hakuban.tag_expose_contract(json!({})).build();
            tokio::time::sleep(Duration::from_millis(100)).await;
            task.barrier().await;
            // Consume whatever the tag-expose contract yields until a 1 ms poll times out.
            while timeout(Duration::from_millis(1), tag_expose.next()).await.is_ok() {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.barrier().await;
            // Awaited without a timeout; the yielded item is discarded.
            tag_expose.next().await;
            task.barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(2000))
        .await
}
/// Checks that object data is not replayed to a new observer after the previous observe contract
/// was dropped.
///
/// `regu` reads version 1, drops its observe contract, waits 400 ms and observes again; the new
/// stream must stay silent for one second, and only afterwards deliver version 2 with
/// `DataSynchronized::Now`. `riko` publishes version 1, drops its sink, then takes a new sink from
/// the same expose contract and publishes version 2.
#[tokio::test]
async fn routed_object_object_drop_data_when_object_is_no_more_data_needed() -> Result<(), Box<dyn Error + Sync + Send>> {
    let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info"))
        .format_timestamp_millis()
        .try_init();
    Shuffle::new(3, vec![Topology::SingleRouter, Topology::TwoStackedRouters, Topology::StarOfRouters])
        .setup(|tasks, topology| async move { Ok(topology.setup_network(tasks).await) })
        .task("regu", |mut task, routers| async move {
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[1].url.clone())?;
            let mut first_observer = exchange.object_observe_contract(json!({})).build();
            let mut first_stream = first_observer.next().await.unwrap();
            let initial = first_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(initial.data, "xxx");
            assert_eq!(initial.version, &[1]);
            task.fuzzy_barrier().await;
            // Only the contract is dropped here; the first stream stays alive until the task ends.
            drop(first_observer);
            tokio::time::sleep(Duration::from_millis(400)).await;
            let mut second_observer = exchange.object_observe_contract(json!({})).build();
            let mut second_stream = second_observer.next().await.unwrap();
            // The fresh stream must not yield anything (not even end-of-stream) within one second.
            if timeout(Duration::from_millis(1000), second_stream.next()).await.is_ok() {
                panic!("Got something when expected nothing");
            }
            task.fuzzy_barrier().await;
            let refreshed = second_stream.next().await.unwrap().json_deserialize::<String>();
            assert_eq!(refreshed.data, "xxxy");
            assert_eq!(refreshed.version, &[2]);
            assert_eq!(refreshed.synchronized, DataSynchronized::Now);
            task.fuzzy_barrier().await;
            Ok(())
        })
        .task("riko", |mut task, routers| async move {
            let exchange = Exchange::new();
            let _connector = WebsocketConnector::new(exchange.clone(), routers[2].url.clone())?;
            let mut exposer: ObjectExposeContract = exchange.object_expose_contract(json!({})).build();
            let mut first_sink = exposer.next().await.unwrap();
            first_sink.send(ObjectState::new("xxx").with_version(vec![1]).json_serialize()).await.unwrap();
            task.fuzzy_barrier().await;
            drop(first_sink);
            task.fuzzy_barrier().await;
            let mut second_sink = exposer.next().await.unwrap();
            second_sink.send(ObjectState::new("xxxy").with_version(vec![2]).json_serialize()).await.unwrap();
            // Consume whatever else the contract yields until a 1 ms poll times out.
            loop {
                if timeout(Duration::from_millis(1), exposer.next()).await.is_err() {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            task.fuzzy_barrier().await;
            Ok(())
        })
        .teardown(Topology::teardown_network)
        .run(Duration::from_millis(3000))
        .await
}