use std::collections::hash_map::Entry;
use tracing::{Level, event, instrument};
use crate::{
ID_BEING_PROCESSED, pool_item::PoolItem, request_response::RequestResponse,
sender_couplet::SenderCouplet, thread_request_response::*,
};
use super::PoolThread;
impl<P> PoolThread<P>
where
P: PoolItem,
{
#[instrument(skip(self), fields(id=self.thread_id, name=P::name()))]
pub fn message_loop(&mut self) {
let mut thread_start_info = P::thread_start();
while let Ok(sender_couplet) = self.pool_thread_receiver.recv() {
if tracing::enabled!(Level::TRACE) {
event!(
Level::TRACE,
"receiving request {:?}",
sender_couplet.request(),
);
}
let SenderCouplet { return_to, request } = sender_couplet;
let id = request.id();
ID_BEING_PROCESSED.replace(Some(id));
if let Some(thread_start_info) = &mut thread_start_info {
P::pool_item_pre_process(id, thread_start_info);
}
let response = match request {
ThreadRequestResponse::MessagePoolItem(request) => {
if let Some(targeted) = self.pool_item_map.get_mut(&id) {
targeted.process_message(request)
} else {
P::id_not_found(&request)
}
}
ThreadRequestResponse::AddPoolItem(RequestResponse::Request(request)) => {
match P::new_pool_item(request) {
Ok(new_pool_item) => {
event!(
Level::DEBUG,
"Inserting a new {:?} into the threads map, id={:?}",
P::name(),
id
);
match self.pool_item_map.entry(id) {
Entry::Vacant(v) => {
v.insert(new_pool_item);
AddResponse::new(id, Ok(id))
}
Entry::Occupied(_) => AddResponse::new(
id,
Err("failed to add; pool item already exists".to_string()),
),
}
}
Err(new_pool_item_error) => {
AddResponse::new(id, Err(new_pool_item_error.error_message))
}
}
.into()
}
ThreadRequestResponse::RemovePoolItem(RequestResponse::Request(_request)) => {
let success = self.pool_item_map.remove(&id).is_some();
event!(
Level::DEBUG,
"Trying to remove a {:?} from the threads map, id={:?}, success={:?}",
P::name(),
id,
success
);
RemovePoolItemResponse::new(id, success).into()
}
ThreadRequestResponse::ThreadShutdown(RequestResponse::Request(_request)) => {
debug_assert_eq!(
self.thread_id, id,
"this messages should have targeted this thread"
);
let children = self.shutdown_child_pool();
return_to
.send(ThreadShutdownResponse::new(id, children).into())
.expect("the send should always succeed");
debug_assert!(
self.pool_item_map.is_empty(),
"ThreadShutdown should drain all elements"
);
return;
}
ThreadRequestResponse::ThreadEcho(RequestResponse::Request(request)) => {
ThreadEchoResponse::new(id, request.message().to_string(), self.thread_id)
.into()
}
ThreadRequestResponse::ThreadAbort(RequestResponse::Request(_request)) => {
debug_assert_eq!(
self.thread_id, id,
"this messages should have targeted this thread"
);
return_to
.send(ThreadAbortResponse(id).into())
.expect("the send should always succeed");
return;
}
_ => panic!("unrecognised thread thread request"),
};
if tracing::enabled!(Level::TRACE) {
event!(Level::TRACE, ?response);
}
match return_to.send(response) {
Ok(_) => (),
Err(err) => {
event!(
Level::WARN,
"Cannot return results, other end of channel has most likely been dropped. Err = {}",
&err
);
}
};
ID_BEING_PROCESSED.replace(None);
if let Some(thread_start_info) = &mut thread_start_info {
P::pool_item_post_process(id, thread_start_info);
}
}
event!(
Level::INFO,
"request channel closed; message loop exiting, thread_id={}",
self.thread_id
);
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use crossbeam_channel::unbounded;
use tracing::Level;
use tracing_subscriber::{layer::Context, prelude::*, registry::Registry};
use crate::{
pool_thread::PoolThread, samples::*, sender_couplet::SenderCouplet,
thread_request_response::*,
};
struct CapturingLayer {
events: Arc<Mutex<Vec<Level>>>,
}
impl<S> tracing_subscriber::layer::Layer<S> for CapturingLayer
where
S: tracing::Subscriber,
{
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let mut events = self.events.lock().expect("events mutex poisoned");
events.push(*event.metadata().level());
}
}
#[test]
fn send_init_id_2_twice_returns_response_indicating_second_request_was_ignored() {
let id = 2;
let init_request = RandomsAddRequest(id);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(3, request_receive);
request_send
.send(SenderCouplet::new(
response_send.clone(),
init_request.clone(),
))
.unwrap();
request_send
.send(SenderCouplet::new(response_send.clone(), init_request))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadAbortRequest(3)))
.unwrap();
target.message_loop();
let response_0: AddResponse = response_receive.recv().unwrap().into();
let response_1: AddResponse = response_receive.recv().unwrap().into();
assert_eq!(2, response_0.id());
assert!(response_0.result().is_ok());
assert_eq!(1, target.pool_item_map.len());
assert_eq!(2, target.pool_item_map.get(&id).unwrap().id);
assert_eq!(2, response_1.id());
assert!(response_1.result().is_err());
assert_eq!(
"failed to add; pool item already exists",
response_1.result().err().unwrap()
)
}
#[test]
fn send_remove_element_with_id_12_expected_element_removed_from_map_set() {
let id = 12;
let init_request = RandomsAddRequest(id);
let remove_pool_item_request = RemovePoolItemRequest(id);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request))
.unwrap();
request_send
.send(SenderCouplet::new(
response_send.clone(),
remove_pool_item_request,
))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadShutdownRequest(1)))
.unwrap();
target.message_loop();
response_receive.recv().unwrap();
let remove_pool_item_response: RemovePoolItemResponse =
response_receive.recv().unwrap().into();
assert_eq!(id, remove_pool_item_response.id());
assert!(target.pool_item_map.is_empty());
}
#[test]
fn send_remove_element_with_id_2_expected_element_removed_from_map_set() {
let id = 2;
let init_request = RandomsAddRequest(id);
let remove_pool_item_request = RemovePoolItemRequest(id);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request))
.unwrap();
request_send
.send(SenderCouplet::new(
response_send.clone(),
remove_pool_item_request,
))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadShutdownRequest(1)))
.unwrap();
target.message_loop();
response_receive.recv().unwrap();
let remove_pool_item_response: RemovePoolItemResponse =
response_receive.recv().unwrap().into();
assert_eq!(id, remove_pool_item_response.id());
assert!(target.pool_item_map.is_empty());
}
#[test]
fn init_id_1_2_thread_shutdown_clears_the_elements_returns_expected_shutdown_threads() {
let init_request_0 = RandomsAddRequest(1);
let init_request_1 = RandomsAddRequest(2);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(15, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request_0))
.unwrap();
request_send
.send(SenderCouplet::new(response_send.clone(), init_request_1))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadShutdownRequest(15)))
.unwrap();
target.message_loop();
response_receive.recv().unwrap();
response_receive.recv().unwrap();
let thread_shutdown_payload: ThreadShutdownResponse =
response_receive.recv().unwrap().into();
assert!(
thread_shutdown_payload
== ThreadShutdownResponse::new(15, vec![ThreadShutdownResponse::new(1, vec![])])
|| thread_shutdown_payload
== ThreadShutdownResponse::new(
15,
vec![ThreadShutdownResponse::new(2, vec![])]
)
);
assert!(target.pool_item_map.is_empty());
}
#[test]
fn init_id_101_102_thread_shutdown_clears_the_elements_returns_expected_shutdown_threads() {
let init_request_0 = RandomsAddRequest(101);
let init_request_1 = RandomsAddRequest(102);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(5, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request_0))
.unwrap();
request_send
.send(SenderCouplet::new(response_send.clone(), init_request_1))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadShutdownRequest(5)))
.unwrap();
target.message_loop();
response_receive.recv().unwrap();
response_receive.recv().unwrap();
let thread_shutdown_response: ThreadShutdownResponse =
response_receive.recv().unwrap().into();
assert!(
thread_shutdown_response
== ThreadShutdownResponse::new(5, vec![ThreadShutdownResponse::new(101, vec![])])
|| thread_shutdown_response
== ThreadShutdownResponse::new(
5,
vec![ThreadShutdownResponse::new(102, vec![])]
)
);
assert!(target.pool_item_map.is_empty());
}
#[test]
fn init_id_101_send_get_state_message_to_element_retrieved_expected_response() {
let id = 101;
let init_request = RandomsAddRequest(id);
let get_state_request = SumRequest(id);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request))
.unwrap();
request_send
.send(SenderCouplet::new(response_send.clone(), get_state_request))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadShutdownRequest(1)))
.unwrap();
target.message_loop();
response_receive.recv().unwrap();
let response: SumResponse = response_receive.recv().unwrap().into();
assert_eq!(id, response.id);
assert!(response.sum() > 0);
}
#[test]
fn send_init_id_2_expected_element_added_to_map_set() {
let id = 2;
let init_request = RandomsAddRequest(id);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(3, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadAbortRequest(3)))
.unwrap();
target.message_loop();
let response: AddResponse = response_receive.recv().unwrap().into();
assert_eq!(2, response.id());
assert_eq!(1, target.pool_item_map.len());
assert_eq!(2, target.pool_item_map.get(&id).unwrap().id);
}
#[test]
fn send_init_id_1_expected_element_added_to_map() {
let id = 1;
let init_request = RandomsAddRequest(id);
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::new(response_send.clone(), init_request))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadAbortRequest(1)))
.unwrap();
target.message_loop();
let response: AddResponse = response_receive.recv().unwrap().into();
assert_eq!(1, response.id());
assert_eq!(1, target.pool_item_map.len());
assert_eq!(1, target.pool_item_map.get(&id).unwrap().id);
}
#[test]
fn echo_message_responds_as_expected() {
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::new(
response_send.clone(),
ThreadEchoRequest::new(2, "ping".to_string()),
))
.unwrap();
request_send
.send(SenderCouplet::new(response_send, ThreadShutdownRequest(1)))
.unwrap();
target.message_loop();
let thread_echo_response: ThreadEchoResponse = response_receive.recv().unwrap().into();
assert_eq!("ping".to_string(), thread_echo_response.message());
assert_eq!(2, thread_echo_response.thread_id());
assert_eq!(1, thread_echo_response.responding_thread_id());
}
#[test]
fn id_2_receives_abort_message_exits_loop() {
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(2, request_receive);
request_send
.send(SenderCouplet::<Randoms>::new(
response_send,
ThreadAbortRequest(2),
))
.unwrap();
target.message_loop();
let thread_abort_response: ThreadAbortResponse = response_receive.recv().unwrap().into();
assert_eq!(2, thread_abort_response.thread_id());
}
#[test]
fn id_1_receives_abort_message_exits_loop() {
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::<Randoms>::new(
response_send,
ThreadAbortRequest(1),
))
.unwrap();
target.message_loop();
let thread_abort_response: ThreadAbortResponse = response_receive.recv().unwrap().into();
assert_eq!(1, thread_abort_response.thread_id());
}
#[test]
fn id_2_receives_shutdown_message_exits_loop() {
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(2, request_receive);
request_send
.send(SenderCouplet::<Randoms>::new(
response_send,
ThreadShutdownRequest(2),
))
.unwrap();
target.message_loop();
let thread_shutdown_payload: ThreadShutdownResponse =
response_receive.recv().unwrap().into();
assert_eq!(2, thread_shutdown_payload.thread_id());
assert_eq!(
&Vec::<ThreadShutdownResponse>::default(),
thread_shutdown_payload.children()
)
}
#[test]
fn id_1_receives_shutdown_message_exits_loop() {
let (response_send, response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let mut target = PoolThread::new(1, request_receive);
request_send
.send(SenderCouplet::<Randoms>::new(
response_send,
ThreadShutdownRequest(1),
))
.unwrap();
target.message_loop();
let thread_shutdown_payload: ThreadShutdownResponse =
response_receive.recv().unwrap().into();
assert_eq!(1, thread_shutdown_payload.thread_id());
assert_eq!(
&Vec::<ThreadShutdownResponse>::default(),
thread_shutdown_payload.children()
)
}
#[test]
fn given_request_sender_dropped_when_message_loop_runs_then_exits_cleanly_without_panic() {
let (_response_send, _response_receive) = unbounded::<ThreadRequestResponse<Randoms>>();
let (request_send, request_receive) = unbounded::<SenderCouplet<Randoms>>();
let events = Arc::new(Mutex::new(Vec::<Level>::new()));
let subscriber = Registry::default().with(CapturingLayer {
events: Arc::clone(&events),
});
let _guard = tracing::subscriber::set_default(subscriber);
drop(request_send);
let mut target = PoolThread::new(1, request_receive);
target.message_loop();
let captured_events = events.lock().expect("events mutex poisoned");
assert!(
captured_events.contains(&Level::INFO),
"expected at least one INFO-level log when request sender is dropped"
);
}
}