1use crate::{TaskId, TaskPriority};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::time::{Duration, Instant};
10
/// One transition in a task's lifecycle, reported as a structured log event
/// by [`trace_task_lifecycle_event`].
///
/// Field-name suffixes give units: `_ms` (milliseconds), `_seconds`, and
/// `_bytes`. The serde derives let the event itself be serialized (e.g. to
/// JSON) in addition to being logged.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskLifecycleEvent {
    /// The task was placed on `queue` (logged at `INFO` as "Task enqueued").
    Enqueued {
        queue: String,
        priority: TaskPriority,
        payload_size_bytes: usize,
        max_retries: u32,
        timeout_seconds: u64,
    },
    /// Worker `worker_id` took the task from `queue` (logged at `INFO`).
    Dequeued {
        worker_id: String,
        queue: String,
        /// NOTE(review): presumably how long the task sat in the queue before
        /// this dequeue — confirm with the code that builds this event.
        wait_time_ms: u64,
        payload_size_bytes: usize,
    },
    /// Worker `worker_id` began execution attempt `attempt` (logged at `INFO`).
    ExecutionStarted {
        worker_id: String,
        attempt: u32,
        max_retries: u32,
    },
    /// Execution attempt `attempt` finished successfully (logged at `INFO`
    /// with `success = true`).
    ExecutionCompleted {
        worker_id: String,
        duration_ms: u64,
        result_size_bytes: usize,
        attempt: u32,
    },
    /// Execution attempt `attempt` failed (logged at `ERROR` with
    /// `success = false`).
    ExecutionFailed {
        worker_id: String,
        duration_ms: u64,
        /// Error text, logged as the `error` field.
        error: String,
        /// Optional secondary error text, logged as `error_source` only when present.
        error_source: Option<String>,
        attempt: u32,
    },
    /// The task is being retried (logged at `WARN` as "Task being retried").
    Retrying {
        attempt: u32,
        max_retries: u32,
        delay_ms: u64,
        /// Free-form explanation, logged as the `reason` field.
        reason: String,
    },
    /// The task failed for good after `total_attempts` attempts (logged at `ERROR`).
    PermanentlyFailed {
        total_attempts: u32,
        final_error: String,
        total_duration_ms: u64,
    },
    /// The task was scheduled for delayed execution at `execute_at` on `queue`.
    Scheduled {
        execute_at: chrono::DateTime<chrono::Utc>,
        /// Signed; NOTE(review): presumably may be negative when `execute_at`
        /// is already in the past — confirm.
        delay_ms: i64,
        queue: String,
    },
    /// A scheduled task was moved onto the regular `queue`.
    MovedToRegularQueue {
        from_scheduled: bool,
        /// Signed; NOTE(review): presumably the lag between the scheduled time
        /// and the move — confirm.
        delay_from_scheduled_ms: i64,
        queue: String,
    },
}
76
/// Aggregated execution statistics for one task name.
///
/// NOTE(review): nothing in this module constructs or updates this type; the
/// aggregation rules (e.g. whether `success_rate` is a 0–1 ratio or a
/// percentage) live elsewhere — confirm against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskPerformanceMetrics {
    /// Task name these statistics describe.
    pub task_name: String,
    /// Number of executions counted.
    pub execution_count: u64,
    /// Sum of execution durations, in milliseconds.
    pub total_duration_ms: u64,
    /// Mean execution duration in milliseconds
    /// (presumably `total_duration_ms / execution_count` — confirm).
    pub average_duration_ms: f64,
    /// Shortest observed execution, in milliseconds.
    pub min_duration_ms: u64,
    /// Longest observed execution, in milliseconds.
    pub max_duration_ms: u64,
    /// Executions that succeeded.
    pub success_count: u64,
    /// Executions that failed.
    pub failure_count: u64,
    /// Share of successful executions; scale (ratio vs. percent) not fixed here.
    pub success_rate: f64,
    /// Timestamp (UTC) of the most recent execution.
    pub last_execution: chrono::DateTime<chrono::Utc>,
}
91
/// Aggregated statistics for one queue.
///
/// NOTE(review): this module only defines the type; how the fields are
/// computed is decided by whatever populates it — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuePerformanceMetrics {
    /// Name of the queue.
    pub queue_name: String,
    /// Items currently in the queue. Signed; NOTE(review): presumably signed to
    /// match the backing store's counter type — confirm.
    pub current_depth: i64,
    /// Largest depth observed.
    pub peak_depth: i64,
    /// Items processed in total.
    pub total_processed: u64,
    /// Items whose processing failed.
    pub total_failed: u64,
    /// Mean processing time per item, in milliseconds.
    pub average_processing_time_ms: f64,
    /// Items processed per minute.
    pub throughput_per_minute: f64,
    /// Timestamp (UTC) of the most recent activity on the queue.
    pub last_activity: chrono::DateTime<chrono::Utc>,
}
104
/// Aggregated statistics for one worker.
///
/// NOTE(review): this module only defines the type; it is populated
/// elsewhere — confirm field semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerPerformanceMetrics {
    /// Identifier of the worker.
    pub worker_id: String,
    /// Tasks the worker has processed.
    pub tasks_processed: u64,
    /// Tasks that failed on this worker.
    pub tasks_failed: u64,
    /// Sum of processing time, in milliseconds.
    pub total_processing_time_ms: u64,
    /// Mean processing time per task, in milliseconds.
    pub average_processing_time_ms: f64,
    /// Tasks currently running on the worker.
    pub current_active_tasks: usize,
    /// Configured concurrency limit of the worker.
    pub max_concurrent_tasks: usize,
    /// Time since the worker started, in seconds.
    pub uptime_seconds: u64,
    /// Timestamp (UTC) of the worker's last heartbeat.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
}
118
119pub fn trace_task_lifecycle_event(task_id: TaskId, task_name: &str, event: TaskLifecycleEvent) {
121 match event {
122 TaskLifecycleEvent::Enqueued {
123 queue,
124 priority,
125 payload_size_bytes,
126 max_retries,
127 timeout_seconds,
128 } => {
129 tracing::info!(
130 task_id = %task_id,
131 task_name = task_name,
132 queue = queue,
133 priority = ?priority,
134 payload_size_bytes = payload_size_bytes,
135 max_retries = max_retries,
136 timeout_seconds = timeout_seconds,
137 event = "enqueued",
138 "Task enqueued"
139 );
140 }
141 TaskLifecycleEvent::Dequeued {
142 worker_id,
143 queue,
144 wait_time_ms,
145 payload_size_bytes,
146 } => {
147 tracing::info!(
148 task_id = %task_id,
149 task_name = task_name,
150 worker_id = worker_id,
151 queue = queue,
152 wait_time_ms = wait_time_ms,
153 payload_size_bytes = payload_size_bytes,
154 event = "dequeued",
155 "Task dequeued"
156 );
157 }
158 TaskLifecycleEvent::ExecutionStarted {
159 worker_id,
160 attempt,
161 max_retries,
162 } => {
163 tracing::info!(
164 task_id = %task_id,
165 task_name = task_name,
166 worker_id = worker_id,
167 attempt = attempt,
168 max_retries = max_retries,
169 event = "execution_started",
170 "Task execution started"
171 );
172 }
173 TaskLifecycleEvent::ExecutionCompleted {
174 worker_id,
175 duration_ms,
176 result_size_bytes,
177 attempt,
178 } => {
179 tracing::info!(
180 task_id = %task_id,
181 task_name = task_name,
182 worker_id = worker_id,
183 duration_ms = duration_ms,
184 result_size_bytes = result_size_bytes,
185 attempt = attempt,
186 success = true,
187 event = "execution_completed",
188 "Task execution completed successfully"
189 );
190 }
191 TaskLifecycleEvent::ExecutionFailed {
192 worker_id,
193 duration_ms,
194 error,
195 error_source,
196 attempt,
197 } => {
198 tracing::error!(
199 task_id = %task_id,
200 task_name = task_name,
201 worker_id = worker_id,
202 duration_ms = duration_ms,
203 error = error,
204 error_source = error_source,
205 attempt = attempt,
206 success = false,
207 event = "execution_failed",
208 "Task execution failed"
209 );
210 }
211 TaskLifecycleEvent::Retrying {
212 attempt,
213 max_retries,
214 delay_ms,
215 reason,
216 } => {
217 tracing::warn!(
218 task_id = %task_id,
219 task_name = task_name,
220 attempt = attempt,
221 max_retries = max_retries,
222 delay_ms = delay_ms,
223 reason = reason,
224 event = "retrying",
225 "Task being retried"
226 );
227 }
228 TaskLifecycleEvent::PermanentlyFailed {
229 total_attempts,
230 final_error,
231 total_duration_ms,
232 } => {
233 tracing::error!(
234 task_id = %task_id,
235 task_name = task_name,
236 total_attempts = total_attempts,
237 final_error = final_error,
238 total_duration_ms = total_duration_ms,
239 event = "permanently_failed",
240 "Task permanently failed"
241 );
242 }
243 TaskLifecycleEvent::Scheduled {
244 execute_at,
245 delay_ms,
246 queue,
247 } => {
248 tracing::info!(
249 task_id = %task_id,
250 task_name = task_name,
251 execute_at = %execute_at,
252 delay_ms = delay_ms,
253 queue = queue,
254 event = "scheduled",
255 "Task scheduled for delayed execution"
256 );
257 }
258 TaskLifecycleEvent::MovedToRegularQueue {
259 from_scheduled,
260 delay_from_scheduled_ms,
261 queue,
262 } => {
263 tracing::info!(
264 task_id = %task_id,
265 task_name = task_name,
266 from_scheduled = from_scheduled,
267 delay_from_scheduled_ms = delay_from_scheduled_ms,
268 queue = queue,
269 event = "moved_to_regular_queue",
270 "Scheduled task moved to regular queue"
271 );
272 }
273 }
274}
275
276pub fn trace_task_error(
278 task_id: TaskId,
279 task_name: &str,
280 error: &dyn std::error::Error,
281 context: &str,
282 worker_id: Option<&str>,
283 attempt: Option<u32>,
284) {
285 tracing::error!(
286 task_id = %task_id,
287 task_name = task_name,
288 error = %error,
289 error_source = error.source().map(|e| e.to_string()).as_deref(),
290 context = context,
291 worker_id = worker_id,
292 attempt = attempt,
293 "Task error occurred"
294 );
295
296 let mut source = error.source();
298 let mut depth = 1;
299 while let Some(err) = source {
300 tracing::debug!(
301 task_id = %task_id,
302 error_depth = depth,
303 error = %err,
304 "Error chain"
305 );
306 source = err.source();
307 depth += 1;
308 }
309}
310
311pub struct PerformanceTracker {
313 start_time: Instant,
314 operation_name: String,
315 context: HashMap<String, String>,
316}
317
318impl PerformanceTracker {
319 pub fn new(operation_name: &str) -> Self {
320 Self {
321 start_time: Instant::now(),
322 operation_name: operation_name.to_string(),
323 context: HashMap::new(),
324 }
325 }
326
327 pub fn with_context(mut self, key: &str, value: &str) -> Self {
328 self.context.insert(key.to_string(), value.to_string());
329 self
330 }
331
332 pub fn add_context(&mut self, key: &str, value: &str) {
333 self.context.insert(key.to_string(), value.to_string());
334 }
335
336 pub fn elapsed(&self) -> Duration {
337 self.start_time.elapsed()
338 }
339
340 pub fn trace_completion(self) {
341 let duration = self.elapsed();
342 tracing::debug!(
343 operation = self.operation_name,
344 duration_ms = duration.as_millis(),
345 context = ?self.context,
346 "Operation completed"
347 );
348 }
349
350 pub fn trace_completion_with_result<T, E>(self, result: &Result<T, E>)
351 where
352 E: std::fmt::Display,
353 {
354 let duration = self.elapsed();
355 match result {
356 Ok(_) => {
357 tracing::info!(
358 operation = self.operation_name,
359 duration_ms = duration.as_millis(),
360 success = true,
361 context = ?self.context,
362 "Operation completed successfully"
363 );
364 }
365 Err(e) => {
366 tracing::error!(
367 operation = self.operation_name,
368 duration_ms = duration.as_millis(),
369 success = false,
370 error = %e,
371 context = ?self.context,
372 "Operation failed"
373 );
374 }
375 }
376 }
377}
378
379pub fn trace_queue_operation(
381 operation: &str,
382 queue: &str,
383 item_count: Option<usize>,
384 duration: Duration,
385 success: bool,
386 error: Option<&str>,
387) {
388 if success {
389 tracing::info!(
390 operation = operation,
391 queue = queue,
392 item_count = item_count,
393 duration_ms = duration.as_millis(),
394 success = true,
395 "Queue operation completed"
396 );
397 } else {
398 tracing::error!(
399 operation = operation,
400 queue = queue,
401 item_count = item_count,
402 duration_ms = duration.as_millis(),
403 success = false,
404 error = error,
405 "Queue operation failed"
406 );
407 }
408}
409
410pub fn trace_worker_operation(
412 worker_id: &str,
413 operation: &str,
414 active_tasks: Option<usize>,
415 duration: Option<Duration>,
416 success: bool,
417 error: Option<&str>,
418) {
419 if success {
420 tracing::info!(
421 worker_id = worker_id,
422 operation = operation,
423 active_tasks = active_tasks,
424 duration_ms = duration.map(|d| d.as_millis()),
425 success = true,
426 "Worker operation completed"
427 );
428 } else {
429 tracing::error!(
430 worker_id = worker_id,
431 operation = operation,
432 active_tasks = active_tasks,
433 duration_ms = duration.map(|d| d.as_millis()),
434 success = false,
435 error = error,
436 "Worker operation failed"
437 );
438 }
439}
440
/// Creates an `INFO`-level `tracing` span named `$span_name`, with optional
/// `ident = expr` fields.
///
/// ```ignore
/// let span = traced_operation!("process_batch", queue = queue_name, size = batch.len());
/// let _guard = span.enter();
/// let bare = traced_operation!("heartbeat");
/// ```
///
/// NOTE(review): the expansion names `tracing::info_span!` by path (not via
/// `$crate`), so the invoking crate must itself depend on `tracing`.
#[macro_export]
macro_rules! traced_operation {
    // Name only. The field arm below requires a comma after the name, so
    // `traced_operation!("name")` previously failed to match any arm.
    ($span_name:expr $(,)?) => {
        tracing::info_span!($span_name)
    };
    ($span_name:expr, $($field:ident = $value:expr),* $(,)?) => {
        tracing::info_span!($span_name, $($field = $value),*)
    };
}
448
/// Evaluates `$code`, reports how long it took under `$operation_name`, and
/// yields `$code`'s value unchanged.
///
/// `$code` must produce a `Result<T, E>` with `E: Display`. The outcome is
/// logged through `PerformanceTracker::trace_completion_with_result`. The clock
/// starts before `$code` is evaluated. If `$code` leaves early (via `?` or
/// `return`), nothing is logged.
#[macro_export]
macro_rules! timed_operation {
    ($operation_name:expr, $code:expr) => {{
        let perf_tracker = $crate::tracing_utils::PerformanceTracker::new($operation_name);
        let outcome = $code;
        perf_tracker.trace_completion_with_result(&outcome);
        outcome
    }};
}
459
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_performance_tracker() {
        let tracker =
            PerformanceTracker::new("test_operation").with_context("test_key", "test_value");

        assert_eq!(tracker.operation_name, "test_operation");
        assert_eq!(
            tracker.context.get("test_key"),
            Some(&"test_value".to_string())
        );

        // `Instant` is only guaranteed to be non-decreasing, so the elapsed
        // time may legitimately be zero on a coarse clock; asserting `> 0`
        // was flaky. Check monotonicity instead.
        let first = tracker.elapsed();
        let second = tracker.elapsed();
        assert!(second >= first);
    }

    #[test]
    fn test_add_context_overwrites_existing_key() {
        let mut tracker = PerformanceTracker::new("op").with_context("key", "old");
        tracker.add_context("key", "new");
        tracker.add_context("other", "value");

        assert_eq!(tracker.context.get("key").map(String::as_str), Some("new"));
        assert_eq!(tracker.context.len(), 2);
    }

    #[test]
    fn test_task_lifecycle_event_serialization() {
        let event = TaskLifecycleEvent::Enqueued {
            queue: "test_queue".to_string(),
            priority: TaskPriority::High,
            payload_size_bytes: 1024,
            max_retries: 3,
            timeout_seconds: 300,
        };

        let serialized = serde_json::to_string(&event).expect("Failed to serialize");
        let deserialized: TaskLifecycleEvent =
            serde_json::from_str(&serialized).expect("Failed to deserialize");

        match deserialized {
            TaskLifecycleEvent::Enqueued {
                queue,
                priority,
                payload_size_bytes,
                max_retries,
                timeout_seconds,
            } => {
                assert_eq!(queue, "test_queue");
                assert_eq!(priority, TaskPriority::High);
                assert_eq!(payload_size_bytes, 1024);
                assert_eq!(max_retries, 3);
                assert_eq!(timeout_seconds, 300);
            }
            _ => panic!("Unexpected event type"),
        }
    }
}