ih_muse_client/
mock_client.rs

1// crates/ih-muse-client/src/mock_client.rs
2
3use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::ops::RangeInclusive;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use chrono::Utc;
11use once_cell::sync::Lazy;
12use tokio::sync::Mutex;
13use uuid::Uuid;
14
15use ih_muse_core::{MuseError, MuseResult, Transport};
16use ih_muse_proto::prelude::*;
17
18static NEXT_ELEMENT_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));
19
20/// Get a new unique ElementId as a `u64`
21pub fn get_new_element_id() -> u64 {
22    NEXT_ELEMENT_ID.fetch_add(1, Ordering::SeqCst)
23}
24
25pub struct MockClient {
26    metrics: Arc<Mutex<Vec<MetricDefinition>>>,
27    sent_metrics: Arc<Mutex<Vec<MetricPayload>>>,
28    finest_resolution: Arc<Mutex<TimestampResolution>>,
29    node_state: NodeState,
30}
31
32impl Default for MockClient {
33    fn default() -> Self {
34        Self::new(TimestampResolution::default())
35    }
36}
37
38impl MockClient {
39    pub fn new(default_resolution: TimestampResolution) -> Self {
40        let node_id = Uuid::new_v4();
41        let node_info = NodeInfo {
42            start_date: Utc::now().timestamp(),
43            node_addr: "127.0.0.1:0".parse().unwrap(), // Mock address
44        };
45        let cluster_id = Uuid::new_v4();
46        let available_nodes = {
47            let mut map = HashMap::new();
48            map.insert(node_id, node_info);
49            map
50        };
51        let node_state = NodeState {
52            node_id,
53            node_info,
54            available_nodes,
55            main_node_id: Some(node_id),
56            current_status: NodeStatus::Leader,
57            cluster_id: Some(cluster_id),
58        };
59
60        MockClient {
61            metrics: Arc::new(Mutex::new(Vec::new())),
62            sent_metrics: Arc::new(Mutex::new(Vec::new())),
63            finest_resolution: Arc::new(Mutex::new(default_resolution)),
64            node_state,
65        }
66    }
67}
68
69#[async_trait]
70impl Transport for MockClient {
71    async fn health_check(&self) -> MuseResult<()> {
72        log::info!("MockClient: health_check called");
73        Ok(())
74    }
75
76    async fn get_node_state(&self) -> MuseResult<NodeState> {
77        log::info!("MockClient: get_node_state called");
78        Ok(self.node_state.clone())
79    }
80
81    async fn get_finest_resolution(&self) -> MuseResult<TimestampResolution> {
82        log::info!("MockClient: get_finest_resolution called");
83        Ok(*self.finest_resolution.lock().await)
84    }
85
86    async fn get_node_elem_ranges(
87        &self,
88        _ini: Option<u64>,
89        _end: Option<u64>,
90    ) -> MuseResult<Vec<NodeElementRange>> {
91        log::info!(
92            "MockClient: get_node_elem_ranges called with {:?}..{:?}",
93            _ini,
94            _end
95        );
96
97        let current_max_elem_id = NEXT_ELEMENT_ID.load(Ordering::SeqCst);
98
99        // Calculate the range end, rounded up to the next multiple of 100
100        let range_end = ((current_max_elem_id + 99) / 100) * 100;
101
102        let node_element_range = NodeElementRange {
103            node_id: self.node_state.node_id,
104            range: OrdRangeInc(RangeInclusive::new(0, range_end)),
105        };
106
107        Ok(vec![node_element_range])
108    }
109
110    async fn register_element_kinds(
111        &self,
112        element_kinds: &[ElementKindRegistration],
113    ) -> MuseResult<()> {
114        log::info!(
115            "MockClient: register_element_kinds called with {:?}",
116            element_kinds
117        );
118        Ok(())
119    }
120
121    async fn register_elements(
122        &self,
123        elements: &[ElementRegistration],
124    ) -> MuseResult<Vec<Result<ElementId, MuseError>>> {
125        log::info!("MockClient: register_elements called with {:?}", elements);
126        let results = elements.iter().map(|_| Ok(get_new_element_id())).collect();
127        Ok(results)
128    }
129
130    async fn register_metrics(&self, payload: &[MetricDefinition]) -> MuseResult<()> {
131        log::info!("MockClient: register_metrics called with {:?}", payload);
132        let mut metrics = self.metrics.lock().await;
133        metrics.extend(payload.iter().cloned());
134        Ok(())
135    }
136
137    async fn get_metric_order(&self) -> MuseResult<Vec<MetricDefinition>> {
138        log::info!("MockClient: get_metric_order called");
139        let metrics = self.metrics.lock().await;
140        Ok(metrics.clone())
141    }
142
143    async fn get_metrics(
144        &self,
145        query: &MetricQuery,
146        node_addr: Option<SocketAddr>,
147    ) -> MuseResult<Vec<MetricPayload>> {
148        log::info!(
149            "MockClient: get_metrics from {:?} called with query: {:?}",
150            node_addr,
151            query
152        );
153        if query.parent_id.is_some() {
154            return Err(MuseError::Client(
155                "parent_id not implemented in MockClient".to_string(),
156            ));
157        }
158        let mut results = Vec::new();
159        for payload in self.sent_metrics.lock().await.iter() {
160            // Filter by time range
161            if let Some(start_time) = query.start_time {
162                if payload.time < start_time {
163                    continue;
164                }
165            }
166            if let Some(end_time) = query.end_time {
167                if payload.time > end_time {
168                    continue;
169                }
170            }
171            // Filter by element_id
172            if let Some(query_element_id) = query.element_id {
173                if payload.element_id != query_element_id {
174                    continue;
175                }
176            }
177            // Filter by metric_id if specified
178            if let Some(query_metric_id) = query.metric_id {
179                // Check if any metric_id in the payload matches `query_metric_id`
180                if !payload.metric_ids.contains(&query_metric_id) {
181                    continue;
182                }
183            }
184            results.push(payload.clone());
185        }
186        Ok(results)
187    }
188
189    async fn send_metrics(
190        &self,
191        payload: Vec<MetricPayload>,
192        node_addr: Option<SocketAddr>,
193    ) -> MuseResult<()> {
194        log::info!(
195            "MockClient: send_metrics to {:?} called with {:?}",
196            node_addr,
197            payload
198        );
199        let mut sent_metrics = self.sent_metrics.lock().await;
200        sent_metrics.extend(payload);
201        Ok(())
202    }
203}