rust_task_queue/
tracing_utils.rs

1//! Comprehensive tracing utilities for the Rust Task Queue system
2//!
3//! This module provides structured tracing utilities for observability, debugging,
4//! and production monitoring of task queue operations.
5
6use crate::{TaskId, TaskPriority};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::time::{Duration, Instant};
10
11/// Task lifecycle events for comprehensive observability
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub enum TaskLifecycleEvent {
14    /// Task has been enqueued
15    Enqueued {
16        queue: String,
17        priority: TaskPriority,
18        payload_size_bytes: usize,
19        max_retries: u32,
20        timeout_seconds: u64,
21    },
22    /// Task has been dequeued by a worker
23    Dequeued {
24        worker_id: String,
25        queue: String,
26        wait_time_ms: u64,
27        payload_size_bytes: usize,
28    },
29    /// Task execution has started
30    ExecutionStarted {
31        worker_id: String,
32        attempt: u32,
33        max_retries: u32,
34    },
35    /// Task execution completed successfully
36    ExecutionCompleted {
37        worker_id: String,
38        duration_ms: u64,
39        result_size_bytes: usize,
40        attempt: u32,
41    },
42    /// Task execution failed
43    ExecutionFailed {
44        worker_id: String,
45        duration_ms: u64,
46        error: String,
47        error_source: Option<String>,
48        attempt: u32,
49    },
50    /// Task is being retried
51    Retrying {
52        attempt: u32,
53        max_retries: u32,
54        delay_ms: u64,
55        reason: String,
56    },
57    /// Task has permanently failed
58    PermanentlyFailed {
59        total_attempts: u32,
60        final_error: String,
61        total_duration_ms: u64,
62    },
63    /// Task has been scheduled for delayed execution
64    Scheduled {
65        execute_at: chrono::DateTime<chrono::Utc>,
66        delay_ms: i64,
67        queue: String,
68    },
69    /// Scheduled task moved to regular queue
70    MovedToRegularQueue {
71        from_scheduled: bool,
72        delay_from_scheduled_ms: i64,
73        queue: String,
74    },
75}
76
77/// Performance metrics for task execution
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct TaskPerformanceMetrics {
80    pub task_name: String,
81    pub execution_count: u64,
82    pub total_duration_ms: u64,
83    pub average_duration_ms: f64,
84    pub min_duration_ms: u64,
85    pub max_duration_ms: u64,
86    pub success_count: u64,
87    pub failure_count: u64,
88    pub success_rate: f64,
89    pub last_execution: chrono::DateTime<chrono::Utc>,
90}
91
92/// Queue performance metrics
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct QueuePerformanceMetrics {
95    pub queue_name: String,
96    pub current_depth: i64,
97    pub peak_depth: i64,
98    pub total_processed: u64,
99    pub total_failed: u64,
100    pub average_processing_time_ms: f64,
101    pub throughput_per_minute: f64,
102    pub last_activity: chrono::DateTime<chrono::Utc>,
103}
104
105/// Worker performance metrics
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct WorkerPerformanceMetrics {
108    pub worker_id: String,
109    pub tasks_processed: u64,
110    pub tasks_failed: u64,
111    pub total_processing_time_ms: u64,
112    pub average_processing_time_ms: f64,
113    pub current_active_tasks: usize,
114    pub max_concurrent_tasks: usize,
115    pub uptime_seconds: u64,
116    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
117}
118
119/// Trace a task lifecycle event with structured logging
120pub fn trace_task_lifecycle_event(task_id: TaskId, task_name: &str, event: TaskLifecycleEvent) {
121    match event {
122        TaskLifecycleEvent::Enqueued {
123            queue,
124            priority,
125            payload_size_bytes,
126            max_retries,
127            timeout_seconds,
128        } => {
129            tracing::info!(
130                task_id = %task_id,
131                task_name = task_name,
132                queue = queue,
133                priority = ?priority,
134                payload_size_bytes = payload_size_bytes,
135                max_retries = max_retries,
136                timeout_seconds = timeout_seconds,
137                event = "enqueued",
138                "Task enqueued"
139            );
140        }
141        TaskLifecycleEvent::Dequeued {
142            worker_id,
143            queue,
144            wait_time_ms,
145            payload_size_bytes,
146        } => {
147            tracing::info!(
148                task_id = %task_id,
149                task_name = task_name,
150                worker_id = worker_id,
151                queue = queue,
152                wait_time_ms = wait_time_ms,
153                payload_size_bytes = payload_size_bytes,
154                event = "dequeued",
155                "Task dequeued"
156            );
157        }
158        TaskLifecycleEvent::ExecutionStarted {
159            worker_id,
160            attempt,
161            max_retries,
162        } => {
163            tracing::info!(
164                task_id = %task_id,
165                task_name = task_name,
166                worker_id = worker_id,
167                attempt = attempt,
168                max_retries = max_retries,
169                event = "execution_started",
170                "Task execution started"
171            );
172        }
173        TaskLifecycleEvent::ExecutionCompleted {
174            worker_id,
175            duration_ms,
176            result_size_bytes,
177            attempt,
178        } => {
179            tracing::info!(
180                task_id = %task_id,
181                task_name = task_name,
182                worker_id = worker_id,
183                duration_ms = duration_ms,
184                result_size_bytes = result_size_bytes,
185                attempt = attempt,
186                success = true,
187                event = "execution_completed",
188                "Task execution completed successfully"
189            );
190        }
191        TaskLifecycleEvent::ExecutionFailed {
192            worker_id,
193            duration_ms,
194            error,
195            error_source,
196            attempt,
197        } => {
198            tracing::error!(
199                task_id = %task_id,
200                task_name = task_name,
201                worker_id = worker_id,
202                duration_ms = duration_ms,
203                error = error,
204                error_source = error_source,
205                attempt = attempt,
206                success = false,
207                event = "execution_failed",
208                "Task execution failed"
209            );
210        }
211        TaskLifecycleEvent::Retrying {
212            attempt,
213            max_retries,
214            delay_ms,
215            reason,
216        } => {
217            tracing::warn!(
218                task_id = %task_id,
219                task_name = task_name,
220                attempt = attempt,
221                max_retries = max_retries,
222                delay_ms = delay_ms,
223                reason = reason,
224                event = "retrying",
225                "Task being retried"
226            );
227        }
228        TaskLifecycleEvent::PermanentlyFailed {
229            total_attempts,
230            final_error,
231            total_duration_ms,
232        } => {
233            tracing::error!(
234                task_id = %task_id,
235                task_name = task_name,
236                total_attempts = total_attempts,
237                final_error = final_error,
238                total_duration_ms = total_duration_ms,
239                event = "permanently_failed",
240                "Task permanently failed"
241            );
242        }
243        TaskLifecycleEvent::Scheduled {
244            execute_at,
245            delay_ms,
246            queue,
247        } => {
248            tracing::info!(
249                task_id = %task_id,
250                task_name = task_name,
251                execute_at = %execute_at,
252                delay_ms = delay_ms,
253                queue = queue,
254                event = "scheduled",
255                "Task scheduled for delayed execution"
256            );
257        }
258        TaskLifecycleEvent::MovedToRegularQueue {
259            from_scheduled,
260            delay_from_scheduled_ms,
261            queue,
262        } => {
263            tracing::info!(
264                task_id = %task_id,
265                task_name = task_name,
266                from_scheduled = from_scheduled,
267                delay_from_scheduled_ms = delay_from_scheduled_ms,
268                queue = queue,
269                event = "moved_to_regular_queue",
270                "Scheduled task moved to regular queue"
271            );
272        }
273    }
274}
275
276/// Trace task error with detailed context and error chain
277pub fn trace_task_error(
278    task_id: TaskId,
279    task_name: &str,
280    error: &dyn std::error::Error,
281    context: &str,
282    worker_id: Option<&str>,
283    attempt: Option<u32>,
284) {
285    tracing::error!(
286        task_id = %task_id,
287        task_name = task_name,
288        error = %error,
289        error_source = error.source().map(|e| e.to_string()).as_deref(),
290        context = context,
291        worker_id = worker_id,
292        attempt = attempt,
293        "Task error occurred"
294    );
295
296    // Log error chain for debugging
297    let mut source = error.source();
298    let mut depth = 1;
299    while let Some(err) = source {
300        tracing::debug!(
301            task_id = %task_id,
302            error_depth = depth,
303            error = %err,
304            "Error chain"
305        );
306        source = err.source();
307        depth += 1;
308    }
309}
310
/// Stopwatch for a named operation, with free-form key/value context.
///
/// The clock starts when the tracker is created; the `trace_completion*`
/// methods consume the tracker and log the elapsed time together with the
/// accumulated context.
pub struct PerformanceTracker {
    // Moment the tracker was constructed; all durations are measured from here.
    start_time: Instant,
    // Logged as the `operation` field.
    operation_name: String,
    // Logged (via `Debug`) as the `context` field.
    context: HashMap<String, String>,
}
317
318impl PerformanceTracker {
319    pub fn new(operation_name: &str) -> Self {
320        Self {
321            start_time: Instant::now(),
322            operation_name: operation_name.to_string(),
323            context: HashMap::new(),
324        }
325    }
326
327    pub fn with_context(mut self, key: &str, value: &str) -> Self {
328        self.context.insert(key.to_string(), value.to_string());
329        self
330    }
331
332    pub fn add_context(&mut self, key: &str, value: &str) {
333        self.context.insert(key.to_string(), value.to_string());
334    }
335
336    pub fn elapsed(&self) -> Duration {
337        self.start_time.elapsed()
338    }
339
340    pub fn trace_completion(self) {
341        let duration = self.elapsed();
342        tracing::debug!(
343            operation = self.operation_name,
344            duration_ms = duration.as_millis(),
345            context = ?self.context,
346            "Operation completed"
347        );
348    }
349
350    pub fn trace_completion_with_result<T, E>(self, result: &Result<T, E>)
351    where
352        E: std::fmt::Display,
353    {
354        let duration = self.elapsed();
355        match result {
356            Ok(_) => {
357                tracing::info!(
358                    operation = self.operation_name,
359                    duration_ms = duration.as_millis(),
360                    success = true,
361                    context = ?self.context,
362                    "Operation completed successfully"
363                );
364            }
365            Err(e) => {
366                tracing::error!(
367                    operation = self.operation_name,
368                    duration_ms = duration.as_millis(),
369                    success = false,
370                    error = %e,
371                    context = ?self.context,
372                    "Operation failed"
373                );
374            }
375        }
376    }
377}
378
379/// Trace queue operation with metrics
380pub fn trace_queue_operation(
381    operation: &str,
382    queue: &str,
383    item_count: Option<usize>,
384    duration: Duration,
385    success: bool,
386    error: Option<&str>,
387) {
388    if success {
389        tracing::info!(
390            operation = operation,
391            queue = queue,
392            item_count = item_count,
393            duration_ms = duration.as_millis(),
394            success = true,
395            "Queue operation completed"
396        );
397    } else {
398        tracing::error!(
399            operation = operation,
400            queue = queue,
401            item_count = item_count,
402            duration_ms = duration.as_millis(),
403            success = false,
404            error = error,
405            "Queue operation failed"
406        );
407    }
408}
409
410/// Trace worker operation with context
411pub fn trace_worker_operation(
412    worker_id: &str,
413    operation: &str,
414    active_tasks: Option<usize>,
415    duration: Option<Duration>,
416    success: bool,
417    error: Option<&str>,
418) {
419    if success {
420        tracing::info!(
421            worker_id = worker_id,
422            operation = operation,
423            active_tasks = active_tasks,
424            duration_ms = duration.map(|d| d.as_millis()),
425            success = true,
426            "Worker operation completed"
427        );
428    } else {
429        tracing::error!(
430            worker_id = worker_id,
431            operation = operation,
432            active_tasks = active_tasks,
433            duration_ms = duration.map(|d| d.as_millis()),
434            success = false,
435            error = error,
436            "Worker operation failed"
437        );
438    }
439}
440
/// Helper macro that builds an `INFO`-level `tracing` span.
///
/// Accepts a span name optionally followed by `field = value` pairs:
///
/// - `traced_operation!("name")`
/// - `traced_operation!("name", queue = q, attempt = 1)`
///
/// The macro only creates the span; entering or instrumenting with it is left
/// to the caller. The expansion refers to `tracing::info_span!`, so the
/// `tracing` crate must be resolvable at the call site.
#[macro_export]
macro_rules! traced_operation {
    // Span with no fields; the original single arm required a comma after the name.
    ($span_name:expr $(,)?) => {
        tracing::info_span!($span_name)
    };
    ($span_name:expr, $($field:ident = $value:expr),* $(,)?) => {
        tracing::info_span!($span_name, $($field = $value),*)
    };
}
448
/// Helper macro that times an expression and logs its outcome.
///
/// A `PerformanceTracker` is started before `$code` is evaluated. The value of
/// `$code` is passed by reference to `trace_completion_with_result`, so it must
/// be a `Result<T, E>` with `E: Display`, and is then returned unchanged.
#[macro_export]
macro_rules! timed_operation {
    ($operation_name:expr, $code:expr) => {{
        let perf_tracker = $crate::tracing_utils::PerformanceTracker::new($operation_name);
        let outcome = $code;
        perf_tracker.trace_completion_with_result(&outcome);
        outcome
    }};
}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder stores the operation name and context, and `elapsed` never
    /// goes backwards.
    #[test]
    fn test_performance_tracker() {
        let tracker =
            PerformanceTracker::new("test_operation").with_context("test_key", "test_value");

        assert_eq!(tracker.operation_name, "test_operation");
        assert_eq!(
            tracker.context.get("test_key"),
            Some(&"test_value".to_string())
        );

        // `Instant` is only guaranteed to be non-decreasing and its resolution is
        // platform dependent, so a reading taken right after construction may
        // legitimately be zero. Assert monotonicity instead of `> 0`.
        let first = tracker.elapsed();
        let second = tracker.elapsed();
        assert!(second >= first);
    }

    /// An `Enqueued` event survives a JSON round trip with all fields intact.
    #[test]
    fn test_task_lifecycle_event_serialization() {
        let event = TaskLifecycleEvent::Enqueued {
            queue: "test_queue".to_string(),
            priority: TaskPriority::High,
            payload_size_bytes: 1024,
            max_retries: 3,
            timeout_seconds: 300,
        };

        let serialized = serde_json::to_string(&event).expect("Failed to serialize");
        let deserialized: TaskLifecycleEvent =
            serde_json::from_str(&serialized).expect("Failed to deserialize");

        match deserialized {
            TaskLifecycleEvent::Enqueued {
                queue,
                priority,
                payload_size_bytes,
                max_retries,
                timeout_seconds,
            } => {
                assert_eq!(queue, "test_queue");
                assert_eq!(priority, TaskPriority::High);
                assert_eq!(payload_size_bytes, 1024);
                assert_eq!(max_retries, 3);
                assert_eq!(timeout_seconds, 300);
            }
            _ => panic!("Unexpected event type"),
        }
    }
}