ih_muse_client/
mock_client.rs1use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::ops::RangeInclusive;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use chrono::Utc;
11use once_cell::sync::Lazy;
12use tokio::sync::Mutex;
13use uuid::Uuid;
14
15use ih_muse_core::{MuseError, MuseResult, Transport};
16use ih_muse_proto::prelude::*;
17
18static NEXT_ELEMENT_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));
19
20pub fn get_new_element_id() -> u64 {
22 NEXT_ELEMENT_ID.fetch_add(1, Ordering::SeqCst)
23}
24
25pub struct MockClient {
26 metrics: Arc<Mutex<Vec<MetricDefinition>>>,
27 sent_metrics: Arc<Mutex<Vec<MetricPayload>>>,
28 finest_resolution: Arc<Mutex<TimestampResolution>>,
29 node_state: NodeState,
30}
31
32impl Default for MockClient {
33 fn default() -> Self {
34 Self::new(TimestampResolution::default())
35 }
36}
37
38impl MockClient {
39 pub fn new(default_resolution: TimestampResolution) -> Self {
40 let node_id = Uuid::new_v4();
41 let node_info = NodeInfo {
42 start_date: Utc::now().timestamp(),
43 node_addr: "127.0.0.1:0".parse().unwrap(), };
45 let cluster_id = Uuid::new_v4();
46 let available_nodes = {
47 let mut map = HashMap::new();
48 map.insert(node_id, node_info);
49 map
50 };
51 let node_state = NodeState {
52 node_id,
53 node_info,
54 available_nodes,
55 main_node_id: Some(node_id),
56 current_status: NodeStatus::Leader,
57 cluster_id: Some(cluster_id),
58 };
59
60 MockClient {
61 metrics: Arc::new(Mutex::new(Vec::new())),
62 sent_metrics: Arc::new(Mutex::new(Vec::new())),
63 finest_resolution: Arc::new(Mutex::new(default_resolution)),
64 node_state,
65 }
66 }
67}
68
69#[async_trait]
70impl Transport for MockClient {
71 async fn health_check(&self) -> MuseResult<()> {
72 log::info!("MockClient: health_check called");
73 Ok(())
74 }
75
76 async fn get_node_state(&self) -> MuseResult<NodeState> {
77 log::info!("MockClient: get_node_state called");
78 Ok(self.node_state.clone())
79 }
80
81 async fn get_finest_resolution(&self) -> MuseResult<TimestampResolution> {
82 log::info!("MockClient: get_finest_resolution called");
83 Ok(*self.finest_resolution.lock().await)
84 }
85
86 async fn get_node_elem_ranges(
87 &self,
88 _ini: Option<u64>,
89 _end: Option<u64>,
90 ) -> MuseResult<Vec<NodeElementRange>> {
91 log::info!(
92 "MockClient: get_node_elem_ranges called with {:?}..{:?}",
93 _ini,
94 _end
95 );
96
97 let current_max_elem_id = NEXT_ELEMENT_ID.load(Ordering::SeqCst);
98
99 let range_end = ((current_max_elem_id + 99) / 100) * 100;
101
102 let node_element_range = NodeElementRange {
103 node_id: self.node_state.node_id,
104 range: OrdRangeInc(RangeInclusive::new(0, range_end)),
105 };
106
107 Ok(vec![node_element_range])
108 }
109
110 async fn register_element_kinds(
111 &self,
112 element_kinds: &[ElementKindRegistration],
113 ) -> MuseResult<()> {
114 log::info!(
115 "MockClient: register_element_kinds called with {:?}",
116 element_kinds
117 );
118 Ok(())
119 }
120
121 async fn register_elements(
122 &self,
123 elements: &[ElementRegistration],
124 ) -> MuseResult<Vec<Result<ElementId, MuseError>>> {
125 log::info!("MockClient: register_elements called with {:?}", elements);
126 let results = elements.iter().map(|_| Ok(get_new_element_id())).collect();
127 Ok(results)
128 }
129
130 async fn register_metrics(&self, payload: &[MetricDefinition]) -> MuseResult<()> {
131 log::info!("MockClient: register_metrics called with {:?}", payload);
132 let mut metrics = self.metrics.lock().await;
133 metrics.extend(payload.iter().cloned());
134 Ok(())
135 }
136
137 async fn get_metric_order(&self) -> MuseResult<Vec<MetricDefinition>> {
138 log::info!("MockClient: get_metric_order called");
139 let metrics = self.metrics.lock().await;
140 Ok(metrics.clone())
141 }
142
143 async fn get_metrics(
144 &self,
145 query: &MetricQuery,
146 node_addr: Option<SocketAddr>,
147 ) -> MuseResult<Vec<MetricPayload>> {
148 log::info!(
149 "MockClient: get_metrics from {:?} called with query: {:?}",
150 node_addr,
151 query
152 );
153 if query.parent_id.is_some() {
154 return Err(MuseError::Client(
155 "parent_id not implemented in MockClient".to_string(),
156 ));
157 }
158 let mut results = Vec::new();
159 for payload in self.sent_metrics.lock().await.iter() {
160 if let Some(start_time) = query.start_time {
162 if payload.time < start_time {
163 continue;
164 }
165 }
166 if let Some(end_time) = query.end_time {
167 if payload.time > end_time {
168 continue;
169 }
170 }
171 if let Some(query_element_id) = query.element_id {
173 if payload.element_id != query_element_id {
174 continue;
175 }
176 }
177 if let Some(query_metric_id) = query.metric_id {
179 if !payload.metric_ids.contains(&query_metric_id) {
181 continue;
182 }
183 }
184 results.push(payload.clone());
185 }
186 Ok(results)
187 }
188
189 async fn send_metrics(
190 &self,
191 payload: Vec<MetricPayload>,
192 node_addr: Option<SocketAddr>,
193 ) -> MuseResult<()> {
194 log::info!(
195 "MockClient: send_metrics to {:?} called with {:?}",
196 node_addr,
197 payload
198 );
199 let mut sent_metrics = self.sent_metrics.lock().await;
200 sent_metrics.extend(payload);
201 Ok(())
202 }
203}