Skip to main content

vantage_dataset/mocks/
queue.rs

1//! Queue mock implementation for testing and examples
2
3use crate::traits::{DataSet, InsertableDataSet, Result, ValueSet};
4use serde::Serialize;
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7use vantage_core::util::error::Context;
8use vantage_types::Entity;
9
10/// MockQueue collects all messages from all topics
11#[derive(Debug, Clone)]
12pub struct MockQueue {
13    // topic_name -> Vec<messages>
14    topics: Arc<Mutex<HashMap<String, Vec<serde_json::Value>>>>,
15}
16
17impl MockQueue {
18    pub fn init() -> Self {
19        Self {
20            topics: Arc::new(Mutex::new(HashMap::new())),
21        }
22    }
23
24    pub fn message_count(&self, topic_name: &str) -> usize {
25        self.topics
26            .lock()
27            .unwrap()
28            .get(topic_name)
29            .map(|v| v.len())
30            .unwrap_or(0)
31    }
32
33    pub fn get_messages(&self, topic_name: &str) -> Vec<serde_json::Value> {
34        self.topics
35            .lock()
36            .unwrap()
37            .get(topic_name)
38            .cloned()
39            .unwrap_or_default()
40    }
41
42    pub fn get_all_messages(&self) -> HashMap<String, Vec<serde_json::Value>> {
43        self.topics.lock().unwrap().clone()
44    }
45
46    pub(crate) fn push_message(&self, topic_name: &str, message: serde_json::Value) {
47        let mut topics = self.topics.lock().unwrap();
48        topics
49            .entry(topic_name.to_string())
50            .or_default()
51            .push(message);
52    }
53}
54
55/// Topic represents a typed topic in the queue
56pub struct Topic<E> {
57    queue: MockQueue,
58    topic_name: String,
59    _phantom: std::marker::PhantomData<E>,
60}
61
62impl<E: Entity> ValueSet for Topic<E> {
63    type Id = String;
64    type Value = serde_json::Value;
65}
66
67impl<E> Topic<E>
68where
69    E: Serialize + Send,
70{
71    pub fn new(queue: &MockQueue) -> Self {
72        // Use the type name as topic identifier
73        let topic_name = std::any::type_name::<E>()
74            .split("::")
75            .last()
76            .unwrap_or("unknown");
77        Self {
78            queue: queue.clone(),
79            topic_name: topic_name.to_string(),
80            _phantom: std::marker::PhantomData,
81        }
82    }
83}
84
85#[async_trait::async_trait]
86impl<E> DataSet<E> for Topic<E> where E: Entity {}
87
88#[async_trait::async_trait]
89impl<E> InsertableDataSet<E> for Topic<E>
90where
91    E: Entity + Serialize,
92{
93    async fn insert_return_id(&self, record: &E) -> Result<Self::Id> {
94        let value = serde_json::to_value(record).context("Failed to serialize record")?;
95
96        self.queue.push_message(&self.topic_name, value);
97
98        Ok(uuid::Uuid::new_v4().to_string())
99    }
100}