ih_muse/
muse.rs

1// crates/ih-muse/src/muse.rs
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10use tokio::time::Duration;
11use tokio_util::sync::CancellationToken;
12
13use crate::tasks;
14use crate::timing;
15use ih_muse_client::{MockClient, PoetClient};
16use ih_muse_core::prelude::*;
17use ih_muse_proto::prelude::*;
18use ih_muse_record::{
19    FileRecorder, FileReplayer, RecordedEvent, RecordedEventWithTime, Recorder, Replayer,
20};
21
22/// The main client for interacting with the Muse system.
23///
24/// The `Muse` struct provides methods to initialize the client, register elements,
25/// send metrics, and replay recorded events.
26pub struct Muse {
27    client: Arc<dyn Transport + Send + Sync>,
28    state: Arc<State>,
29    pub recorder: Option<Arc<Mutex<dyn Recorder + Send + Sync>>>,
30    tasks: Vec<JoinHandle<()>>,
31    cancellation_token: CancellationToken,
32    /// Indicates whether the Muse client has been initialized.
33    pub is_initialized: Arc<AtomicBool>,
34    element_buffer: Arc<ElementBuffer>,
35    metric_buffer: Arc<MetricBuffer>,
36    config: Config,
37}
38
39impl Drop for Muse {
40    /// Cleans up resources when the `Muse` instance is dropped.
41    ///
42    /// Cancels any running tasks and releases resources.
43    /// Flushes and Closes any running event recording.
44    fn drop(&mut self) {
45        // Cancel any running tasks
46        self.cancellation_token.cancel();
47        for task in &self.tasks {
48            task.abort();
49        }
50
51        // Flush and close the recorder synchronously if it exists
52        if let Some(recorder) = &self.recorder {
53            let recorder = recorder.clone();
54            let _ = std::thread::spawn(move || {
55                let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
56                rt.block_on(async {
57                    let mut recorder = recorder.lock().await;
58                    if let Err(e) = recorder.flush().await {
59                        eprintln!("Failed to flush recorder: {:?}", e);
60                    }
61                    if let Err(e) = recorder.close().await {
62                        eprintln!("Failed to close recorder: {:?}", e);
63                    }
64                });
65            })
66            .join();
67        }
68    }
69}
70
71impl Muse {
72    /// Creates a new `Muse` client instance.
73    ///
74    /// # Arguments
75    ///
76    /// - `config`: A reference to the [`Config`] object.
77    ///
78    /// # Errors
79    ///
80    /// Returns a [`MuseError::Configuration`] if the client cannot be created with the provided configuration.
81    pub fn new(config: &Config) -> MuseResult<Self> {
82        let client: Arc<dyn Transport + Send + Sync> = match config.client_type {
83            ClientType::Poet => Arc::new(PoetClient::new(&config.endpoints)),
84            ClientType::Mock => Arc::new(MockClient::new(config.default_resolution)),
85        };
86
87        let recorder: Option<Arc<Mutex<dyn Recorder + Send + Sync>>> = if config.recording_enabled {
88            if let Some(path) = &config.recording_path {
89                let file_recorder =
90                    FileRecorder::new(Path::new(path)).expect("Failed to create FileRecorder");
91                Some(Arc::new(Mutex::new(file_recorder))) // Wrap in Mutex here
92            } else {
93                return Err(MuseError::Configuration(
94                    "Recording enabled but no recording path provided".to_string(),
95                ));
96            }
97        } else {
98            None
99        };
100
101        // Create the cancellation token
102        let cancellation_token = CancellationToken::new();
103
104        Ok(Self {
105            client,
106            state: Arc::new(State::new(config.default_resolution)),
107            recorder,
108            tasks: Vec::new(),
109            cancellation_token: cancellation_token.clone(),
110            is_initialized: Arc::new(AtomicBool::new(false)),
111            element_buffer: Arc::new(ElementBuffer::new(config.max_reg_elem_retries)),
112            metric_buffer: Arc::new(MetricBuffer::new()),
113            config: config.clone(),
114        })
115    }
116
117    /// Initializes the Muse client and starts background tasks.
118    ///
119    /// Must be called before using other methods that interact with the Muse system.
120    ///
121    /// # Arguments
122    ///
123    /// - `timeout`: Optional timeout duration for the initialization process.
124    ///
125    /// # Errors
126    ///
127    /// Returns a [`MuseError::MuseInitializationTimeout`] if initialization times out.
128    pub async fn initialize(&mut self, timeout: Option<Duration>) -> MuseResult<()> {
129        // Record the MuseConfig event if recording is enabled
130        if let Some(recorder) = &self.recorder {
131            let config_event = RecordedEvent::MuseConfig(self.config.clone());
132            recorder
133                .lock()
134                .await
135                .record(RecordedEventWithTime::new(config_event))
136                .await
137                .expect("Failed to record MuseConfig event");
138        }
139        // Start background tasks
140        let init_interval = self
141            .config
142            .initialization_interval
143            .unwrap_or(timing::INITIALIZATION_INTERVAL);
144        self.start_tasks(
145            self.config.element_kinds.to_vec(),
146            self.config.metric_definitions.to_vec(),
147            init_interval,
148            self.config.cluster_monitor_interval,
149        );
150        // Wait for initialization to complete, with an optional timeout
151        let deadline = timeout.map(|t| tokio::time::Instant::now() + t);
152        while !self.is_initialized() {
153            if let Some(deadline) = deadline {
154                if tokio::time::Instant::now() >= deadline {
155                    return Err(MuseError::MuseInitializationTimeout(timeout.unwrap()));
156                }
157            }
158            tokio::time::sleep(init_interval).await;
159        }
160        Ok(())
161    }
162
163    /// Retrieves a reference to the internal [`State`] object.
164    ///
165    /// # Returns
166    ///
167    /// An `Arc` pointing to the internal `State`.
168    pub fn get_state(&self) -> Arc<State> {
169        self.state.clone()
170    }
171
172    /// Retrieves the finest resolution of timestamps from the state.
173    ///
174    /// # Returns
175    ///
176    /// The current `TimestampResolution` as set in the state.
177    pub fn get_finest_resolution(&self) -> TimestampResolution {
178        self.state.get_finest_resolution()
179    }
180
181    /// Retrieves a reference to the internal transport client.
182    ///
183    /// # Returns
184    ///
185    /// An `Arc` pointing to the transport client implementing `Transport`.
186    pub fn get_client(&self) -> Arc<dyn Transport + Send + Sync> {
187        self.client.clone()
188    }
189
190    fn start_tasks(
191        &mut self,
192        element_kinds: Vec<ElementKindRegistration>,
193        metric_definitions: Vec<MetricDefinition>,
194        initialization_interval: Duration,
195        cluster_monitor_interval: Option<Duration>,
196    ) {
197        let cancellation_token = self.cancellation_token.clone();
198        let client = self.client.clone();
199        let state = self.state.clone();
200        let is_initialized = self.is_initialized.clone();
201
202        // Start the recorded flushing task
203        if let Some(recorder) = &self.recorder {
204            let flush_interval = self
205                .config
206                .recording_flush_interval
207                .unwrap_or(timing::RECORDING_FLUSH_INTERVAL);
208            let flush_task = tokio::spawn(tasks::start_recorder_flush_task(
209                cancellation_token.clone(),
210                recorder.clone(),
211                flush_interval,
212            ));
213            self.tasks.push(flush_task);
214        }
215
216        // Start the initialization task
217        let init_task_handle = tokio::spawn(tasks::start_init_task(
218            cancellation_token.clone(),
219            client.clone(),
220            state.clone(),
221            element_kinds,
222            metric_definitions,
223            initialization_interval,
224            is_initialized.clone(),
225        ));
226        self.tasks.push(init_task_handle);
227
228        // Start the cluster monitoring task
229        let cluster_monitoring_handle = tokio::spawn(tasks::start_cluster_monitor(
230            cancellation_token.clone(),
231            client.clone(),
232            state.clone(),
233            is_initialized,
234            cluster_monitor_interval.unwrap_or(timing::CLUSTER_MONITOR_INTERVAL),
235        ));
236        self.tasks.push(cluster_monitoring_handle);
237
238        // Start element registration task
239        let elem_reg_handle = tokio::spawn(tasks::start_element_registration_task(
240            cancellation_token.clone(),
241            client.clone(),
242            state.clone(),
243            self.element_buffer.clone(),
244        ));
245        self.tasks.push(elem_reg_handle);
246
247        // Start metric sender task
248        let metric_sender_handle = tokio::spawn(tasks::start_metric_sender_task(
249            cancellation_token.clone(),
250            client,
251            state,
252            self.metric_buffer.clone(),
253        ));
254        self.tasks.push(metric_sender_handle);
255    }
256
257    /// Checks if the Muse client has been initialized.
258    ///
259    /// # Returns
260    ///
261    /// `true` if initialized, `false` otherwise.
262    pub fn is_initialized(&self) -> bool {
263        self.is_initialized.load(Ordering::SeqCst)
264    }
265
266    /// Registers a new element with the Muse system.
267    ///
268    /// # Arguments
269    ///
270    /// - `kind_code`: The kind code of the element.
271    /// - `name`: The name of the element.
272    /// - `metadata`: A map of metadata key-value pairs.
273    /// - `parent_id`: Optional parent [`LocalElementId`].
274    ///
275    /// # Returns
276    ///
277    /// A [`LocalElementId`] representing the registered element.
278    ///
279    /// # Errors
280    ///
281    /// Returns a [`MuseError`] if registration fails.
282    pub async fn register_element(
283        &self,
284        kind_code: &str,
285        name: String,
286        metadata: HashMap<String, String>,
287        parent_id: Option<LocalElementId>,
288    ) -> MuseResult<LocalElementId> {
289        let local_elem_id = generate_local_element_id();
290        self.register_element_inner(local_elem_id, kind_code, name, metadata, parent_id)
291            .await?;
292        Ok(local_elem_id)
293    }
294
295    async fn register_element_inner(
296        &self,
297        local_elem_id: LocalElementId,
298        kind_code: &str,
299        name: String,
300        metadata: HashMap<String, String>,
301        parent_id: Option<LocalElementId>,
302    ) -> MuseResult<()> {
303        // Record the event if recorder is enabled
304        if let Some(recorder) = &self.recorder {
305            let event = RecordedEvent::ElementRegistration {
306                local_elem_id,
307                kind_code: kind_code.to_string(),
308                name: name.clone(),
309                metadata: metadata.clone(),
310                parent_id,
311            };
312            recorder
313                .lock()
314                .await
315                .record(RecordedEventWithTime::new(event))
316                .await?;
317        }
318        if !self.state.is_valid_element_kind_code(kind_code) {
319            return Err(MuseError::InvalidElementKindCode(kind_code.to_string()));
320        }
321        let remote_parent_id = match parent_id {
322            Some(p) => {
323                let remote_id = self
324                    .get_remote_element_id(&p)
325                    .ok_or(MuseError::NotAvailableRemoteElementId(p))?;
326                Some(remote_id)
327            }
328            None => None,
329        };
330        let element = ElementRegistration::new(kind_code, name, metadata, remote_parent_id);
331        self.element_buffer
332            .add_element(local_elem_id, element)
333            .await;
334        Ok(())
335    }
336
337    /// Retrieves the remote `ElementId` associated with a given `LocalElementId`.
338    ///
339    /// # Arguments
340    ///
341    /// - `local_elem_id`: The `LocalElementId` for which to retrieve the `ElementId`.
342    ///
343    /// # Returns
344    ///
345    /// An `Option<ElementId>` containing the associated `ElementId` if it exists.
346    pub fn get_remote_element_id(&self, local_elem_id: &LocalElementId) -> Option<ElementId> {
347        self.state.get_element_id(local_elem_id)
348    }
349
350    /// Sends a metric value associated with an element.
351    ///
352    /// # Arguments
353    ///
354    /// - `local_elem_id`: The local ID of the element.
355    /// - `metric_code`: The code identifying the metric.
356    /// - `value`: The metric value to send.
357    ///
358    /// # Errors
359    ///
360    /// Returns a [`MuseError`] if the metric cannot be sent.
361    pub async fn send_metric(
362        &self,
363        local_elem_id: LocalElementId,
364        metric_code: &str,
365        value: MetricValue,
366    ) -> MuseResult<()> {
367        // Record the event if recorder is enabled
368        if let Some(recorder) = &self.recorder {
369            let event = RecordedEvent::SendMetric {
370                local_elem_id,
371                metric_code: metric_code.to_string(),
372                value,
373            };
374            recorder
375                .lock()
376                .await
377                .record(RecordedEventWithTime::new(event))
378                .await?;
379        }
380
381        if !self.state.is_valid_metric_code(metric_code) {
382            return Err(MuseError::InvalidMetricCode(metric_code.to_string()));
383        }
384
385        self.metric_buffer
386            .add_metric(local_elem_id, metric_code.to_string(), value)
387            .await;
388
389        Ok(())
390    }
391
392    /// Retrieves metrics from the Muse system based on a query.
393    ///
394    /// **Note**: The `Muse` client is primarily intended for sending metrics to the Muse system.
395    /// This method is provided mainly for testing purposes and is not recommended for use in production code.
396    ///
397    /// # Arguments
398    ///
399    /// - `query`: The [`MetricQuery`] specifying the criteria for retrieving metrics.
400    ///
401    /// # Returns
402    ///
403    /// A vector of [`MetricPayload`]s matching the query.
404    ///
405    /// # Errors
406    ///
407    /// Returns a [`MuseError`] if the metrics cannot be retrieved.
408    pub async fn get_metrics(&self, query: &MetricQuery) -> MuseResult<Vec<MetricPayload>> {
409        // For testing purposes, we use the client to get metrics.
410        // Note that in production use, the Muse client is not intended for retrieving metrics.
411        self.client.get_metrics(query, None).await
412    }
413
414    /// Replays events from a recording file.
415    ///
416    /// Useful for testing or replaying historical data.
417    ///
418    /// # Arguments
419    ///
420    /// - `replay_path`: The file path to the recording.
421    ///
422    /// # Errors
423    ///
424    /// Returns a [`MuseError`] if replaying fails.
425    pub async fn replay(&self, replay_path: &Path) -> MuseResult<()> {
426        if self.config.recording_enabled {
427            return Err(MuseError::Replaying(
428                "Cannot replay with recording enabled".to_string(),
429            ));
430        }
431        let mut replayer = FileReplayer::new(replay_path).await?;
432
433        let mut last_timestamp = None;
434
435        while let Some(timed_event) = replayer.next_event().await? {
436            if let Some(last) = last_timestamp {
437                let delay = Duration::from_micros((timed_event.timestamp - last) as u64);
438                tokio::time::sleep(delay).await;
439            }
440
441            last_timestamp = Some(timed_event.timestamp);
442
443            match timed_event.event {
444                RecordedEvent::MuseConfig(recorded_config) => {
445                    if !self.config.is_relevantly_equal(&recorded_config) {
446                        let differences = recorded_config.pretty_diff(&self.config);
447                        log::warn!(
448                            "Recorded config and current config do not match:\n{}",
449                            differences
450                        );
451                    }
452                }
453                RecordedEvent::ElementRegistration {
454                    local_elem_id,
455                    kind_code,
456                    name,
457                    metadata,
458                    parent_id,
459                } => {
460                    self.register_element_inner(
461                        local_elem_id,
462                        &kind_code,
463                        name,
464                        metadata,
465                        parent_id,
466                    )
467                    .await?;
468                }
469                RecordedEvent::SendMetric {
470                    local_elem_id,
471                    metric_code,
472                    value,
473                } => {
474                    self.send_metric(local_elem_id, &metric_code, value).await?;
475                }
476            }
477        }
478        Ok(())
479    }
480
481    /// Initializes a Muse instance using the Config recorded in a replay file.
482    /// Sets `recording_enabled` in the Config to `false` to prevent re-recording during replay.
483    ///
484    /// # Arguments
485    /// - `replay_path`: The path to the replay file containing the Config.
486    ///
487    /// # Returns
488    /// A new Muse instance initialized with the Config extracted from the replay file.
489    pub async fn from_replay(replay_path: &Path) -> MuseResult<Self> {
490        let mut replayer = FileReplayer::new(replay_path).await?;
491        let mut config: Option<Config> = None;
492
493        // Extract the ConfigUpdate event from the replay file.
494        if let Some(timed_event) = replayer.next_event().await? {
495            if let RecordedEvent::MuseConfig(mut c) = timed_event.event {
496                c.recording_enabled = false;
497                config = Some(c);
498            }
499        }
500
501        // Ensure the Config is present in the replay file.
502        let config = config.ok_or_else(|| {
503            MuseError::Replaying("No ConfigUpdate event found in the replay file.".to_string())
504        })?;
505
506        // Create and initialize a new Muse instance with the extracted Config.
507        let mut muse = Muse::new(&config)?;
508        muse.initialize(None).await?;
509        Ok(muse)
510    }
511
512    /// Checks if a replay should start based on the presence of a replay file.
513    /// If the replay file exists and contains valid events, it will start the replay process.
514    ///
515    /// # Arguments
516    /// - `replay_path`: The path to the replay file.
517    ///
518    /// # Returns
519    /// A Result indicating success or failure of the replay process.
520    pub async fn check_and_replay(replay_path: &Path) -> MuseResult<Self> {
521        if replay_path.exists() {
522            log::info!("Replay file found: {:?}", replay_path);
523            let muse = Muse::from_replay(replay_path).await?;
524            muse.replay(replay_path).await?;
525            log::info!("Replay completed successfully.");
526            return Ok(muse);
527        }
528        log::info!("No replay file found at: {:?}", replay_path);
529        Err(MuseError::Replaying(format!(
530            "Replay file not found: {:?}",
531            replay_path
532        )))
533    }
534}