integration_tests_sv2 0.1.1

Sv2 Integration Tests Framework
use std::{collections::VecDeque, sync::Arc};
use stratum_apps::{
    custom_mutex::Mutex,
    stratum_core::parsers_sv2::{AnyMessage, Tlv},
};

use crate::types::MsgType;

#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct MessagesAggregator {
    messages: Arc<Mutex<VecDeque<(MsgType, AnyMessage<'static>, Option<Vec<Tlv>>)>>>,
}

impl Default for MessagesAggregator {
    fn default() -> Self {
        Self::new()
    }
}

impl MessagesAggregator {
    /// Creates a new [`MessagesAggregator`].
    pub fn new() -> Self {
        Self {
            messages: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    /// Adds a message to the end of the queue.
    pub fn add_message(&self, msg_type: MsgType, message: AnyMessage<'static>) {
        self.add_message_with_tlvs(msg_type, message, None);
    }

    /// Adds a message with TLV fields to the end of the queue.
    pub fn add_message_with_tlvs(
        &self,
        msg_type: MsgType,
        message: AnyMessage<'static>,
        tlv_fields: Option<Vec<Tlv>>,
    ) {
        self.messages
            .safe_lock(|messages| messages.push_back((msg_type, message, tlv_fields)))
            .unwrap();
    }

    /// Returns false if the queue is empty, true otherwise.
    pub fn is_empty(&self) -> bool {
        self.messages
            .safe_lock(|messages| messages.is_empty())
            .unwrap()
    }

    /// Clears all messages from the queue.
    pub fn clear(&self) {
        self.messages
            .safe_lock(|messages| messages.clear())
            .unwrap();
    }

    /// returns true if contains message_type
    pub fn has_message_type(&self, message_type: u8) -> bool {
        let has_message: bool = self
            .messages
            .safe_lock(|messages| {
                for (t, _, _) in messages.iter() {
                    if *t == message_type {
                        return true; // Exit early with `true`
                    }
                }
                false // Default value if no match is found
            })
            .unwrap();
        has_message
    }

    /// returns true if contains message_type and removes messages from the queue
    /// until the first message of type message_type.
    pub fn has_message_type_with_remove(&self, message_type: u8) -> bool {
        self.messages
            .safe_lock(|messages| {
                let mut cloned_messages = messages.clone();
                for (pos, (t, _, _)) in cloned_messages.iter().enumerate() {
                    if *t == message_type {
                        let drained = cloned_messages.drain(pos + 1..).collect();
                        *messages = drained;
                        return true;
                    }
                }
                false
            })
            .unwrap()
    }

    /// The aggregator queues messages in FIFO order, so this function returns the oldest message in
    /// the queue.
    ///
    /// The returned message is removed from the queue.
    pub fn next_message(&self) -> Option<(MsgType, AnyMessage<'static>)> {
        self.next_message_with_tlvs()
            .map(|(msg_type, msg, _)| (msg_type, msg))
    }

    /// The aggregator queues messages in FIFO order, so this function returns the oldest message
    /// with its TLV fields in the queue.
    ///
    /// The returned message is removed from the queue.
    pub fn next_message_with_tlvs(
        &self,
    ) -> Option<(MsgType, AnyMessage<'static>, Option<Vec<Tlv>>)> {
        let is_state = self
            .messages
            .safe_lock(|messages| {
                let mut cloned = messages.clone();
                if let Some((msg_type, msg, tlv_fields)) = cloned.pop_front() {
                    *messages = cloned;
                    Some((msg_type, msg, tlv_fields))
                } else {
                    None
                }
            })
            .unwrap();
        is_state
    }
}