messaging_thread_pool 5.0.3

A library that aids the creation of a typed thread pool of objects that are communicated with via channels
Documentation
use crossbeam_channel::{SendError, unbounded};
use tracing::instrument;

use crate::{
    ThreadPool,
    id_targeted::IdTargeted,
    pool_item::PoolItem,
    request_with_response::RequestWithResponse,
    sender_couplet::{SenderCouplet, thread_abort_send_error},
    thread_request_response::ThreadRequestResponse,
};

impl<P> ThreadPool<P>
where
    P: PoolItem,
{
    /// Sends a batch of requests into the pool and hands back an iterator over the responses.
    ///
    /// A fresh unbounded channel is created for each call. Its sending half is passed to
    /// `self.send` together with `requests`, and its receiving half is given to
    /// `self.receive`, which produces the returned iterator.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the internal send step if the requests could not be sent.
    #[instrument(skip(self, requests))]
    pub fn send_and_receive<T>(
        &self,
        requests: impl Iterator<Item = T>,
    ) -> Result<impl Iterator<Item = T::Response>, SendError<SenderCouplet<P>>>
    where
        T: RequestWithResponse<P> + IdTargeted,
    {
        let (reply_tx, reply_rx) = unbounded::<ThreadRequestResponse<P>>();

        // Bail out before building the response iterator if the send step fails.
        self.send(reply_tx, requests)?;

        let responses = self.receive::<T>(reply_rx);
        Ok(responses)
    }

    /// Sends exactly one request and returns its one response.
    ///
    /// The request is wrapped in a single-element iterator and forwarded to
    /// [`send_and_receive`](Self::send_and_receive).
    ///
    /// # Errors
    ///
    /// * Any error returned by [`send_and_receive`](Self::send_and_receive).
    /// * The error built by `thread_abort_send_error` for the request's id when the response
    ///   iterator yields nothing, or yields more than one item.
    #[instrument(skip(self, request))]
    pub fn send_and_receive_once<T>(
        &self,
        request: T,
    ) -> Result<T::Response, SendError<SenderCouplet<P>>>
    where
        T: RequestWithResponse<P> + IdTargeted,
    {
        // Capture the id up front: the request itself is moved into the iterator below.
        let target_id = request.id();
        let mut replies = self.send_and_receive(std::iter::once(request))?;

        match replies.next() {
            // The guard only probes for a second item once a first one has arrived.
            Some(only_reply) if replies.next().is_none() => Ok(only_reply),
            _ => Err(thread_abort_send_error::<P>(target_id)),
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{ThreadPool, samples::*, thread_request_response::*};

    /// Three echo requests sent to a two-thread pool all come back with the expected payloads.
    #[test]
    fn two_threads_three_echoes_receives_expected_response() {
        let pool = ThreadPool::<Randoms>::new(2);

        let echoes: Vec<ThreadEchoResponse> = pool
            .send_and_receive((0..3u64).map(|id| ThreadEchoRequest::new(id, format!("ping {id}"))))
            .unwrap()
            .collect();

        assert_eq!(echoes.len(), 3);

        // Each pair is (request id, third constructor argument of the expected response);
        // NOTE(review): the third argument is presumably the responding thread's id - confirm.
        for (id, responder) in [(0, 0), (1, 1), (2, 0)] {
            let expected = ThreadEchoResponse::new(id, format!("ping {id}"), responder);
            assert!(echoes.contains(&expected));
        }
    }

    /// A single add request sent to a one-thread pool yields one response carrying id 0.
    #[test]
    fn single_thread_single_init_receives_expected_response() {
        let pool = ThreadPool::<Randoms>::new(1);

        let responses: Vec<AddResponse> = pool
            .send_and_receive(std::iter::once(RandomsAddRequest(0)))
            .unwrap()
            .collect();

        assert_eq!(responses.len(), 1);
        assert_eq!(0, responses[0].id());
    }
}