1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use things3_core::{ObservabilityManager, ThingsDatabase};
9use tokio::time::interval;
10use tracing::{debug, error, info, instrument, warn};
11
12pub struct MetricsCollector {
14 observability: Arc<ObservabilityManager>,
15 database: Arc<ThingsDatabase>,
16 collection_interval: Duration,
17}
18
19impl MetricsCollector {
20 #[must_use]
22 pub fn new(
23 observability: Arc<ObservabilityManager>,
24 database: Arc<ThingsDatabase>,
25 collection_interval: Duration,
26 ) -> Self {
27 Self {
28 observability,
29 database,
30 collection_interval,
31 }
32 }
33
34 #[instrument(skip(self))]
40 pub async fn start_collection(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
41 info!(
42 "Starting metrics collection with interval: {:?}",
43 self.collection_interval
44 );
45
46 let mut interval = interval(self.collection_interval);
47
48 loop {
49 interval.tick().await;
50
51 if let Err(e) = self.collect_metrics().await {
52 error!("Failed to collect metrics: {}", e);
53 }
54 }
55 }
56
57 #[instrument(skip(self))]
59 async fn collect_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
60 debug!("Collecting metrics");
61
62 self.collect_system_metrics().await?;
64
65 self.collect_database_metrics().await?;
67
68 self.collect_application_metrics().await?;
70
71 debug!("Metrics collection completed");
72 Ok(())
73 }
74
75 #[instrument(skip(self))]
77 async fn collect_system_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
78 use sysinfo::{Pid, System};
79
80 let mut system = System::new_all();
81 system.refresh_all();
82
83 let current_pid = Pid::from_u32(std::process::id());
85 let process = system.process(current_pid);
86
87 if let Some(process) = process {
88 let memory_usage = process.memory() * 1024; let cpu_usage = f64::from(process.cpu_usage());
90
91 let cache_hit_rate = 0.85; let cache_size = 1024 * 1024; self.observability.update_performance_metrics(
96 memory_usage,
97 cpu_usage,
98 cache_hit_rate,
99 cache_size,
100 );
101
102 debug!(
103 memory_usage = memory_usage,
104 cpu_usage = cpu_usage,
105 cache_hit_rate = cache_hit_rate,
106 cache_size = cache_size,
107 "System metrics collected"
108 );
109 }
110
111 Ok(())
112 }
113
114 #[instrument(skip(self))]
116 async fn collect_database_metrics(
117 &self,
118 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
119 let is_connected = true; if !is_connected {
123 warn!("Database connection is not healthy");
124 self.observability
125 .record_error("database_connection", "Database connection lost");
126 }
127
128 debug!("Database metrics collected");
133 Ok(())
134 }
135
136 #[instrument(skip(self))]
138 async fn collect_application_metrics(
139 &self,
140 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
141 self.collect_task_metrics().await?;
143
144 self.collect_search_metrics().await?;
146
147 self.collect_export_metrics().await?;
149
150 debug!("Application metrics collected");
151 Ok(())
152 }
153
154 #[instrument(skip(self))]
156 async fn collect_task_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
157 let inbox_count = self
162 .database
163 .get_inbox(Some(1000))
164 .await
165 .map_err(|e| {
166 error!("Failed to get inbox count: {}", e);
167 e
168 })?
169 .len();
170
171 let today_count = self
172 .database
173 .get_today(Some(1000))
174 .await
175 .map_err(|e| {
176 error!("Failed to get today count: {}", e);
177 e
178 })?
179 .len();
180
181 debug!(
182 inbox_count = inbox_count,
183 today_count = today_count,
184 "Task metrics collected"
185 );
186
187 Ok(())
188 }
189
190 #[instrument(skip(self))]
192 async fn collect_search_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
193 debug!("Search metrics collected");
197 Ok(())
198 }
199
200 #[instrument(skip(self))]
202 async fn collect_export_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
203 debug!("Export metrics collected");
207 Ok(())
208 }
209}
210
211pub struct PerformanceMonitor {
213 observability: Arc<ObservabilityManager>,
214}
215
216impl PerformanceMonitor {
217 #[must_use]
219 pub fn new(observability: Arc<ObservabilityManager>) -> Self {
220 Self { observability }
221 }
222
223 #[instrument(skip(self, f))]
225 pub fn monitor_db_operation<F, R>(&self, operation: &str, f: F) -> R
226 where
227 F: FnOnce() -> R,
228 {
229 self.observability.record_db_operation(operation, f)
230 }
231
232 #[instrument(skip(self, f))]
234 pub fn monitor_search<F, R>(&self, query: &str, f: F) -> R
235 where
236 F: FnOnce() -> R,
237 {
238 self.observability.record_search_operation(query, f)
239 }
240
241 #[instrument(skip(self))]
243 pub fn monitor_task_operation(&self, operation: &str, count: u64) {
244 self.observability.record_task_operation(operation, count);
245 }
246
247 #[instrument(skip(self, f))]
249 pub fn monitor_export<F, R>(&self, format: &str, f: F) -> R
250 where
251 F: FnOnce() -> R,
252 {
253 let start = Instant::now();
254 let result = f();
255 let duration = start.elapsed();
256
257 debug!(
260 format = format,
261 duration_ms = duration.as_millis(),
262 "Export operation completed"
263 );
264
265 result
266 }
267}
268
269pub struct ErrorTracker {
271 observability: Arc<ObservabilityManager>,
272}
273
274impl ErrorTracker {
275 #[must_use]
277 pub fn new(observability: Arc<ObservabilityManager>) -> Self {
278 Self { observability }
279 }
280
281 #[instrument(skip(self))]
283 pub fn track_error(&self, error_type: &str, error_message: &str) {
284 self.observability.record_error(error_type, error_message);
285 }
286
287 #[instrument(skip(self))]
289 pub fn track_db_error(&self, operation: &str, error: &dyn std::error::Error) {
290 let error_type = format!("database_{operation}");
291 let error_message = format!("Database operation '{operation}' failed: {error}");
292 self.track_error(&error_type, &error_message);
293 }
294
295 #[instrument(skip(self))]
297 pub fn track_search_error(&self, query: &str, error: &dyn std::error::Error) {
298 let error_type = "search_error";
299 let error_message = format!("Search query '{query}' failed: {error}");
300 self.track_error(error_type, &error_message);
301 }
302
303 #[instrument(skip(self))]
305 pub fn track_export_error(&self, format: &str, error: &dyn std::error::Error) {
306 let error_type = "export_error";
307 let error_message = format!("Export in '{format}' format failed: {error}");
308 self.track_error(error_type, &error_message);
309 }
310}
311
312pub async fn start_metrics_collection(
318 observability: Arc<ObservabilityManager>,
319 database: Arc<ThingsDatabase>,
320 collection_interval: Duration,
321) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
322 let collector = MetricsCollector::new(observability, database, collection_interval);
323 collector.start_collection().await
324}
325
326#[cfg(test)]
327#[allow(deprecated)]
328mod tests {
329 use super::*;
330 use std::sync::Arc;
331 use std::time::Duration;
332 use tempfile::NamedTempFile;
333 use things3_core::{ObservabilityConfig, ThingsConfig};
334
335 #[test]
336 fn test_performance_monitor_creation() {
337 let temp_file = NamedTempFile::new().unwrap();
338 let db_path = temp_file.path();
339
340 let config = ThingsConfig::new(db_path, false);
341 let rt = tokio::runtime::Runtime::new().unwrap();
342 let _database = Arc::new(
343 rt.block_on(async { ThingsDatabase::new(&config.database_path).await.unwrap() }),
344 );
345
346 let obs_config = ObservabilityConfig::default();
347 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
348
349 let _monitor = PerformanceMonitor::new(observability);
350 }
352
353 #[test]
354 fn test_error_tracker_creation() {
355 let temp_file = NamedTempFile::new().unwrap();
356 let db_path = temp_file.path();
357
358 let config = ThingsConfig::new(db_path, false);
359 let rt = tokio::runtime::Runtime::new().unwrap();
360 let _database = Arc::new(
361 rt.block_on(async { ThingsDatabase::new(&config.database_path).await.unwrap() }),
362 );
363
364 let obs_config = ObservabilityConfig::default();
365 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
366
367 let _tracker = ErrorTracker::new(observability);
368 }
370
371 #[test]
372 fn test_metrics_collector_creation() {
373 let temp_file = NamedTempFile::new().unwrap();
374 let db_path = temp_file.path();
375
376 let config = ThingsConfig::new(db_path, false);
377 let rt = tokio::runtime::Runtime::new().unwrap();
378 let database = Arc::new(
379 rt.block_on(async { ThingsDatabase::new(&config.database_path).await.unwrap() }),
380 );
381
382 let obs_config = ObservabilityConfig::default();
383 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
384
385 let _collector = MetricsCollector::new(observability, database, Duration::from_secs(30));
386 }
388
389 #[tokio::test]
390 async fn test_performance_monitor_timing() {
391 let temp_file = NamedTempFile::new().unwrap();
392 let db_path = temp_file.path();
393
394 let config = ThingsConfig::new(db_path, false);
395 let _database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
396
397 let obs_config = ObservabilityConfig::default();
398 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
399
400 let monitor = PerformanceMonitor::new(Arc::clone(&observability));
401
402 let result = monitor.monitor_db_operation("test_operation", || {
404 "test_result"
406 });
407 assert_eq!(result, "test_result");
408 }
409
410 #[tokio::test]
411 async fn test_performance_monitor_error_tracking() {
412 let temp_file = NamedTempFile::new().unwrap();
413 let db_path = temp_file.path();
414
415 let config = ThingsConfig::new(db_path, false);
416 let _database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
417
418 let obs_config = ObservabilityConfig::default();
419 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
420
421 let monitor = PerformanceMonitor::new(Arc::clone(&observability));
422
423 monitor.monitor_task_operation("test_operation", 5);
425 }
426
427 #[tokio::test]
428 async fn test_error_tracker_database_error() {
429 let temp_file = NamedTempFile::new().unwrap();
430 let db_path = temp_file.path();
431
432 let config = ThingsConfig::new(db_path, false);
433 let _database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
434
435 let obs_config = ObservabilityConfig::default();
436 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
437
438 let tracker = ErrorTracker::new(Arc::clone(&observability));
439
440 let error = std::io::Error::new(std::io::ErrorKind::NotFound, "Database not found");
442 tracker.track_db_error("test_operation", &error);
443 }
444
445 #[tokio::test]
446 async fn test_error_tracker_search_error() {
447 let temp_file = NamedTempFile::new().unwrap();
448 let db_path = temp_file.path();
449
450 let config = ThingsConfig::new(db_path, false);
451 let _database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
452
453 let obs_config = ObservabilityConfig::default();
454 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
455
456 let tracker = ErrorTracker::new(Arc::clone(&observability));
457
458 let error = std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid search query");
460 tracker.track_search_error("test query", &error);
461 }
462
463 #[tokio::test]
464 async fn test_error_tracker_export_error() {
465 let temp_file = NamedTempFile::new().unwrap();
466 let db_path = temp_file.path();
467
468 let config = ThingsConfig::new(db_path, false);
469 let _database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
470
471 let obs_config = ObservabilityConfig::default();
472 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
473
474 let tracker = ErrorTracker::new(Arc::clone(&observability));
475
476 let error = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "Export failed");
478 tracker.track_export_error("json", &error);
479 }
480
481 #[tokio::test]
482 async fn test_metrics_collector_system_metrics() {
483 let temp_file = NamedTempFile::new().unwrap();
484 let db_path = temp_file.path();
485
486 let config = ThingsConfig::new(db_path, false);
487 let database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
488
489 let obs_config = ObservabilityConfig::default();
490 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
491
492 let collector = MetricsCollector::new(
493 Arc::clone(&observability),
494 Arc::clone(&database),
495 Duration::from_secs(30),
496 );
497
498 let result = collector.collect_system_metrics().await;
500 assert!(result.is_ok());
501 }
502
503 #[tokio::test]
504 async fn test_metrics_collector_database_metrics() {
505 let temp_file = NamedTempFile::new().unwrap();
506 let db_path = temp_file.path();
507
508 let config = ThingsConfig::new(db_path, false);
509 let database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
510
511 let obs_config = ObservabilityConfig::default();
512 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
513
514 let collector = MetricsCollector::new(
515 Arc::clone(&observability),
516 Arc::clone(&database),
517 Duration::from_secs(30),
518 );
519
520 let result = collector.collect_database_metrics().await;
522 assert!(result.is_ok());
523 }
524
525 #[tokio::test]
526 async fn test_metrics_collector_search_metrics() {
527 let temp_file = NamedTempFile::new().unwrap();
528 let db_path = temp_file.path();
529
530 let config = ThingsConfig::new(db_path, false);
531 let database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
532
533 let obs_config = ObservabilityConfig::default();
534 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
535
536 let collector = MetricsCollector::new(
537 Arc::clone(&observability),
538 Arc::clone(&database),
539 Duration::from_secs(30),
540 );
541
542 let result = collector.collect_search_metrics().await;
544 assert!(result.is_ok());
545 }
546
547 #[tokio::test]
548 async fn test_metrics_collector_export_metrics() {
549 let temp_file = NamedTempFile::new().unwrap();
550 let db_path = temp_file.path();
551
552 let config = ThingsConfig::new(db_path, false);
553 let database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
554
555 let obs_config = ObservabilityConfig::default();
556 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
557
558 let collector = MetricsCollector::new(
559 Arc::clone(&observability),
560 Arc::clone(&database),
561 Duration::from_secs(30),
562 );
563
564 let result = collector.collect_export_metrics().await;
566 assert!(result.is_ok());
567 }
568
569 #[tokio::test]
570 async fn test_start_metrics_collection() {
571 let temp_file = NamedTempFile::new().unwrap();
572 let db_path = temp_file.path();
573
574 let config = ThingsConfig::new(db_path, false);
575 let database = Arc::new(ThingsDatabase::new(&config.database_path).await.unwrap());
576
577 let obs_config = ObservabilityConfig::default();
578 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
579
580 let collection_handle = tokio::spawn(async move {
582 start_metrics_collection(observability, database, Duration::from_millis(100)).await
583 });
584
585 tokio::time::sleep(Duration::from_millis(50)).await;
587 collection_handle.abort();
588 }
589
590 #[test]
591 fn test_performance_monitor_with_custom_observability() {
592 let temp_file = NamedTempFile::new().unwrap();
593 let db_path = temp_file.path();
594
595 let config = ThingsConfig::new(db_path, false);
596 let rt = tokio::runtime::Runtime::new().unwrap();
597 let _database = Arc::new(
598 rt.block_on(async { ThingsDatabase::new(&config.database_path).await.unwrap() }),
599 );
600
601 let obs_config = ObservabilityConfig {
602 service_name: "test-service".to_string(),
603 ..Default::default()
604 };
605 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
606
607 let _monitor = PerformanceMonitor::new(observability);
608 }
610
611 #[test]
612 fn test_error_tracker_with_custom_observability() {
613 let temp_file = NamedTempFile::new().unwrap();
614 let db_path = temp_file.path();
615
616 let config = ThingsConfig::new(db_path, false);
617 let rt = tokio::runtime::Runtime::new().unwrap();
618 let _database = Arc::new(
619 rt.block_on(async { ThingsDatabase::new(&config.database_path).await.unwrap() }),
620 );
621
622 let obs_config = ObservabilityConfig {
623 service_name: "test-service".to_string(),
624 ..Default::default()
625 };
626 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
627
628 let _tracker = ErrorTracker::new(observability);
629 }
631
632 #[test]
633 fn test_metrics_collector_with_different_intervals() {
634 let temp_file = NamedTempFile::new().unwrap();
635 let db_path = temp_file.path();
636
637 let config = ThingsConfig::new(db_path, false);
638 let rt = tokio::runtime::Runtime::new().unwrap();
639 let database = Arc::new(
640 rt.block_on(async { ThingsDatabase::new(&config.database_path).await.unwrap() }),
641 );
642
643 let obs_config = ObservabilityConfig::default();
644 let observability = Arc::new(ObservabilityManager::new(obs_config).unwrap());
645
646 let _collector1 = MetricsCollector::new(
648 Arc::clone(&observability),
649 Arc::clone(&database),
650 Duration::from_secs(1),
651 );
652 let _collector2 = MetricsCollector::new(
653 Arc::clone(&observability),
654 Arc::clone(&database),
655 Duration::from_secs(60),
656 );
657 let _collector3 = MetricsCollector::new(
658 Arc::clone(&observability),
659 Arc::clone(&database),
660 Duration::from_millis(500),
661 );
662 }
663}