1use std::collections::HashMap;
4use std::path::Path;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10use tokio::time::Duration;
11use tokio_util::sync::CancellationToken;
12
13use crate::tasks;
14use crate::timing;
15use ih_muse_client::{MockClient, PoetClient};
16use ih_muse_core::prelude::*;
17use ih_muse_proto::prelude::*;
18use ih_muse_record::{
19 FileRecorder, FileReplayer, RecordedEvent, RecordedEventWithTime, Recorder, Replayer,
20};
21
22pub struct Muse {
27 client: Arc<dyn Transport + Send + Sync>,
28 state: Arc<State>,
29 pub recorder: Option<Arc<Mutex<dyn Recorder + Send + Sync>>>,
30 tasks: Vec<JoinHandle<()>>,
31 cancellation_token: CancellationToken,
32 pub is_initialized: Arc<AtomicBool>,
34 element_buffer: Arc<ElementBuffer>,
35 metric_buffer: Arc<MetricBuffer>,
36 config: Config,
37}
38
39impl Drop for Muse {
40 fn drop(&mut self) {
45 self.cancellation_token.cancel();
47 for task in &self.tasks {
48 task.abort();
49 }
50
51 if let Some(recorder) = &self.recorder {
53 let recorder = recorder.clone();
54 let _ = std::thread::spawn(move || {
55 let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
56 rt.block_on(async {
57 let mut recorder = recorder.lock().await;
58 if let Err(e) = recorder.flush().await {
59 eprintln!("Failed to flush recorder: {:?}", e);
60 }
61 if let Err(e) = recorder.close().await {
62 eprintln!("Failed to close recorder: {:?}", e);
63 }
64 });
65 })
66 .join();
67 }
68 }
69}
70
71impl Muse {
72 pub fn new(config: &Config) -> MuseResult<Self> {
82 let client: Arc<dyn Transport + Send + Sync> = match config.client_type {
83 ClientType::Poet => Arc::new(PoetClient::new(&config.endpoints)),
84 ClientType::Mock => Arc::new(MockClient::new(config.default_resolution)),
85 };
86
87 let recorder: Option<Arc<Mutex<dyn Recorder + Send + Sync>>> = if config.recording_enabled {
88 if let Some(path) = &config.recording_path {
89 let file_recorder =
90 FileRecorder::new(Path::new(path)).expect("Failed to create FileRecorder");
91 Some(Arc::new(Mutex::new(file_recorder))) } else {
93 return Err(MuseError::Configuration(
94 "Recording enabled but no recording path provided".to_string(),
95 ));
96 }
97 } else {
98 None
99 };
100
101 let cancellation_token = CancellationToken::new();
103
104 Ok(Self {
105 client,
106 state: Arc::new(State::new(config.default_resolution)),
107 recorder,
108 tasks: Vec::new(),
109 cancellation_token: cancellation_token.clone(),
110 is_initialized: Arc::new(AtomicBool::new(false)),
111 element_buffer: Arc::new(ElementBuffer::new(config.max_reg_elem_retries)),
112 metric_buffer: Arc::new(MetricBuffer::new()),
113 config: config.clone(),
114 })
115 }
116
117 pub async fn initialize(&mut self, timeout: Option<Duration>) -> MuseResult<()> {
129 if let Some(recorder) = &self.recorder {
131 let config_event = RecordedEvent::MuseConfig(self.config.clone());
132 recorder
133 .lock()
134 .await
135 .record(RecordedEventWithTime::new(config_event))
136 .await
137 .expect("Failed to record MuseConfig event");
138 }
139 let init_interval = self
141 .config
142 .initialization_interval
143 .unwrap_or(timing::INITIALIZATION_INTERVAL);
144 self.start_tasks(
145 self.config.element_kinds.to_vec(),
146 self.config.metric_definitions.to_vec(),
147 init_interval,
148 self.config.cluster_monitor_interval,
149 );
150 let deadline = timeout.map(|t| tokio::time::Instant::now() + t);
152 while !self.is_initialized() {
153 if let Some(deadline) = deadline {
154 if tokio::time::Instant::now() >= deadline {
155 return Err(MuseError::MuseInitializationTimeout(timeout.unwrap()));
156 }
157 }
158 tokio::time::sleep(init_interval).await;
159 }
160 Ok(())
161 }
162
163 pub fn get_state(&self) -> Arc<State> {
169 self.state.clone()
170 }
171
172 pub fn get_finest_resolution(&self) -> TimestampResolution {
178 self.state.get_finest_resolution()
179 }
180
181 pub fn get_client(&self) -> Arc<dyn Transport + Send + Sync> {
187 self.client.clone()
188 }
189
190 fn start_tasks(
191 &mut self,
192 element_kinds: Vec<ElementKindRegistration>,
193 metric_definitions: Vec<MetricDefinition>,
194 initialization_interval: Duration,
195 cluster_monitor_interval: Option<Duration>,
196 ) {
197 let cancellation_token = self.cancellation_token.clone();
198 let client = self.client.clone();
199 let state = self.state.clone();
200 let is_initialized = self.is_initialized.clone();
201
202 if let Some(recorder) = &self.recorder {
204 let flush_interval = self
205 .config
206 .recording_flush_interval
207 .unwrap_or(timing::RECORDING_FLUSH_INTERVAL);
208 let flush_task = tokio::spawn(tasks::start_recorder_flush_task(
209 cancellation_token.clone(),
210 recorder.clone(),
211 flush_interval,
212 ));
213 self.tasks.push(flush_task);
214 }
215
216 let init_task_handle = tokio::spawn(tasks::start_init_task(
218 cancellation_token.clone(),
219 client.clone(),
220 state.clone(),
221 element_kinds,
222 metric_definitions,
223 initialization_interval,
224 is_initialized.clone(),
225 ));
226 self.tasks.push(init_task_handle);
227
228 let cluster_monitoring_handle = tokio::spawn(tasks::start_cluster_monitor(
230 cancellation_token.clone(),
231 client.clone(),
232 state.clone(),
233 is_initialized,
234 cluster_monitor_interval.unwrap_or(timing::CLUSTER_MONITOR_INTERVAL),
235 ));
236 self.tasks.push(cluster_monitoring_handle);
237
238 let elem_reg_handle = tokio::spawn(tasks::start_element_registration_task(
240 cancellation_token.clone(),
241 client.clone(),
242 state.clone(),
243 self.element_buffer.clone(),
244 ));
245 self.tasks.push(elem_reg_handle);
246
247 let metric_sender_handle = tokio::spawn(tasks::start_metric_sender_task(
249 cancellation_token.clone(),
250 client,
251 state,
252 self.metric_buffer.clone(),
253 ));
254 self.tasks.push(metric_sender_handle);
255 }
256
257 pub fn is_initialized(&self) -> bool {
263 self.is_initialized.load(Ordering::SeqCst)
264 }
265
266 pub async fn register_element(
283 &self,
284 kind_code: &str,
285 name: String,
286 metadata: HashMap<String, String>,
287 parent_id: Option<LocalElementId>,
288 ) -> MuseResult<LocalElementId> {
289 let local_elem_id = generate_local_element_id();
290 self.register_element_inner(local_elem_id, kind_code, name, metadata, parent_id)
291 .await?;
292 Ok(local_elem_id)
293 }
294
295 async fn register_element_inner(
296 &self,
297 local_elem_id: LocalElementId,
298 kind_code: &str,
299 name: String,
300 metadata: HashMap<String, String>,
301 parent_id: Option<LocalElementId>,
302 ) -> MuseResult<()> {
303 if let Some(recorder) = &self.recorder {
305 let event = RecordedEvent::ElementRegistration {
306 local_elem_id,
307 kind_code: kind_code.to_string(),
308 name: name.clone(),
309 metadata: metadata.clone(),
310 parent_id,
311 };
312 recorder
313 .lock()
314 .await
315 .record(RecordedEventWithTime::new(event))
316 .await?;
317 }
318 if !self.state.is_valid_element_kind_code(kind_code) {
319 return Err(MuseError::InvalidElementKindCode(kind_code.to_string()));
320 }
321 let remote_parent_id = match parent_id {
322 Some(p) => {
323 let remote_id = self
324 .get_remote_element_id(&p)
325 .ok_or(MuseError::NotAvailableRemoteElementId(p))?;
326 Some(remote_id)
327 }
328 None => None,
329 };
330 let element = ElementRegistration::new(kind_code, name, metadata, remote_parent_id);
331 self.element_buffer
332 .add_element(local_elem_id, element)
333 .await;
334 Ok(())
335 }
336
337 pub fn get_remote_element_id(&self, local_elem_id: &LocalElementId) -> Option<ElementId> {
347 self.state.get_element_id(local_elem_id)
348 }
349
350 pub async fn send_metric(
362 &self,
363 local_elem_id: LocalElementId,
364 metric_code: &str,
365 value: MetricValue,
366 ) -> MuseResult<()> {
367 if let Some(recorder) = &self.recorder {
369 let event = RecordedEvent::SendMetric {
370 local_elem_id,
371 metric_code: metric_code.to_string(),
372 value,
373 };
374 recorder
375 .lock()
376 .await
377 .record(RecordedEventWithTime::new(event))
378 .await?;
379 }
380
381 if !self.state.is_valid_metric_code(metric_code) {
382 return Err(MuseError::InvalidMetricCode(metric_code.to_string()));
383 }
384
385 self.metric_buffer
386 .add_metric(local_elem_id, metric_code.to_string(), value)
387 .await;
388
389 Ok(())
390 }
391
392 pub async fn get_metrics(&self, query: &MetricQuery) -> MuseResult<Vec<MetricPayload>> {
409 self.client.get_metrics(query, None).await
412 }
413
414 pub async fn replay(&self, replay_path: &Path) -> MuseResult<()> {
426 if self.config.recording_enabled {
427 return Err(MuseError::Replaying(
428 "Cannot replay with recording enabled".to_string(),
429 ));
430 }
431 let mut replayer = FileReplayer::new(replay_path).await?;
432
433 let mut last_timestamp = None;
434
435 while let Some(timed_event) = replayer.next_event().await? {
436 if let Some(last) = last_timestamp {
437 let delay = Duration::from_micros((timed_event.timestamp - last) as u64);
438 tokio::time::sleep(delay).await;
439 }
440
441 last_timestamp = Some(timed_event.timestamp);
442
443 match timed_event.event {
444 RecordedEvent::MuseConfig(recorded_config) => {
445 if !self.config.is_relevantly_equal(&recorded_config) {
446 let differences = recorded_config.pretty_diff(&self.config);
447 log::warn!(
448 "Recorded config and current config do not match:\n{}",
449 differences
450 );
451 }
452 }
453 RecordedEvent::ElementRegistration {
454 local_elem_id,
455 kind_code,
456 name,
457 metadata,
458 parent_id,
459 } => {
460 self.register_element_inner(
461 local_elem_id,
462 &kind_code,
463 name,
464 metadata,
465 parent_id,
466 )
467 .await?;
468 }
469 RecordedEvent::SendMetric {
470 local_elem_id,
471 metric_code,
472 value,
473 } => {
474 self.send_metric(local_elem_id, &metric_code, value).await?;
475 }
476 }
477 }
478 Ok(())
479 }
480
481 pub async fn from_replay(replay_path: &Path) -> MuseResult<Self> {
490 let mut replayer = FileReplayer::new(replay_path).await?;
491 let mut config: Option<Config> = None;
492
493 if let Some(timed_event) = replayer.next_event().await? {
495 if let RecordedEvent::MuseConfig(mut c) = timed_event.event {
496 c.recording_enabled = false;
497 config = Some(c);
498 }
499 }
500
501 let config = config.ok_or_else(|| {
503 MuseError::Replaying("No ConfigUpdate event found in the replay file.".to_string())
504 })?;
505
506 let mut muse = Muse::new(&config)?;
508 muse.initialize(None).await?;
509 Ok(muse)
510 }
511
512 pub async fn check_and_replay(replay_path: &Path) -> MuseResult<Self> {
521 if replay_path.exists() {
522 log::info!("Replay file found: {:?}", replay_path);
523 let muse = Muse::from_replay(replay_path).await?;
524 muse.replay(replay_path).await?;
525 log::info!("Replay completed successfully.");
526 return Ok(muse);
527 }
528 log::info!("No replay file found at: {:?}", replay_path);
529 Err(MuseError::Replaying(format!(
530 "Replay file not found: {:?}",
531 replay_path
532 )))
533 }
534}