1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use things3_core::{ObservabilityManager, ThingsDatabase};
9use tokio::time::interval;
10use tracing::{debug, error, info, instrument, warn};
11
12pub struct MetricsCollector {
14 observability: Arc<ObservabilityManager>,
15 database: Arc<ThingsDatabase>,
16 collection_interval: Duration,
17}
18
19impl MetricsCollector {
20 #[must_use]
22 pub fn new(
23 observability: Arc<ObservabilityManager>,
24 database: Arc<ThingsDatabase>,
25 collection_interval: Duration,
26 ) -> Self {
27 Self {
28 observability,
29 database,
30 collection_interval,
31 }
32 }
33
34 #[instrument(skip(self))]
40 pub async fn start_collection(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
41 info!(
42 "Starting metrics collection with interval: {:?}",
43 self.collection_interval
44 );
45
46 let mut interval = interval(self.collection_interval);
47
48 loop {
49 interval.tick().await;
50
51 if let Err(e) = self.collect_metrics().await {
52 error!("Failed to collect metrics: {}", e);
53 }
54 }
55 }
56
57 #[instrument(skip(self))]
59 async fn collect_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
60 debug!("Collecting metrics");
61
62 self.collect_system_metrics().await?;
64
65 self.collect_database_metrics().await?;
67
68 self.collect_application_metrics().await?;
70
71 debug!("Metrics collection completed");
72 Ok(())
73 }
74
75 #[instrument(skip(self))]
77 async fn collect_system_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
78 use sysinfo::{Pid, System};
79
80 let mut system = System::new_all();
81 system.refresh_all();
82
83 let current_pid = Pid::from_u32(std::process::id());
85 let process = system.process(current_pid);
86
87 if let Some(process) = process {
88 let memory_usage = process.memory() * 1024; let cpu_usage = f64::from(process.cpu_usage());
90
91 let cache_hit_rate = 0.85; let cache_size = 1024 * 1024; self.observability.update_performance_metrics(
96 memory_usage,
97 cpu_usage,
98 cache_hit_rate,
99 cache_size,
100 );
101
102 debug!(
103 memory_usage = memory_usage,
104 cpu_usage = cpu_usage,
105 cache_hit_rate = cache_hit_rate,
106 cache_size = cache_size,
107 "System metrics collected"
108 );
109 }
110
111 Ok(())
112 }
113
114 #[instrument(skip(self))]
116 async fn collect_database_metrics(
117 &self,
118 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
119 let is_connected = true; if !is_connected {
123 warn!("Database connection is not healthy");
124 self.observability
125 .record_error("database_connection", "Database connection lost");
126 }
127
128 debug!("Database metrics collected");
133 Ok(())
134 }
135
136 #[instrument(skip(self))]
138 async fn collect_application_metrics(
139 &self,
140 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
141 self.collect_task_metrics().await?;
143
144 self.collect_search_metrics().await?;
146
147 self.collect_export_metrics().await?;
149
150 debug!("Application metrics collected");
151 Ok(())
152 }
153
154 #[instrument(skip(self))]
156 async fn collect_task_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
157 let inbox_count = self
162 .database
163 .get_inbox(Some(1000))
164 .await
165 .map_err(|e| {
166 error!("Failed to get inbox count: {}", e);
167 e
168 })?
169 .len();
170
171 let today_count = self
172 .database
173 .get_today(Some(1000))
174 .await
175 .map_err(|e| {
176 error!("Failed to get today count: {}", e);
177 e
178 })?
179 .len();
180
181 debug!(
182 inbox_count = inbox_count,
183 today_count = today_count,
184 "Task metrics collected"
185 );
186
187 Ok(())
188 }
189
190 #[instrument(skip(self))]
192 async fn collect_search_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
193 debug!("Search metrics collected");
197 Ok(())
198 }
199
200 #[instrument(skip(self))]
202 async fn collect_export_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
203 debug!("Export metrics collected");
207 Ok(())
208 }
209}
210
211pub struct PerformanceMonitor {
213 observability: Arc<ObservabilityManager>,
214}
215
216impl PerformanceMonitor {
217 #[must_use]
219 pub fn new(observability: Arc<ObservabilityManager>) -> Self {
220 Self { observability }
221 }
222
223 #[instrument(skip(self, f))]
225 pub fn monitor_db_operation<F, R>(&self, operation: &str, f: F) -> R
226 where
227 F: FnOnce() -> R,
228 {
229 self.observability.record_db_operation(operation, f)
230 }
231
232 #[instrument(skip(self, f))]
234 pub fn monitor_search<F, R>(&self, query: &str, f: F) -> R
235 where
236 F: FnOnce() -> R,
237 {
238 self.observability.record_search_operation(query, f)
239 }
240
241 #[instrument(skip(self))]
243 pub fn monitor_task_operation(&self, operation: &str, count: u64) {
244 self.observability.record_task_operation(operation, count);
245 }
246
247 #[instrument(skip(self, f))]
249 pub fn monitor_export<F, R>(&self, format: &str, f: F) -> R
250 where
251 F: FnOnce() -> R,
252 {
253 let start = Instant::now();
254 let result = f();
255 let duration = start.elapsed();
256
257 debug!(
260 format = format,
261 duration_ms = duration.as_millis(),
262 "Export operation completed"
263 );
264
265 result
266 }
267}
268
269pub struct ErrorTracker {
271 observability: Arc<ObservabilityManager>,
272}
273
274impl ErrorTracker {
275 #[must_use]
277 pub fn new(observability: Arc<ObservabilityManager>) -> Self {
278 Self { observability }
279 }
280
281 #[instrument(skip(self))]
283 pub fn track_error(&self, error_type: &str, error_message: &str) {
284 self.observability.record_error(error_type, error_message);
285 }
286
287 #[instrument(skip(self))]
289 pub fn track_db_error(&self, operation: &str, error: &dyn std::error::Error) {
290 let error_type = format!("database_{operation}");
291 let error_message = format!("Database operation '{operation}' failed: {error}");
292 self.track_error(&error_type, &error_message);
293 }
294
295 #[instrument(skip(self))]
297 pub fn track_search_error(&self, query: &str, error: &dyn std::error::Error) {
298 let error_type = "search_error";
299 let error_message = format!("Search query '{query}' failed: {error}");
300 self.track_error(error_type, &error_message);
301 }
302
303 #[instrument(skip(self))]
305 pub fn track_export_error(&self, format: &str, error: &dyn std::error::Error) {
306 let error_type = "export_error";
307 let error_message = format!("Export in '{format}' format failed: {error}");
308 self.track_error(error_type, &error_message);
309 }
310}
311
312pub async fn start_metrics_collection(
318 observability: Arc<ObservabilityManager>,
319 database: Arc<ThingsDatabase>,
320 collection_interval: Duration,
321) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
322 let collector = MetricsCollector::new(observability, database, collection_interval);
323 collector.start_collection().await
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;
    use tempfile::NamedTempFile;
    use things3_core::{ObservabilityConfig, ThingsConfig};

    /// Builds an observability manager from `config`, panicking on failure.
    fn observability_with(config: ObservabilityConfig) -> Arc<ObservabilityManager> {
        Arc::new(ObservabilityManager::new(config).unwrap())
    }

    /// Builds an observability manager from the default configuration.
    fn default_observability() -> Arc<ObservabilityManager> {
        observability_with(ObservabilityConfig::default())
    }

    /// Opens a database backed by a fresh temporary file.
    ///
    /// The temp-file guard is returned as well so the file stays on disk for
    /// as long as the caller keeps the database around.
    async fn open_temp_database() -> (NamedTempFile, Arc<ThingsDatabase>) {
        let temp_file = NamedTempFile::new().unwrap();
        let config = ThingsConfig::new(temp_file.path(), false);
        let database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
        (temp_file, database)
    }

    /// Blocking variant of [`open_temp_database`] for plain `#[test]`s.
    ///
    /// Bind the tuple as `(temp_file, rt, database)`: locals drop in reverse
    /// declaration order, so the database goes first, then the runtime, then
    /// the temp file.
    fn open_temp_database_blocking() -> (
        NamedTempFile,
        tokio::runtime::Runtime,
        Arc<ThingsDatabase>,
    ) {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let (temp_file, database) = rt.block_on(open_temp_database());
        (temp_file, rt, database)
    }

    /// Builds a collector with default observability and a 30 s interval.
    async fn default_collector() -> (NamedTempFile, MetricsCollector) {
        let (temp_file, database) = open_temp_database().await;
        let collector =
            MetricsCollector::new(default_observability(), database, Duration::from_secs(30));
        (temp_file, collector)
    }

    #[test]
    fn test_performance_monitor_creation() {
        let (_temp_file, _rt, _database) = open_temp_database_blocking();

        let _monitor = PerformanceMonitor::new(default_observability());
    }

    #[test]
    fn test_error_tracker_creation() {
        let (_temp_file, _rt, _database) = open_temp_database_blocking();

        let _tracker = ErrorTracker::new(default_observability());
    }

    #[test]
    fn test_metrics_collector_creation() {
        let (_temp_file, _rt, database) = open_temp_database_blocking();

        let _collector =
            MetricsCollector::new(default_observability(), database, Duration::from_secs(30));
    }

    #[tokio::test]
    async fn test_performance_monitor_timing() {
        let (_temp_file, _database) = open_temp_database().await;
        let monitor = PerformanceMonitor::new(default_observability());

        // The closure's return value must be passed through unchanged.
        let result = monitor.monitor_db_operation("test_operation", || "test_result");
        assert_eq!(result, "test_result");
    }

    // Despite its name, this test exercises `monitor_task_operation`; it only
    // checks that the call completes without panicking.
    #[tokio::test]
    async fn test_performance_monitor_error_tracking() {
        let (_temp_file, _database) = open_temp_database().await;
        let monitor = PerformanceMonitor::new(default_observability());

        monitor.monitor_task_operation("test_operation", 5);
    }

    #[tokio::test]
    async fn test_error_tracker_database_error() {
        let (_temp_file, _database) = open_temp_database().await;
        let tracker = ErrorTracker::new(default_observability());

        let error = std::io::Error::new(std::io::ErrorKind::NotFound, "Database not found");
        tracker.track_db_error("test_operation", &error);
    }

    #[tokio::test]
    async fn test_error_tracker_search_error() {
        let (_temp_file, _database) = open_temp_database().await;
        let tracker = ErrorTracker::new(default_observability());

        let error = std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid search query");
        tracker.track_search_error("test query", &error);
    }

    #[tokio::test]
    async fn test_error_tracker_export_error() {
        let (_temp_file, _database) = open_temp_database().await;
        let tracker = ErrorTracker::new(default_observability());

        let error = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "Export failed");
        tracker.track_export_error("json", &error);
    }

    #[tokio::test]
    async fn test_metrics_collector_system_metrics() {
        let (_temp_file, collector) = default_collector().await;

        let result = collector.collect_system_metrics().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_metrics_collector_database_metrics() {
        let (_temp_file, collector) = default_collector().await;

        let result = collector.collect_database_metrics().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_metrics_collector_search_metrics() {
        let (_temp_file, collector) = default_collector().await;

        let result = collector.collect_search_metrics().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_metrics_collector_export_metrics() {
        let (_temp_file, collector) = default_collector().await;

        let result = collector.collect_export_metrics().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_start_metrics_collection() {
        let (_temp_file, database) = open_temp_database().await;
        let observability = default_observability();

        // The collection loop never returns on its own, so run it as a task
        // and cancel it after giving it a moment to start.
        let collection_handle = tokio::spawn(async move {
            start_metrics_collection(observability, database, Duration::from_millis(100)).await
        });

        tokio::time::sleep(Duration::from_millis(50)).await;
        collection_handle.abort();
    }

    #[test]
    fn test_performance_monitor_with_custom_observability() {
        let (_temp_file, _rt, _database) = open_temp_database_blocking();

        let observability = observability_with(ObservabilityConfig {
            service_name: "test-service".to_string(),
            ..Default::default()
        });

        let _monitor = PerformanceMonitor::new(observability);
    }

    #[test]
    fn test_error_tracker_with_custom_observability() {
        let (_temp_file, _rt, _database) = open_temp_database_blocking();

        let observability = observability_with(ObservabilityConfig {
            service_name: "test-service".to_string(),
            ..Default::default()
        });

        let _tracker = ErrorTracker::new(observability);
    }

    #[test]
    fn test_metrics_collector_with_different_intervals() {
        let (_temp_file, _rt, database) = open_temp_database_blocking();
        let observability = default_observability();

        let _collector1 = MetricsCollector::new(
            Arc::clone(&observability),
            Arc::clone(&database),
            Duration::from_secs(1),
        );
        let _collector2 = MetricsCollector::new(
            Arc::clone(&observability),
            Arc::clone(&database),
            Duration::from_secs(60),
        );
        let _collector3 = MetricsCollector::new(
            Arc::clone(&observability),
            Arc::clone(&database),
            Duration::from_millis(500),
        );
    }
}