vantage_dataset/mocks/
queue.rs1use crate::traits::{DataSet, InsertableDataSet, Result, ValueSet};
4use serde::Serialize;
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7use vantage_core::util::error::Context;
8use vantage_types::Entity;
9
10#[derive(Debug, Clone)]
12pub struct MockQueue {
13 topics: Arc<Mutex<HashMap<String, Vec<serde_json::Value>>>>,
15}
16
17impl MockQueue {
18 pub fn init() -> Self {
19 Self {
20 topics: Arc::new(Mutex::new(HashMap::new())),
21 }
22 }
23
24 pub fn message_count(&self, topic_name: &str) -> usize {
25 self.topics
26 .lock()
27 .unwrap()
28 .get(topic_name)
29 .map(|v| v.len())
30 .unwrap_or(0)
31 }
32
33 pub fn get_messages(&self, topic_name: &str) -> Vec<serde_json::Value> {
34 self.topics
35 .lock()
36 .unwrap()
37 .get(topic_name)
38 .cloned()
39 .unwrap_or_default()
40 }
41
42 pub fn get_all_messages(&self) -> HashMap<String, Vec<serde_json::Value>> {
43 self.topics.lock().unwrap().clone()
44 }
45
46 pub(crate) fn push_message(&self, topic_name: &str, message: serde_json::Value) {
47 let mut topics = self.topics.lock().unwrap();
48 topics
49 .entry(topic_name.to_string())
50 .or_default()
51 .push(message);
52 }
53}
54
55pub struct Topic<E> {
57 queue: MockQueue,
58 topic_name: String,
59 _phantom: std::marker::PhantomData<E>,
60}
61
62impl<E: Entity> ValueSet for Topic<E> {
63 type Id = String;
64 type Value = serde_json::Value;
65}
66
67impl<E> Topic<E>
68where
69 E: Serialize + Send,
70{
71 pub fn new(queue: &MockQueue) -> Self {
72 let topic_name = std::any::type_name::<E>()
74 .split("::")
75 .last()
76 .unwrap_or("unknown");
77 Self {
78 queue: queue.clone(),
79 topic_name: topic_name.to_string(),
80 _phantom: std::marker::PhantomData,
81 }
82 }
83}
84
85#[async_trait::async_trait]
86impl<E> DataSet<E> for Topic<E> where E: Entity {}
87
88#[async_trait::async_trait]
89impl<E> InsertableDataSet<E> for Topic<E>
90where
91 E: Entity + Serialize,
92{
93 async fn insert_return_id(&self, record: &E) -> Result<Self::Id> {
94 let value = serde_json::to_value(record).context("Failed to serialize record")?;
95
96 self.queue.push_message(&self.topic_name, value);
97
98 Ok(uuid::Uuid::new_v4().to_string())
99 }
100}