use std::borrow::Cow;
use crate::{
background::{BackgroundTask, Command},
errors::Error,
events::{Event, EventType},
network_conf::NetworkConfiguration,
RunParameters,
};
use clap::Parser;
use influxdb::WriteQuery;
use tokio::sync::{
mpsc::{self, channel, Sender},
oneshot,
};
use tokio_stream::{wrappers::ReceiverStream, Stream};
/// Panic message passed to `expect` when sending a [`Command`] on the command channel
/// fails.
const BACKGROUND_RECEIVER: &str = "Background Receiver";
/// Panic message passed to `expect` when awaiting the `oneshot` reply to a command
/// fails.
const BACKGROUND_SENDER: &str = "Background Sender";
/// Handle through which a test instance issues [`Command`]s to its [`BackgroundTask`].
///
/// Every operation is sent as a [`Command`] over `cmd_tx`; operations that produce a
/// result carry a `oneshot` sender on which the reply is awaited.
#[derive(Clone)]
pub struct Client {
/// Sending half of the command channel whose receiving half is given to
/// `BackgroundTask::new` in `new_and_init`.
cmd_tx: Sender<Command>,
/// Run parameters parsed with `RunParameters::try_parse()` in `new_and_init`.
run_parameters: RunParameters,
/// Value returned by the `initialized_global` signal in `new_and_init`
/// (`0` until that signal has completed).
global_seq: u64,
/// Value returned by the `initialized_group_<test_group_id>` signal in `new_and_init`
/// (`0` until that signal has completed).
group_seq: u64,
}
impl Client {
/// Creates a client, starts its background task and claims the sequence numbers.
///
/// The steps are performed in this order:
/// 1. parse the [`RunParameters`] with `RunParameters::try_parse()`;
/// 2. create the command channel (capacity 1), build the [`BackgroundTask`] around its
///    receiving half and spawn it with `tokio::spawn`;
/// 3. wait for the three-step network initialization exchange;
/// 4. signal and wait on `initialized_global` with `test_instance_count` as target;
/// 5. signal and wait on `initialized_group_<test_group_id>` with
///    `test_group_instance_count` as target;
/// 6. emit a message event with both values and store them on the client.
///
/// # Errors
/// Returns the boxed error of whichever step fails first: parameter parsing,
/// background task construction, network initialization, or one of the two
/// signal/barrier rounds.
///
/// # Panics
/// Panics if a command cannot be sent to the background task or if a reply sender is
/// dropped without answering.
pub async fn new_and_init() -> Result<Self, Box<dyn std::error::Error>> {
    let run_parameters = RunParameters::try_parse()?;

    // The background task owns the receiving half; the client keeps the sender.
    let (cmd_tx, cmd_rx) = channel(1);
    let background = BackgroundTask::new(cmd_rx, run_parameters.clone()).await?;
    tokio::spawn(background.run());

    let mut client = Self {
        cmd_tx,
        run_parameters,
        global_seq: 0,
        group_seq: 0,
    };

    client.wait_network_initialized().await?;

    // Global round: every instance of the run takes part.
    let instance_count = client.run_parameters.test_instance_count;
    let global_seq = client
        .signal_and_wait("initialized_global", instance_count)
        .await?;

    // Group round: only the instances of this client's group take part.
    let group_state = format!("initialized_group_{}", client.run_parameters.test_group_id);
    let group_instance_count = client.run_parameters.test_group_instance_count as u64;
    let group_seq = client
        .signal_and_wait(group_state, group_instance_count)
        .await?;

    client.record_message(format!(
        "claimed sequence numbers; global={}, group({})={}",
        global_seq, client.run_parameters.test_group_id, group_seq
    ));

    client.global_seq = global_seq;
    client.group_seq = group_seq;
    Ok(client)
}
/// Sends a `Command::Publish` with `topic` and `message` to the background task and
/// returns its reply.
///
/// Both arguments are converted to owned values before being sent.
///
/// # Errors
/// Returns whatever error the background task replies with.
/// NOTE(review): the meaning of the returned `u64` is decided by the background
/// task; presumably a sequence number for the published message. Confirm.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn publish(
    &self,
    topic: impl Into<Cow<'static, str>>,
    message: impl Into<Cow<'static, serde_json::Value>>,
) -> Result<u64, Error> {
    let topic = topic.into().into_owned();
    let message = message.into().into_owned();

    let (reply_tx, reply_rx) = oneshot::channel();
    self.cmd_tx
        .send(Command::Publish {
            topic,
            message,
            sender: reply_tx,
        })
        .await
        .expect(BACKGROUND_RECEIVER);

    reply_rx.await.expect(BACKGROUND_SENDER)
}
/// Sends a `Command::Subscribe` for `topic` to the background task and returns a
/// stream of the items delivered for it.
///
/// A bounded channel is created; its sending half travels with the command and its
/// receiving half is wrapped in the returned stream.
///
/// `capacity` is the buffer size of that channel. A `capacity` of `0` is treated as
/// `1`, because `tokio::sync::mpsc::channel` panics when asked for a zero-sized
/// buffer.
///
/// # Panics
/// Panics if the command cannot be sent to the background task.
pub async fn subscribe(
    &self,
    topic: impl Into<Cow<'static, str>>,
    capacity: usize,
) -> impl Stream<Item = Result<serde_json::Value, Error>> {
    // Clamp instead of letting tokio panic on a zero-capacity channel.
    let (stream, out) = mpsc::channel(capacity.max(1));
    let cmd = Command::Subscribe {
        topic: topic.into().into_owned(),
        stream,
    };
    self.cmd_tx.send(cmd).await.expect(BACKGROUND_RECEIVER);
    ReceiverStream::new(out)
}
/// Signals `state`, then waits on a barrier for the same `state` with `target`.
///
/// Returns the value produced by the signal step once the barrier has completed.
///
/// # Errors
/// Returns the error of the signal step, or of the barrier step if the signal
/// succeeded.
///
/// # Panics
/// Panics under the same conditions as `signal` and `barrier`.
pub async fn signal_and_wait(
    &self,
    state: impl Into<Cow<'static, str>>,
    target: u64,
) -> Result<u64, Error> {
    let state = state.into().into_owned();
    // The state name is needed twice: once for the signal, once for the barrier.
    let seq = self.signal(state.clone()).await?;
    self.barrier(state, target).await.map(|()| seq)
}
/// Sends a `Command::SignalEntry` for `state` to the background task and returns its
/// reply.
///
/// # Errors
/// Returns whatever error the background task replies with.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn signal(&self, state: impl Into<Cow<'static, str>>) -> Result<u64, Error> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let cmd = Command::SignalEntry {
        state: state.into().into_owned(),
        sender: reply_tx,
    };
    self.cmd_tx.send(cmd).await.expect(BACKGROUND_RECEIVER);
    reply_rx.await.expect(BACKGROUND_SENDER)
}
/// Sends a `Command::Barrier` for `state` with `target` to the background task and
/// waits for its reply.
///
/// # Errors
/// Returns whatever error the background task replies with.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn barrier(
    &self,
    state: impl Into<Cow<'static, str>>,
    target: u64,
) -> Result<(), Error> {
    let state = state.into().into_owned();
    let (reply_tx, reply_rx) = oneshot::channel();

    self.cmd_tx
        .send(Command::Barrier {
            state,
            target,
            sender: reply_tx,
        })
        .await
        .expect(BACKGROUND_RECEIVER);

    reply_rx.await.expect(BACKGROUND_SENDER)
}
/// Runs the three-step network initialization exchange with the background task.
///
/// The commands `WaitNetworkInitializedStart`, `WaitNetworkInitializedBarrier` and
/// `WaitNetworkInitializedEnd` are sent one after another; each step waits for its
/// reply before the next command is sent.
///
/// # Errors
/// Returns the error of the first step that fails; later steps are not attempted.
///
/// # Panics
/// Panics if a command cannot be sent or if a reply sender is dropped without
/// answering.
async fn wait_network_initialized(&self) -> Result<(), Error> {
    // Step 1: start.
    {
        let (sender, reply) = oneshot::channel();
        self.cmd_tx
            .send(Command::WaitNetworkInitializedStart { sender })
            .await
            .expect(BACKGROUND_RECEIVER);
        reply.await.expect(BACKGROUND_SENDER)?;
    }

    // Step 2: barrier.
    {
        let (sender, reply) = oneshot::channel();
        self.cmd_tx
            .send(Command::WaitNetworkInitializedBarrier { sender })
            .await
            .expect(BACKGROUND_RECEIVER);
        reply.await.expect(BACKGROUND_SENDER)?;
    }

    // Step 3: end.
    {
        let (sender, reply) = oneshot::channel();
        self.cmd_tx
            .send(Command::WaitNetworkInitializedEnd { sender })
            .await
            .expect(BACKGROUND_RECEIVER);
        reply.await.expect(BACKGROUND_SENDER)?;
    }

    Ok(())
}
/// Sends `config` to the background task as a `Command::NetworkShaping`, then waits
/// on a barrier for the configuration's callback state.
///
/// The barrier uses `config.callback_state` as its state and `config.callback_target`
/// as its target; when no callback target is set, `0` is used.
/// NOTE(review): how a target of `0` is interpreted is decided outside this file.
/// Confirm that it matches the intended "no explicit target" behaviour.
///
/// # Errors
/// Returns the error replied to the network shaping command, or the barrier's error
/// if that command succeeded.
///
/// # Panics
/// Panics if a command cannot be sent or if a reply sender is dropped without
/// answering.
pub async fn configure_network(&self, config: NetworkConfiguration) -> Result<(), Error> {
    let (sender, receiver) = oneshot::channel();

    // Read the barrier inputs first: `config` is moved into the command below.
    let state = config.callback_state.clone();
    let target = config.callback_target.unwrap_or(0);

    let cmd = Command::NetworkShaping { sender, config };
    self.cmd_tx.send(cmd).await.expect(BACKGROUND_RECEIVER);
    receiver.await.expect(BACKGROUND_SENDER)?;

    self.barrier(state, target).await
}
/// Prints `message` to standard output as a JSON-serialized message [`Event`].
///
/// The message is wrapped in `EventType::Message`, serialized with
/// `serde_json::to_string` and written as one line with `println!`.
///
/// # Panics
/// Panics with `"Event Serialization"` if the event cannot be serialized.
pub fn record_message(&self, message: impl Into<Cow<'static, str>>) {
    let event = Event {
        event: EventType::Message {
            message: message.into().into_owned(),
        },
    };
    let line = serde_json::to_string(&event).expect("Event Serialization");
    println!("{}", line);
}
/// Consumes the client and sends a `Command::SignalSuccess` to the background task,
/// waiting for its reply.
///
/// # Errors
/// Returns whatever error the background task replies with.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn record_success(self) -> Result<(), Error> {
    let (sender, outcome) = oneshot::channel();
    self.cmd_tx
        .send(Command::SignalSuccess { sender })
        .await
        .expect(BACKGROUND_RECEIVER);
    outcome.await.expect(BACKGROUND_SENDER)?;
    Ok(())
}
/// Consumes the client and sends a `Command::SignalFailure` carrying `error` to the
/// background task, waiting for its reply.
///
/// # Errors
/// Returns whatever error the background task replies with.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn record_failure(self, error: impl Into<Cow<'static, str>>) -> Result<(), Error> {
    let (sender, outcome) = oneshot::channel();
    let cmd = Command::SignalFailure {
        error: error.into().into_owned(),
        sender,
    };
    self.cmd_tx.send(cmd).await.expect(BACKGROUND_RECEIVER);
    outcome.await.expect(BACKGROUND_SENDER)?;
    Ok(())
}
/// Consumes the client and sends a `Command::SignalCrash` carrying `error` and
/// `stacktrace` to the background task, waiting for its reply.
///
/// # Errors
/// Returns whatever error the background task replies with.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn record_crash(
    self,
    error: impl Into<Cow<'static, str>>,
    stacktrace: impl Into<Cow<'static, str>>,
) -> Result<(), Error> {
    let error = error.into().into_owned();
    let stacktrace = stacktrace.into().into_owned();

    let (sender, outcome) = oneshot::channel();
    self.cmd_tx
        .send(Command::SignalCrash {
            error,
            stacktrace,
            sender,
        })
        .await
        .expect(BACKGROUND_RECEIVER);

    outcome.await.expect(BACKGROUND_SENDER)?;
    Ok(())
}
/// Sends `write_query` to the background task as a `Command::Metric` and waits for
/// its reply.
///
/// # Errors
/// Returns whatever error the background task replies with.
///
/// # Panics
/// Panics if the command cannot be sent or if the reply sender is dropped without
/// answering.
pub async fn record_metric(&self, write_query: WriteQuery) -> Result<(), Error> {
    let (sender, outcome) = oneshot::channel();
    self.cmd_tx
        .send(Command::Metric {
            write_query,
            sender,
        })
        .await
        .expect(BACKGROUND_RECEIVER);
    outcome.await.expect(BACKGROUND_SENDER)?;
    Ok(())
}
pub fn run_parameters(&self) -> RunParameters {
self.run_parameters.clone()
}
pub fn global_seq(&self) -> u64 {
self.global_seq
}
pub fn group_seq(&self) -> u64 {
self.group_seq
}
}