use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use parking_lot::Mutex;
use crate::ChannelId;
use crate::websocket::{
AnyClient, ChannelView, Client, ClientChannel, ClientChannelId, ClientId,
GetParametersResponder, Parameter, ParameterHandler, ServerListener, SetParametersResponder,
};
/// Owned copy of the `id` and `topic` of a [`ClientChannel`].
///
/// Built through the `From<&ClientChannel>` conversion so that a recording can
/// keep the values after the borrowed channel is gone.
#[allow(dead_code)]
pub(crate) struct ClientChannelInfo {
/// Identifier copied from the client channel.
pub(crate) id: ClientChannelId,
/// Topic cloned from the client channel.
pub(crate) topic: String,
}
impl From<&ClientChannel> for ClientChannelInfo {
    /// Copies the channel's id and clones its topic into an owned value.
    fn from(channel: &ClientChannel) -> Self {
        let id = channel.id;
        let topic = channel.topic.clone();
        Self { id, topic }
    }
}
/// Owned copy of the id and topic exposed by a [`ChannelView`].
///
/// Built through the `From<ChannelView<'_>>` conversion.
pub(crate) struct ChannelInfo {
/// Value returned by `ChannelView::id()`.
pub(crate) id: ChannelId,
/// Value returned by `ChannelView::topic()`, converted to an owned `String`.
pub(crate) topic: String,
}
impl From<ChannelView<'_>> for ChannelInfo {
    /// Reads the view's id and topic and stores them as owned values.
    fn from(channel: ChannelView) -> Self {
        let id = channel.id();
        let topic = channel.topic().to_string();
        Self { id, topic }
    }
}
/// One recorded `on_message_data` callback.
pub(crate) struct MessageData {
/// Id of the client passed to the callback.
#[allow(dead_code)]
pub client_id: ClientId,
/// Id and topic of the client channel passed to the callback.
pub channel: ClientChannelInfo,
/// Owned copy of the payload bytes passed to the callback.
pub data: Vec<u8>,
}
/// One recorded get-parameters request.
///
/// Pushed by both `RecordingServerListener::on_get_parameters` and
/// `RecordingParameterHandler::get`.
pub(crate) struct GetParameters {
/// Id of the client passed to the callback.
#[allow(dead_code)]
pub client_id: ClientId,
/// Parameter names passed to the callback.
pub param_names: Vec<String>,
/// Request id passed to the callback, if any.
pub request_id: Option<String>,
}
/// One recorded set-parameters request.
///
/// Pushed by both `RecordingServerListener::on_set_parameters` and
/// `RecordingParameterHandler::set`.
pub(crate) struct SetParameters {
/// Id of the client passed to the callback.
#[allow(dead_code)]
pub client_id: ClientId,
/// Copy of the parameters passed to the callback.
pub parameters: Vec<Parameter>,
/// Request id passed to the callback, if any.
pub request_id: Option<String>,
}
/// A [`ServerListener`] that records every callback it receives.
///
/// Each callback appends to its own mutex-guarded list (or bumps its own atomic
/// counter); the `take_*` accessors hand the recorded values back and reset the
/// recording.
pub(crate) struct RecordingServerListener {
/// Recorded `on_message_data` callbacks.
message_data: Mutex<Vec<MessageData>>,
/// Recorded `on_subscribe` callbacks.
subscribe: Mutex<Vec<(ClientId, ChannelInfo)>>,
/// Recorded `on_unsubscribe` callbacks.
unsubscribe: Mutex<Vec<(ClientId, ChannelInfo)>>,
/// Recorded `on_client_advertise` callbacks.
client_advertise: Mutex<Vec<(ClientId, ClientChannelInfo)>>,
/// Recorded `on_client_unadvertise` callbacks.
client_unadvertise: Mutex<Vec<(ClientId, ClientChannelInfo)>>,
/// Parameter-name lists from `on_parameters_subscribe` callbacks.
parameters_subscribe: Mutex<Vec<Vec<String>>>,
/// Parameter-name lists from `on_parameters_unsubscribe` callbacks.
parameters_unsubscribe: Mutex<Vec<Vec<String>>>,
/// Recorded `on_get_parameters` callbacks.
parameters_get: Mutex<Vec<GetParameters>>,
/// Recorded `on_set_parameters` callbacks.
parameters_set: Mutex<Vec<SetParameters>>,
/// Value cloned and returned from `on_get_parameters`; replaced through
/// `set_parameters_get_result`.
parameters_get_result: Mutex<Vec<Parameter>>,
/// Number of `on_connection_graph_subscribe` callbacks since the last take.
connection_graph_subscribe: AtomicUsize,
/// Number of `on_connection_graph_unsubscribe` callbacks since the last take.
connection_graph_unsubscribe: AtomicUsize,
}
impl RecordingServerListener {
    /// Creates a listener with every recording empty and both counters at zero.
    pub fn new() -> Self {
        Self {
            message_data: Mutex::new(Vec::new()),
            subscribe: Mutex::new(Vec::new()),
            unsubscribe: Mutex::new(Vec::new()),
            client_advertise: Mutex::new(Vec::new()),
            client_unadvertise: Mutex::new(Vec::new()),
            parameters_subscribe: Mutex::new(Vec::new()),
            parameters_unsubscribe: Mutex::new(Vec::new()),
            parameters_get: Mutex::new(Vec::new()),
            parameters_set: Mutex::new(Vec::new()),
            parameters_get_result: Mutex::new(Vec::new()),
            connection_graph_subscribe: AtomicUsize::new(0),
            connection_graph_unsubscribe: AtomicUsize::new(0),
        }
    }

    /// Number of message-data callbacks currently recorded.
    pub fn message_data_len(&self) -> usize {
        let recorded = self.message_data.lock();
        recorded.len()
    }

    /// Number of client-advertise callbacks currently recorded.
    pub fn client_advertise_len(&self) -> usize {
        let recorded = self.client_advertise.lock();
        recorded.len()
    }

    /// Number of client-unadvertise callbacks currently recorded.
    pub fn client_unadvertise_len(&self) -> usize {
        let recorded = self.client_unadvertise.lock();
        recorded.len()
    }

    /// Number of parameters-subscribe callbacks currently recorded.
    pub fn parameters_subscribe_len(&self) -> usize {
        let recorded = self.parameters_subscribe.lock();
        recorded.len()
    }

    /// Number of parameters-unsubscribe callbacks currently recorded.
    pub fn parameters_unsubscribe_len(&self) -> usize {
        let recorded = self.parameters_unsubscribe.lock();
        recorded.len()
    }

    /// Returns the recorded message-data callbacks, leaving the recording empty.
    pub fn take_message_data(&self) -> Vec<MessageData> {
        let mut recorded = self.message_data.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded subscribe callbacks, leaving the recording empty.
    pub fn take_subscribe(&self) -> Vec<(ClientId, ChannelInfo)> {
        let mut recorded = self.subscribe.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded unsubscribe callbacks, leaving the recording empty.
    pub fn take_unsubscribe(&self) -> Vec<(ClientId, ChannelInfo)> {
        let mut recorded = self.unsubscribe.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded client-advertise callbacks, leaving the recording empty.
    pub fn take_client_advertise(&self) -> Vec<(ClientId, ClientChannelInfo)> {
        let mut recorded = self.client_advertise.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded client-unadvertise callbacks, leaving the recording empty.
    pub fn take_client_unadvertise(&self) -> Vec<(ClientId, ClientChannelInfo)> {
        let mut recorded = self.client_unadvertise.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded parameters-subscribe name lists, leaving the recording empty.
    pub fn take_parameters_subscribe(&self) -> Vec<Vec<String>> {
        let mut recorded = self.parameters_subscribe.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded parameters-unsubscribe name lists, leaving the recording empty.
    pub fn take_parameters_unsubscribe(&self) -> Vec<Vec<String>> {
        let mut recorded = self.parameters_unsubscribe.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded get-parameters requests, leaving the recording empty.
    pub fn take_parameters_get(&self) -> Vec<GetParameters> {
        let mut recorded = self.parameters_get.lock();
        std::mem::take(&mut *recorded)
    }

    /// Replaces the value that `on_get_parameters` clones and returns.
    pub fn set_parameters_get_result(&self, result: Vec<Parameter>) {
        let mut slot = self.parameters_get_result.lock();
        *slot = result;
    }

    /// Returns the recorded set-parameters requests, leaving the recording empty.
    pub fn take_parameters_set(&self) -> Vec<SetParameters> {
        let mut recorded = self.parameters_set.lock();
        std::mem::take(&mut *recorded)
    }

    /// Adds one to the connection-graph subscribe counter.
    fn inc_connection_graph_subscribe(&self) {
        let counter = &self.connection_graph_subscribe;
        counter.fetch_add(1, Ordering::AcqRel);
    }

    /// Returns the connection-graph subscribe count and resets it to zero.
    pub fn take_connection_graph_subscribe(&self) -> usize {
        let counter = &self.connection_graph_subscribe;
        counter.swap(0, Ordering::AcqRel)
    }

    /// Adds one to the connection-graph unsubscribe counter.
    fn inc_connection_graph_unsubscribe(&self) {
        let counter = &self.connection_graph_unsubscribe;
        counter.fetch_add(1, Ordering::AcqRel);
    }

    /// Returns the connection-graph unsubscribe count and resets it to zero.
    pub fn take_connection_graph_unsubscribe(&self) -> usize {
        let counter = &self.connection_graph_unsubscribe;
        counter.swap(0, Ordering::AcqRel)
    }
}
impl ServerListener for RecordingServerListener {
    /// Records the client id, the channel's id/topic, and a copy of the payload.
    fn on_message_data(&self, client: Client, channel: &ClientChannel, payload: &[u8]) {
        self.message_data.lock().push(MessageData {
            client_id: client.id(),
            channel: channel.into(),
            data: payload.to_vec(),
        });
    }

    /// Records the subscribing client id together with the channel's id/topic.
    fn on_subscribe(&self, client: Client, channel: ChannelView) {
        self.subscribe.lock().push((client.id(), channel.into()));
    }

    /// Records the unsubscribing client id together with the channel's id/topic.
    fn on_unsubscribe(&self, client: Client, channel: ChannelView) {
        self.unsubscribe.lock().push((client.id(), channel.into()));
    }

    /// Records the advertising client id together with the client channel's id/topic.
    fn on_client_advertise(&self, client: Client, channel: &ClientChannel) {
        self.client_advertise
            .lock()
            .push((client.id(), channel.into()));
    }

    /// Records the unadvertising client id together with the client channel's id/topic.
    fn on_client_unadvertise(&self, client: Client, channel: &ClientChannel) {
        self.client_unadvertise
            .lock()
            .push((client.id(), channel.into()));
    }

    /// Records the request and returns a clone of the value last stored with
    /// `set_parameters_get_result` (empty until one is stored).
    fn on_get_parameters(
        &self,
        client: Client,
        param_names: Vec<String>,
        request_id: Option<&str>,
    ) -> Vec<Parameter> {
        // `param_names` is owned and not used again, so it is moved into the
        // record rather than cloned. The guard is a statement temporary, so the
        // `parameters_get` lock is released before `parameters_get_result` is
        // locked below.
        self.parameters_get.lock().push(GetParameters {
            client_id: client.id(),
            param_names,
            request_id: request_id.map(str::to_owned),
        });
        self.parameters_get_result.lock().clone()
    }

    /// Records the request and returns the supplied parameters unchanged.
    fn on_set_parameters(
        &self,
        client: Client,
        parameters: Vec<Parameter>,
        request_id: Option<&str>,
    ) -> Vec<Parameter> {
        // The clone is required here: one copy is stored in the record and the
        // original is handed back to the caller.
        self.parameters_set.lock().push(SetParameters {
            client_id: client.id(),
            parameters: parameters.clone(),
            request_id: request_id.map(str::to_owned),
        });
        parameters
    }

    /// Records the list of parameter names being subscribed to.
    fn on_parameters_subscribe(&self, param_names: Vec<String>) {
        // Owned argument: move it into the recording instead of cloning.
        self.parameters_subscribe.lock().push(param_names);
    }

    /// Records the list of parameter names being unsubscribed from.
    fn on_parameters_unsubscribe(&self, param_names: Vec<String>) {
        // Owned argument: move it into the recording instead of cloning.
        self.parameters_unsubscribe.lock().push(param_names);
    }

    /// Counts one connection-graph subscription.
    fn on_connection_graph_subscribe(&self) {
        self.inc_connection_graph_subscribe();
    }

    /// Counts one connection-graph unsubscription.
    fn on_connection_graph_unsubscribe(&self) {
        self.inc_connection_graph_unsubscribe();
    }
}
/// Selects how `RecordingParameterHandler::set` treats its responder after
/// recording the request.
#[derive(Clone, Default)]
#[allow(dead_code)] pub(crate) enum RecordingSetBehavior {
/// Respond with the same parameters that were passed to `set` (the default).
#[default]
Echo,
/// Respond with the contained parameters instead of the ones passed to `set`.
With(Vec<Parameter>),
/// Drop the responder without calling `respond`.
DropResponder,
}
/// Selects how `RecordingParameterHandler::get` treats its responder after
/// recording the request.
#[derive(Clone)]
pub(crate) enum RecordingGetBehavior {
/// Respond with the contained parameters.
Respond(Vec<Parameter>),
/// Drop the responder without calling `respond`.
DropResponder,
}
impl Default for RecordingGetBehavior {
fn default() -> Self {
Self::Respond(Vec::new())
}
}
/// A [`ParameterHandler`] that records every request and then handles the
/// responder according to a configurable behavior.
pub(crate) struct RecordingParameterHandler {
/// Recorded `get` requests.
parameters_get: Mutex<Vec<GetParameters>>,
/// Recorded `set` requests.
parameters_set: Mutex<Vec<SetParameters>>,
/// Behavior applied to the responder in `get`; replaced via `set_get_behavior`.
get_behavior: Mutex<RecordingGetBehavior>,
/// Behavior applied to the responder in `set`; replaced via `set_set_behavior`.
set_behavior: Mutex<RecordingSetBehavior>,
}
impl RecordingParameterHandler {
    /// Creates a handler with empty recordings and the default get/set behaviors.
    pub fn new() -> Self {
        Self {
            parameters_get: Mutex::new(Vec::new()),
            parameters_set: Mutex::new(Vec::new()),
            get_behavior: Mutex::default(),
            set_behavior: Mutex::default(),
        }
    }

    /// Replaces the behavior applied to subsequent `get` requests.
    pub fn set_get_behavior(&self, behavior: RecordingGetBehavior) {
        let mut current = self.get_behavior.lock();
        *current = behavior;
    }

    /// Replaces the behavior applied to subsequent `set` requests.
    pub fn set_set_behavior(&self, behavior: RecordingSetBehavior) {
        let mut current = self.set_behavior.lock();
        *current = behavior;
    }

    /// Number of `get` requests currently recorded.
    pub fn parameters_get_len(&self) -> usize {
        let recorded = self.parameters_get.lock();
        recorded.len()
    }

    /// Number of `set` requests currently recorded.
    pub fn parameters_set_len(&self) -> usize {
        let recorded = self.parameters_set.lock();
        recorded.len()
    }

    /// Returns the recorded `get` requests, leaving the recording empty.
    pub fn take_parameters_get(&self) -> Vec<GetParameters> {
        let mut recorded = self.parameters_get.lock();
        std::mem::take(&mut *recorded)
    }

    /// Returns the recorded `set` requests, leaving the recording empty.
    pub fn take_parameters_set(&self) -> Vec<SetParameters> {
        let mut recorded = self.parameters_set.lock();
        std::mem::take(&mut *recorded)
    }
}
impl ParameterHandler for RecordingParameterHandler {
    /// Records the request, then either responds with the configured values or
    /// drops the responder, depending on the current get behavior.
    fn get(
        &self,
        client: AnyClient,
        names: Vec<String>,
        request_id: Option<String>,
        responder: GetParametersResponder,
    ) {
        let record = GetParameters {
            client_id: client.id(),
            param_names: names,
            request_id,
        };
        self.parameters_get.lock().push(record);

        // Copy the behavior out first so its lock is not held while responding.
        let configured = self.get_behavior.lock().clone();
        match configured {
            RecordingGetBehavior::DropResponder => drop(responder),
            RecordingGetBehavior::Respond(values) => responder.respond(values),
        }
    }

    /// Records the request, then echoes the parameters, responds with the
    /// configured values, or drops the responder, depending on the current set
    /// behavior.
    fn set(
        &self,
        client: AnyClient,
        parameters: Vec<Parameter>,
        request_id: Option<String>,
        responder: SetParametersResponder,
    ) {
        let record = SetParameters {
            client_id: client.id(),
            parameters: parameters.clone(),
            request_id,
        };
        self.parameters_set.lock().push(record);

        // Copy the behavior out first so its lock is not held while responding.
        let configured = self.set_behavior.lock().clone();
        let reply = match configured {
            RecordingSetBehavior::Echo => parameters,
            RecordingSetBehavior::With(values) => values,
            RecordingSetBehavior::DropResponder => {
                drop(responder);
                return;
            }
        };
        responder.respond(reply);
    }
}
/// Polls `cond` until it returns `true`, sleeping one millisecond between checks.
///
/// # Panics
///
/// Panics if `cond` has not returned `true` within 50 milliseconds.
pub async fn assert_eventually(cond: impl Fn() -> bool) {
    let timeout = Duration::from_millis(50);
    let poll_interval = Duration::from_millis(1);

    let wait_for_cond = async {
        loop {
            if cond() {
                break;
            }
            tokio::time::sleep(poll_interval).await;
        }
    };

    let outcome = tokio::time::timeout(timeout, wait_for_cond).await;
    assert!(outcome.is_ok(), "condition not met within {timeout:?}");
}