celers_broker_sql/broker_batch.rs
1//! Batch operations, worker management, and task groups
2//!
3//! Batch result storage, drain mode, worker heartbeats,
4//! task grouping, connection health, and DLQ replay operations.
5
6use crate::broker_core::MysqlBroker;
7use crate::types::*;
8use celers_core::{CelersError, Result, SerializedTask, TaskId};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use uuid::Uuid;
13
/// Slow query information from performance_schema
///
/// Plain data record; it derives `Serialize`/`Deserialize`, so it can be
/// round-tripped through serde formats. All timing fields carry an `_ms`
/// suffix, i.e. they are expressed in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlowQueryInfo {
    /// Text of the statement the figures below refer to.
    pub query_text: String,
    /// Number of executions recorded for the statement
    /// (NOTE(review): presumably the performance_schema counter — confirm against the producer).
    pub execution_count: i64,
    /// Average execution time, in milliseconds.
    pub avg_time_ms: f64,
    /// Maximum execution time, in milliseconds.
    pub max_time_ms: f64,
    /// Total execution time, in milliseconds.
    pub total_time_ms: f64,
}
23
/// Worker heartbeat information
///
/// One entry per row of `celers_worker_heartbeat`, as returned by
/// `MysqlBroker::get_all_worker_heartbeats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerHeartbeat {
    /// Identifier the worker registered under.
    pub worker_id: String,
    /// Timestamp of the most recent heartbeat stored for the worker.
    pub last_heartbeat: DateTime<Utc>,
    /// Status of the worker. `get_all_worker_heartbeats` reports
    /// `WorkerStatus::Offline` when the heartbeat is older than the caller's
    /// stale threshold, regardless of the stored status.
    pub status: WorkerStatus,
    /// Value of the `task_count` column (written as `0` when a worker is
    /// first registered).
    pub task_count: i64,
    /// Capabilities JSON supplied at registration; `None` when the stored
    /// text cannot be parsed as JSON.
    pub capabilities: Option<serde_json::Value>,
}
33
/// Worker status
///
/// Serialized by serde in lowercase (`"active"`, `"idle"`, `"busy"`,
/// `"offline"`), which matches the strings produced by the `Display`
/// implementation and the values stored in the heartbeat table.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerStatus {
    /// Stored/reported as `"active"`.
    Active,
    /// Stored/reported as `"idle"`.
    Idle,
    /// Stored/reported as `"busy"`.
    Busy,
    /// Stored/reported as `"offline"`. Also the value
    /// `get_all_worker_heartbeats` falls back to for stale heartbeats and for
    /// any status string it does not recognise.
    Offline,
}
43
44impl std::fmt::Display for WorkerStatus {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 WorkerStatus::Active => write!(f, "active"),
48 WorkerStatus::Idle => write!(f, "idle"),
49 WorkerStatus::Busy => write!(f, "busy"),
50 WorkerStatus::Offline => write!(f, "offline"),
51 }
52 }
53}
54
/// Task group information
///
/// Describes a set of tasks that were enqueued together under one group id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGroup {
    /// Identifier of the group.
    pub group_id: String,
    /// Ids of the tasks that belong to the group.
    pub task_ids: Vec<Uuid>,
    /// Creation time of the group.
    pub created_at: DateTime<Utc>,
    /// Optional free-form JSON metadata attached to the group.
    pub metadata: Option<serde_json::Value>,
}
63
/// Task group status
///
/// Per-state task counts for one group, as computed by
/// `MysqlBroker::get_group_status` from the `state` column of `celers_tasks`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGroupStatus {
    /// Identifier of the group the counts refer to.
    pub group_id: String,
    /// Number of tasks tagged with the group id, in any state.
    pub total_tasks: i64,
    /// Tasks whose state is `pending`.
    pub pending_tasks: i64,
    /// Tasks whose state is `processing`.
    pub processing_tasks: i64,
    /// Tasks whose state is `completed`.
    pub completed_tasks: i64,
    /// Tasks whose state is `failed`.
    pub failed_tasks: i64,
    /// Tasks whose state is `cancelled`.
    pub cancelled_tasks: i64,
}
75
/// Batch result storage input
///
/// One element of the slice passed to `MysqlBroker::store_result_batch`; each
/// field is bound to the column of the same name in `celers_task_results`.
#[derive(Debug, Clone)]
pub struct BatchResultInput {
    /// Id of the task the result belongs to (stored in its string form).
    pub task_id: Uuid,
    /// Name of the task.
    pub task_name: String,
    /// Outcome of the task (stored via its `to_string()` form).
    pub status: TaskResultStatus,
    /// Result payload; `None` is stored as the JSON text `null`.
    pub result: Option<serde_json::Value>,
    /// Optional error message.
    pub error: Option<String>,
    /// Optional traceback text.
    pub traceback: Option<String>,
    /// Optional runtime of the task, in milliseconds.
    pub runtime_ms: Option<i64>,
}
87
88impl MysqlBroker {
89 /// Store multiple task results in a single transaction for efficiency
90 ///
91 /// # Arguments
92 ///
93 /// * `results` - Vector of result inputs to store
94 ///
95 /// # Returns
96 ///
97 /// Number of results successfully stored
98 ///
99 /// # Examples
100 ///
101 /// ```no_run
102 /// # use celers_broker_sql::{MysqlBroker, BatchResultInput, TaskResultStatus};
103 /// # use uuid::Uuid;
104 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
105 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
106 /// let results = vec![
107 /// BatchResultInput {
108 /// task_id: Uuid::new_v4(),
109 /// task_name: "task1".to_string(),
110 /// status: TaskResultStatus::Success,
111 /// result: Some(serde_json::json!({"value": 42})),
112 /// error: None,
113 /// traceback: None,
114 /// runtime_ms: Some(100),
115 /// },
116 /// BatchResultInput {
117 /// task_id: Uuid::new_v4(),
118 /// task_name: "task2".to_string(),
119 /// status: TaskResultStatus::Success,
120 /// result: Some(serde_json::json!({"value": 24})),
121 /// error: None,
122 /// traceback: None,
123 /// runtime_ms: Some(150),
124 /// },
125 /// ];
126 ///
127 /// let stored = broker.store_result_batch(&results).await?;
128 /// println!("Stored {} results", stored);
129 /// # Ok(())
130 /// # }
131 /// ```
132 pub async fn store_result_batch(&self, results: &[BatchResultInput]) -> Result<u64> {
133 if results.is_empty() {
134 return Ok(0);
135 }
136
137 let mut tx = self
138 .pool
139 .begin()
140 .await
141 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
142
143 let mut stored = 0u64;
144 for result in results {
145 let result_json = result
146 .result
147 .as_ref()
148 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
149 .unwrap_or_else(|| "null".to_string());
150
151 let rows_affected = sqlx::query(
152 r#"
153 INSERT INTO celers_task_results
154 (task_id, task_name, status, result, error, traceback, runtime_ms, created_at, completed_at)
155 VALUES (?, ?, ?, ?, ?, ?, ?, NOW(), NOW())
156 ON DUPLICATE KEY UPDATE
157 task_name = VALUES(task_name),
158 status = VALUES(status),
159 result = VALUES(result),
160 error = VALUES(error),
161 traceback = VALUES(traceback),
162 runtime_ms = VALUES(runtime_ms),
163 completed_at = NOW()
164 "#,
165 )
166 .bind(result.task_id.to_string())
167 .bind(&result.task_name)
168 .bind(result.status.to_string())
169 .bind(result_json)
170 .bind(&result.error)
171 .bind(&result.traceback)
172 .bind(result.runtime_ms)
173 .execute(&mut *tx)
174 .await
175 .map_err(|e| {
176 CelersError::Other(format!("Failed to store result for task {}: {}", result.task_id, e))
177 })?
178 .rows_affected();
179
180 stored += rows_affected;
181 }
182
183 tx.commit()
184 .await
185 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
186
187 Ok(stored)
188 }
189
190 /// Get multiple task results in a single query
191 ///
192 /// # Arguments
193 ///
194 /// * `task_ids` - Vector of task IDs to retrieve results for
195 ///
196 /// # Returns
197 ///
198 /// Vector of task results found
199 ///
200 /// # Examples
201 ///
202 /// ```no_run
203 /// # use celers_broker_sql::MysqlBroker;
204 /// # use uuid::Uuid;
205 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
206 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
207 /// let task_ids = vec![Uuid::new_v4(), Uuid::new_v4()];
208 /// let results = broker.get_result_batch(&task_ids).await?;
209 /// println!("Retrieved {} results", results.len());
210 /// # Ok(())
211 /// # }
212 /// ```
213 pub async fn get_result_batch(&self, task_ids: &[Uuid]) -> Result<Vec<TaskResult>> {
214 if task_ids.is_empty() {
215 return Ok(Vec::new());
216 }
217
218 let placeholders = task_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
219 let query_str = format!(
220 r#"
221 SELECT task_id, task_name, status, result, error, traceback, created_at, completed_at, runtime_ms
222 FROM celers_task_results
223 WHERE task_id IN ({})
224 "#,
225 placeholders
226 );
227
228 let mut query = sqlx::query_as::<
229 _,
230 (
231 String,
232 String,
233 String,
234 String,
235 Option<String>,
236 Option<String>,
237 DateTime<Utc>,
238 Option<DateTime<Utc>>,
239 Option<i64>,
240 ),
241 >(&query_str);
242 for task_id in task_ids {
243 query = query.bind(task_id.to_string());
244 }
245
246 let rows = query
247 .fetch_all(&self.pool)
248 .await
249 .map_err(|e| CelersError::Other(format!("Failed to fetch results: {}", e)))?;
250
251 rows.into_iter()
252 .map(
253 |(
254 task_id,
255 task_name,
256 status,
257 result,
258 error,
259 traceback,
260 created_at,
261 completed_at,
262 runtime_ms,
263 )| {
264 Ok(TaskResult {
265 task_id: Uuid::parse_str(&task_id)
266 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
267 task_name,
268 status: status.parse()?,
269 result: serde_json::from_str(&result).ok(),
270 error,
271 traceback,
272 created_at,
273 completed_at,
274 runtime_ms,
275 })
276 },
277 )
278 .collect()
279 }
280
281 /// Enable drain mode - prevents new tasks from being enqueued while allowing processing of existing tasks
282 ///
283 /// This is useful for graceful shutdown scenarios where you want to:
284 /// 1. Stop accepting new work
285 /// 2. Allow workers to finish current tasks
286 /// 3. Drain the queue before shutting down
287 ///
288 /// # Examples
289 ///
290 /// ```no_run
291 /// # use celers_broker_sql::MysqlBroker;
292 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
293 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
294 /// // Enable drain mode
295 /// broker.enable_drain_mode().await?;
296 ///
297 /// // Check drain status
298 /// let is_draining = broker.is_drain_mode().await?;
299 /// println!("Drain mode: {}", is_draining);
300 ///
301 /// // Disable drain mode when ready
302 /// broker.disable_drain_mode().await?;
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub async fn enable_drain_mode(&self) -> Result<()> {
307 sqlx::query(
308 r#"
309 INSERT INTO celers_queue_config (queue_name, config_key, config_value, updated_at)
310 VALUES (?, 'drain_mode', 'true', NOW())
311 ON DUPLICATE KEY UPDATE config_value = 'true', updated_at = NOW()
312 "#,
313 )
314 .bind(&self.queue_name)
315 .execute(&self.pool)
316 .await
317 .map_err(|e| CelersError::Other(format!("Failed to enable drain mode: {}", e)))?;
318
319 Ok(())
320 }
321
322 /// Disable drain mode - allows new tasks to be enqueued again
323 pub async fn disable_drain_mode(&self) -> Result<()> {
324 sqlx::query(
325 r#"
326 INSERT INTO celers_queue_config (queue_name, config_key, config_value, updated_at)
327 VALUES (?, 'drain_mode', 'false', NOW())
328 ON DUPLICATE KEY UPDATE config_value = 'false', updated_at = NOW()
329 "#,
330 )
331 .bind(&self.queue_name)
332 .execute(&self.pool)
333 .await
334 .map_err(|e| CelersError::Other(format!("Failed to disable drain mode: {}", e)))?;
335
336 Ok(())
337 }
338
339 /// Check if drain mode is enabled
340 pub async fn is_drain_mode(&self) -> Result<bool> {
341 let row: Option<(String,)> = sqlx::query_as(
342 r#"
343 SELECT config_value
344 FROM celers_queue_config
345 WHERE queue_name = ? AND config_key = 'drain_mode'
346 "#,
347 )
348 .bind(&self.queue_name)
349 .fetch_optional(&self.pool)
350 .await
351 .map_err(|e| CelersError::Other(format!("Failed to check drain mode: {}", e)))?;
352
353 Ok(row.map(|(val,)| val == "true").unwrap_or(false))
354 }
355
356 /// Register a worker and update its heartbeat
357 ///
358 /// # Arguments
359 ///
360 /// * `worker_id` - Unique identifier for the worker
361 /// * `status` - Current worker status
362 /// * `capabilities` - Optional JSON object describing worker capabilities
363 ///
364 /// # Examples
365 ///
366 /// ```no_run
367 /// # use celers_broker_sql::{MysqlBroker, WorkerStatus};
368 /// # use serde_json::json;
369 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
370 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
371 /// broker.register_worker(
372 /// "worker-1",
373 /// WorkerStatus::Active,
374 /// Some(json!({"cpu_cores": 4, "memory_gb": 8}))
375 /// ).await?;
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub async fn register_worker(
380 &self,
381 worker_id: &str,
382 status: WorkerStatus,
383 capabilities: Option<serde_json::Value>,
384 ) -> Result<()> {
385 let capabilities_json = capabilities
386 .as_ref()
387 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
388 .unwrap_or_else(|| "null".to_string());
389
390 sqlx::query(
391 r#"
392 INSERT INTO celers_worker_heartbeat
393 (worker_id, queue_name, last_heartbeat, status, capabilities, task_count, updated_at)
394 VALUES (?, ?, NOW(), ?, ?, 0, NOW())
395 ON DUPLICATE KEY UPDATE
396 last_heartbeat = NOW(),
397 status = VALUES(status),
398 capabilities = VALUES(capabilities),
399 updated_at = NOW()
400 "#,
401 )
402 .bind(worker_id)
403 .bind(&self.queue_name)
404 .bind(status.to_string())
405 .bind(capabilities_json)
406 .execute(&self.pool)
407 .await
408 .map_err(|e| {
409 CelersError::Other(format!("Failed to register worker: {}", e))
410 })?;
411
412 Ok(())
413 }
414
415 /// Update worker heartbeat to indicate it's still alive
416 pub async fn update_worker_heartbeat(
417 &self,
418 worker_id: &str,
419 status: WorkerStatus,
420 ) -> Result<()> {
421 let rows_affected = sqlx::query(
422 r#"
423 UPDATE celers_worker_heartbeat
424 SET last_heartbeat = NOW(), status = ?, updated_at = NOW()
425 WHERE worker_id = ? AND queue_name = ?
426 "#,
427 )
428 .bind(status.to_string())
429 .bind(worker_id)
430 .bind(&self.queue_name)
431 .execute(&self.pool)
432 .await
433 .map_err(|e| CelersError::Other(format!("Failed to update worker heartbeat: {}", e)))?
434 .rows_affected();
435
436 if rows_affected == 0 {
437 return Err(CelersError::Other(format!(
438 "Worker {} not found",
439 worker_id
440 )));
441 }
442
443 Ok(())
444 }
445
446 /// Get heartbeat information for all workers
447 ///
448 /// # Arguments
449 ///
450 /// * `stale_threshold_secs` - Seconds after which a worker is considered stale/offline
451 ///
452 /// # Returns
453 ///
454 /// Vector of worker heartbeat information
455 ///
456 /// # Examples
457 ///
458 /// ```no_run
459 /// # use celers_broker_sql::MysqlBroker;
460 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
461 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
462 /// let workers = broker.get_all_worker_heartbeats(60).await?;
463 /// for worker in workers {
464 /// println!("Worker {} status: {}", worker.worker_id, worker.status);
465 /// }
466 /// # Ok(())
467 /// # }
468 /// ```
469 pub async fn get_all_worker_heartbeats(
470 &self,
471 stale_threshold_secs: i64,
472 ) -> Result<Vec<WorkerHeartbeat>> {
473 let rows: Vec<(String, DateTime<Utc>, String, i64, String)> = sqlx::query_as(
474 r#"
475 SELECT
476 worker_id,
477 last_heartbeat,
478 CASE
479 WHEN TIMESTAMPDIFF(SECOND, last_heartbeat, NOW()) > ? THEN 'offline'
480 ELSE status
481 END as status,
482 task_count,
483 COALESCE(capabilities, 'null') as capabilities
484 FROM celers_worker_heartbeat
485 WHERE queue_name = ?
486 ORDER BY last_heartbeat DESC
487 "#,
488 )
489 .bind(stale_threshold_secs)
490 .bind(&self.queue_name)
491 .fetch_all(&self.pool)
492 .await
493 .map_err(|e| CelersError::Other(format!("Failed to fetch worker heartbeats: {}", e)))?;
494
495 rows.into_iter()
496 .map(
497 |(worker_id, last_heartbeat, status, task_count, capabilities)| {
498 let status = match status.as_str() {
499 "active" => WorkerStatus::Active,
500 "idle" => WorkerStatus::Idle,
501 "busy" => WorkerStatus::Busy,
502 _ => WorkerStatus::Offline,
503 };
504 let capabilities = serde_json::from_str(&capabilities).ok();
505 Ok(WorkerHeartbeat {
506 worker_id,
507 last_heartbeat,
508 status,
509 task_count,
510 capabilities,
511 })
512 },
513 )
514 .collect()
515 }
516
517 /// Enqueue a group of related tasks
518 ///
519 /// # Arguments
520 ///
521 /// * `group_id` - Unique identifier for the task group
522 /// * `tasks` - Vector of tasks to enqueue
523 /// * `metadata` - Optional metadata for the group
524 ///
525 /// # Returns
526 ///
527 /// Vector of task IDs that were enqueued
528 ///
529 /// # Examples
530 ///
531 /// ```no_run
532 /// # use celers_broker_sql::MysqlBroker;
533 /// # use celers_core::SerializedTask;
534 /// # use serde_json::json;
535 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
536 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
537 /// # let task1 = SerializedTask::new("test".to_string(), vec![])
538 /// # .with_priority(0)
539 /// # .with_max_retries(3);
540 /// # let task2 = SerializedTask::new("test".to_string(), vec![])
541 /// # .with_priority(0)
542 /// # .with_max_retries(3);
543 /// let task_ids = broker.enqueue_group(
544 /// "batch-123",
545 /// vec![task1, task2],
546 /// Some(json!({"batch_type": "data_processing"}))
547 /// ).await?;
548 /// println!("Enqueued group with {} tasks", task_ids.len());
549 /// # Ok(())
550 /// # }
551 /// ```
552 pub async fn enqueue_group(
553 &self,
554 group_id: &str,
555 tasks: Vec<SerializedTask>,
556 metadata: Option<serde_json::Value>,
557 ) -> Result<Vec<TaskId>> {
558 if tasks.is_empty() {
559 return Ok(Vec::new());
560 }
561
562 let mut tx = self
563 .pool
564 .begin()
565 .await
566 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
567
568 let mut task_ids = Vec::new();
569
570 for task in tasks {
571 let task_id = Uuid::new_v4();
572
573 sqlx::query(
574 r#"
575 INSERT INTO celers_tasks
576 (id, task_name, payload, state, priority, retry_count, max_retries, created_at, scheduled_at, metadata)
577 VALUES (?, ?, ?, 'pending', ?, 0, ?, NOW(), NOW(), ?)
578 "#,
579 )
580 .bind(task_id.to_string())
581 .bind(&task.metadata.name)
582 .bind(&task.payload)
583 .bind(task.metadata.priority)
584 .bind(task.metadata.max_retries as i32)
585 .bind(serde_json::to_string(&json!({"group_id": group_id})).unwrap_or_else(|_| "{}".to_string()))
586 .execute(&mut *tx)
587 .await
588 .map_err(|e| {
589 CelersError::Other(format!("Failed to insert task: {}", e))
590 })?;
591
592 task_ids.push(task_id);
593 }
594
595 // Store group metadata
596 let metadata_json = metadata
597 .as_ref()
598 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
599 .unwrap_or_else(|| "null".to_string());
600
601 sqlx::query(
602 r#"
603 INSERT INTO celers_task_groups
604 (group_id, queue_name, task_count, created_at, metadata)
605 VALUES (?, ?, ?, NOW(), ?)
606 "#,
607 )
608 .bind(group_id)
609 .bind(&self.queue_name)
610 .bind(task_ids.len() as i64)
611 .bind(metadata_json)
612 .execute(&mut *tx)
613 .await
614 .map_err(|e| CelersError::Other(format!("Failed to insert task group: {}", e)))?;
615
616 tx.commit()
617 .await
618 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
619
620 Ok(task_ids)
621 }
622
623 /// Get status of a task group
624 ///
625 /// # Arguments
626 ///
627 /// * `group_id` - The task group identifier
628 ///
629 /// # Returns
630 ///
631 /// Task group status with counts by state
632 ///
633 /// # Examples
634 ///
635 /// ```no_run
636 /// # use celers_broker_sql::MysqlBroker;
637 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
638 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
639 /// let status = broker.get_group_status("batch-123").await?;
640 /// println!("Group has {} completed tasks out of {} total",
641 /// status.completed_tasks, status.total_tasks);
642 /// # Ok(())
643 /// # }
644 /// ```
645 pub async fn get_group_status(&self, group_id: &str) -> Result<TaskGroupStatus> {
646 let row: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(
647 r#"
648 SELECT
649 COUNT(*) as total_tasks,
650 SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending_tasks,
651 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing_tasks,
652 SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
653 SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
654 SUM(CASE WHEN state = 'cancelled' THEN 1 ELSE 0 END) as cancelled_tasks
655 FROM celers_tasks
656 WHERE JSON_UNQUOTE(JSON_EXTRACT(metadata, '$.group_id')) = ?
657 "#,
658 )
659 .bind(group_id)
660 .fetch_one(&self.pool)
661 .await
662 .map_err(|e| CelersError::Other(format!("Failed to fetch group status: {}", e)))?;
663
664 Ok(TaskGroupStatus {
665 group_id: group_id.to_string(),
666 total_tasks: row.0,
667 pending_tasks: row.1,
668 processing_tasks: row.2,
669 completed_tasks: row.3,
670 failed_tasks: row.4,
671 cancelled_tasks: row.5,
672 })
673 }
674
675 /// Check if connection pool is healthy and can handle current load
676 ///
677 /// Performs a comprehensive health check of the connection pool including:
678 /// - Connection availability (can acquire a connection)
679 /// - Pool utilization (not over-utilized)
680 /// - Database responsiveness (simple query performance)
681 ///
682 /// # Returns
683 ///
684 /// `Ok(true)` if healthy, `Ok(false)` if degraded, `Err` if critical failure
685 ///
686 /// # Examples
687 ///
688 /// ```no_run
689 /// # use celers_broker_sql::MysqlBroker;
690 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
691 /// let broker = MysqlBroker::new("mysql://localhost/db").await?;
692 ///
693 /// match broker.check_connection_health().await {
694 /// Ok(true) => println!("Connection pool is healthy"),
695 /// Ok(false) => println!("Connection pool is degraded"),
696 /// Err(e) => println!("Connection pool has critical issues: {}", e),
697 /// }
698 /// # Ok(())
699 /// # }
700 /// ```
701 pub async fn check_connection_health(&self) -> Result<bool> {
702 // Try to acquire a connection with timeout
703 let start = std::time::Instant::now();
704 let conn_result =
705 tokio::time::timeout(std::time::Duration::from_secs(5), self.pool.acquire()).await;
706
707 match conn_result {
708 Err(_) => {
709 tracing::error!("Connection pool timeout: failed to acquire connection within 5s");
710 Err(CelersError::Other(
711 "Connection pool exhausted: timeout acquiring connection".to_string(),
712 ))
713 }
714 Ok(Err(e)) => {
715 tracing::error!(error = %e, "Connection pool error");
716 Err(CelersError::Other(format!("Connection pool error: {}", e)))
717 }
718 Ok(Ok(mut conn)) => {
719 let acquire_time = start.elapsed();
720
721 // Warn if connection acquisition took too long
722 if acquire_time > std::time::Duration::from_secs(1) {
723 tracing::warn!(
724 acquire_time_ms = acquire_time.as_millis(),
725 "Slow connection acquisition indicates pool pressure"
726 );
727 }
728
729 // Test database responsiveness with simple query
730 let query_start = std::time::Instant::now();
731 let result = sqlx::query_scalar::<_, i64>("SELECT 1")
732 .fetch_one(&mut *conn)
733 .await;
734
735 match result {
736 Err(e) => {
737 tracing::error!(error = %e, "Database health check query failed");
738 Err(CelersError::Other(format!("Database unresponsive: {}", e)))
739 }
740 Ok(_) => {
741 let query_time = query_start.elapsed();
742
743 // Warn if database is responding slowly
744 if query_time > std::time::Duration::from_millis(100) {
745 tracing::warn!(
746 query_time_ms = query_time.as_millis(),
747 "Slow database response indicates potential issues"
748 );
749 return Ok(false); // Degraded
750 }
751
752 // Check pool utilization
753 let pool_size = self.pool.size();
754 let idle_conns = self.pool.num_idle() as u32;
755 let active_conns = pool_size.saturating_sub(idle_conns);
756 let utilization = (active_conns as f64 / pool_size as f64) * 100.0;
757
758 if utilization > 90.0 {
759 tracing::warn!(
760 utilization_percent = utilization,
761 pool_size = pool_size,
762 active = active_conns,
763 idle = idle_conns,
764 "High connection pool utilization"
765 );
766 return Ok(false); // Degraded
767 }
768
769 tracing::debug!(
770 acquire_time_ms = acquire_time.as_millis(),
771 query_time_ms = query_time.as_millis(),
772 utilization_percent = utilization,
773 "Connection pool health check passed"
774 );
775
776 Ok(true) // Healthy
777 }
778 }
779 }
780 }
781 }
782
783 /// Batch replay tasks from DLQ with filtering
784 ///
785 /// Requeues multiple tasks from the dead letter queue based on filter criteria.
786 /// This is useful for recovering from systematic failures or replaying tasks
787 /// after fixing bugs.
788 ///
789 /// # Arguments
790 ///
791 /// * `task_name_filter` - Optional task name pattern to match (None = all tasks)
792 /// * `min_retry_count` - Minimum retry count to include (for filtering partial failures)
793 /// * `limit` - Maximum number of tasks to replay
794 ///
795 /// # Returns
796 ///
797 /// Number of tasks successfully requeued from DLQ
798 ///
799 /// # Examples
800 ///
801 /// ```
802 /// # use celers_broker_sql::MysqlBroker;
803 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
804 /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
805 /// // Replay all tasks with "payment" in the name that failed with 3+ retries
806 /// let count = broker.replay_dlq_batch(Some("payment"), Some(3), 100).await?;
807 /// println!("Replayed {} payment tasks from DLQ", count);
808 ///
809 /// // Replay all failed tasks (no filter)
810 /// let count = broker.replay_dlq_batch(None, None, 1000).await?;
811 /// println!("Replayed {} tasks from DLQ", count);
812 /// # Ok(())
813 /// # }
814 /// ```
815 pub async fn replay_dlq_batch(
816 &self,
817 task_name_filter: Option<&str>,
818 min_retry_count: Option<i32>,
819 limit: i64,
820 ) -> Result<u64> {
821 let mut query = String::from("SELECT id FROM celers_dead_letter_queue WHERE 1=1");
822
823 if task_name_filter.is_some() {
824 query.push_str(" AND task_name LIKE ?");
825 }
826
827 if min_retry_count.is_some() {
828 query.push_str(" AND retry_count >= ?");
829 }
830
831 query.push_str(" ORDER BY failed_at ASC LIMIT ?");
832
833 let mut q = sqlx::query_scalar::<_, String>(&query);
834
835 if let Some(filter) = task_name_filter {
836 q = q.bind(format!("%{}%", filter));
837 }
838
839 if let Some(min_retries) = min_retry_count {
840 q = q.bind(min_retries);
841 }
842
843 q = q.bind(limit);
844
845 let dlq_ids = q
846 .fetch_all(&self.pool)
847 .await
848 .map_err(|e| CelersError::Other(format!("Failed to fetch DLQ IDs: {}", e)))?;
849
850 let mut replayed = 0u64;
851 for dlq_id in dlq_ids {
852 match Uuid::parse_str(&dlq_id) {
853 Ok(id) => {
854 if self.requeue_from_dlq(&id).await.is_ok() {
855 replayed += 1;
856 }
857 }
858 Err(e) => {
859 tracing::warn!(dlq_id = %dlq_id, error = %e, "Failed to parse DLQ ID");
860 }
861 }
862 }
863
864 tracing::info!(
865 replayed = replayed,
866 task_filter = ?task_name_filter,
867 min_retries = ?min_retry_count,
868 "Batch replay from DLQ completed"
869 );
870
871 Ok(replayed)
872 }
873}