Skip to main content

celers_broker_sql/
broker_batch.rs

1//! Batch operations, worker management, and task groups
2//!
3//! Batch result storage, drain mode, worker heartbeats,
4//! task grouping, connection health, and DLQ replay operations.
5
6use crate::broker_core::MysqlBroker;
7use crate::types::*;
8use celers_core::{CelersError, Result, SerializedTask, TaskId};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use uuid::Uuid;
13
14/// Slow query information from performance_schema
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SlowQueryInfo {
17    pub query_text: String,
18    pub execution_count: i64,
19    pub avg_time_ms: f64,
20    pub max_time_ms: f64,
21    pub total_time_ms: f64,
22}
23
24/// Worker heartbeat information
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct WorkerHeartbeat {
27    pub worker_id: String,
28    pub last_heartbeat: DateTime<Utc>,
29    pub status: WorkerStatus,
30    pub task_count: i64,
31    pub capabilities: Option<serde_json::Value>,
32}
33
34/// Worker status
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
36#[serde(rename_all = "lowercase")]
37pub enum WorkerStatus {
38    Active,
39    Idle,
40    Busy,
41    Offline,
42}
43
44impl std::fmt::Display for WorkerStatus {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            WorkerStatus::Active => write!(f, "active"),
48            WorkerStatus::Idle => write!(f, "idle"),
49            WorkerStatus::Busy => write!(f, "busy"),
50            WorkerStatus::Offline => write!(f, "offline"),
51        }
52    }
53}
54
55/// Task group information
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct TaskGroup {
58    pub group_id: String,
59    pub task_ids: Vec<Uuid>,
60    pub created_at: DateTime<Utc>,
61    pub metadata: Option<serde_json::Value>,
62}
63
64/// Task group status
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct TaskGroupStatus {
67    pub group_id: String,
68    pub total_tasks: i64,
69    pub pending_tasks: i64,
70    pub processing_tasks: i64,
71    pub completed_tasks: i64,
72    pub failed_tasks: i64,
73    pub cancelled_tasks: i64,
74}
75
76/// Batch result storage input
77#[derive(Debug, Clone)]
78pub struct BatchResultInput {
79    pub task_id: Uuid,
80    pub task_name: String,
81    pub status: TaskResultStatus,
82    pub result: Option<serde_json::Value>,
83    pub error: Option<String>,
84    pub traceback: Option<String>,
85    pub runtime_ms: Option<i64>,
86}
87
88impl MysqlBroker {
89    /// Store multiple task results in a single transaction for efficiency
90    ///
91    /// # Arguments
92    ///
93    /// * `results` - Vector of result inputs to store
94    ///
95    /// # Returns
96    ///
97    /// Number of results successfully stored
98    ///
99    /// # Examples
100    ///
101    /// ```no_run
102    /// # use celers_broker_sql::{MysqlBroker, BatchResultInput, TaskResultStatus};
103    /// # use uuid::Uuid;
104    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
105    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
106    /// let results = vec![
107    ///     BatchResultInput {
108    ///         task_id: Uuid::new_v4(),
109    ///         task_name: "task1".to_string(),
110    ///         status: TaskResultStatus::Success,
111    ///         result: Some(serde_json::json!({"value": 42})),
112    ///         error: None,
113    ///         traceback: None,
114    ///         runtime_ms: Some(100),
115    ///     },
116    ///     BatchResultInput {
117    ///         task_id: Uuid::new_v4(),
118    ///         task_name: "task2".to_string(),
119    ///         status: TaskResultStatus::Success,
120    ///         result: Some(serde_json::json!({"value": 24})),
121    ///         error: None,
122    ///         traceback: None,
123    ///         runtime_ms: Some(150),
124    ///     },
125    /// ];
126    ///
127    /// let stored = broker.store_result_batch(&results).await?;
128    /// println!("Stored {} results", stored);
129    /// # Ok(())
130    /// # }
131    /// ```
132    pub async fn store_result_batch(&self, results: &[BatchResultInput]) -> Result<u64> {
133        if results.is_empty() {
134            return Ok(0);
135        }
136
137        let mut tx = self
138            .pool
139            .begin()
140            .await
141            .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
142
143        let mut stored = 0u64;
144        for result in results {
145            let result_json = result
146                .result
147                .as_ref()
148                .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
149                .unwrap_or_else(|| "null".to_string());
150
151            let rows_affected = sqlx::query(
152                r#"
153                INSERT INTO celers_task_results
154                    (task_id, task_name, status, result, error, traceback, runtime_ms, created_at, completed_at)
155                VALUES (?, ?, ?, ?, ?, ?, ?, NOW(), NOW())
156                ON DUPLICATE KEY UPDATE
157                    task_name = VALUES(task_name),
158                    status = VALUES(status),
159                    result = VALUES(result),
160                    error = VALUES(error),
161                    traceback = VALUES(traceback),
162                    runtime_ms = VALUES(runtime_ms),
163                    completed_at = NOW()
164                "#,
165            )
166            .bind(result.task_id.to_string())
167            .bind(&result.task_name)
168            .bind(result.status.to_string())
169            .bind(result_json)
170            .bind(&result.error)
171            .bind(&result.traceback)
172            .bind(result.runtime_ms)
173            .execute(&mut *tx)
174            .await
175            .map_err(|e| {
176                CelersError::Other(format!("Failed to store result for task {}: {}", result.task_id, e))
177            })?
178            .rows_affected();
179
180            stored += rows_affected;
181        }
182
183        tx.commit()
184            .await
185            .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
186
187        Ok(stored)
188    }
189
190    /// Get multiple task results in a single query
191    ///
192    /// # Arguments
193    ///
194    /// * `task_ids` - Vector of task IDs to retrieve results for
195    ///
196    /// # Returns
197    ///
198    /// Vector of task results found
199    ///
200    /// # Examples
201    ///
202    /// ```no_run
203    /// # use celers_broker_sql::MysqlBroker;
204    /// # use uuid::Uuid;
205    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
206    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
207    /// let task_ids = vec![Uuid::new_v4(), Uuid::new_v4()];
208    /// let results = broker.get_result_batch(&task_ids).await?;
209    /// println!("Retrieved {} results", results.len());
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub async fn get_result_batch(&self, task_ids: &[Uuid]) -> Result<Vec<TaskResult>> {
214        if task_ids.is_empty() {
215            return Ok(Vec::new());
216        }
217
218        let placeholders = task_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
219        let query_str = format!(
220            r#"
221            SELECT task_id, task_name, status, result, error, traceback, created_at, completed_at, runtime_ms
222            FROM celers_task_results
223            WHERE task_id IN ({})
224            "#,
225            placeholders
226        );
227
228        let mut query = sqlx::query_as::<
229            _,
230            (
231                String,
232                String,
233                String,
234                String,
235                Option<String>,
236                Option<String>,
237                DateTime<Utc>,
238                Option<DateTime<Utc>>,
239                Option<i64>,
240            ),
241        >(&query_str);
242        for task_id in task_ids {
243            query = query.bind(task_id.to_string());
244        }
245
246        let rows = query
247            .fetch_all(&self.pool)
248            .await
249            .map_err(|e| CelersError::Other(format!("Failed to fetch results: {}", e)))?;
250
251        rows.into_iter()
252            .map(
253                |(
254                    task_id,
255                    task_name,
256                    status,
257                    result,
258                    error,
259                    traceback,
260                    created_at,
261                    completed_at,
262                    runtime_ms,
263                )| {
264                    Ok(TaskResult {
265                        task_id: Uuid::parse_str(&task_id)
266                            .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
267                        task_name,
268                        status: status.parse()?,
269                        result: serde_json::from_str(&result).ok(),
270                        error,
271                        traceback,
272                        created_at,
273                        completed_at,
274                        runtime_ms,
275                    })
276                },
277            )
278            .collect()
279    }
280
281    /// Enable drain mode - prevents new tasks from being enqueued while allowing processing of existing tasks
282    ///
283    /// This is useful for graceful shutdown scenarios where you want to:
284    /// 1. Stop accepting new work
285    /// 2. Allow workers to finish current tasks
286    /// 3. Drain the queue before shutting down
287    ///
288    /// # Examples
289    ///
290    /// ```no_run
291    /// # use celers_broker_sql::MysqlBroker;
292    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
293    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
294    /// // Enable drain mode
295    /// broker.enable_drain_mode().await?;
296    ///
297    /// // Check drain status
298    /// let is_draining = broker.is_drain_mode().await?;
299    /// println!("Drain mode: {}", is_draining);
300    ///
301    /// // Disable drain mode when ready
302    /// broker.disable_drain_mode().await?;
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub async fn enable_drain_mode(&self) -> Result<()> {
307        sqlx::query(
308            r#"
309            INSERT INTO celers_queue_config (queue_name, config_key, config_value, updated_at)
310            VALUES (?, 'drain_mode', 'true', NOW())
311            ON DUPLICATE KEY UPDATE config_value = 'true', updated_at = NOW()
312            "#,
313        )
314        .bind(&self.queue_name)
315        .execute(&self.pool)
316        .await
317        .map_err(|e| CelersError::Other(format!("Failed to enable drain mode: {}", e)))?;
318
319        Ok(())
320    }
321
322    /// Disable drain mode - allows new tasks to be enqueued again
323    pub async fn disable_drain_mode(&self) -> Result<()> {
324        sqlx::query(
325            r#"
326            INSERT INTO celers_queue_config (queue_name, config_key, config_value, updated_at)
327            VALUES (?, 'drain_mode', 'false', NOW())
328            ON DUPLICATE KEY UPDATE config_value = 'false', updated_at = NOW()
329            "#,
330        )
331        .bind(&self.queue_name)
332        .execute(&self.pool)
333        .await
334        .map_err(|e| CelersError::Other(format!("Failed to disable drain mode: {}", e)))?;
335
336        Ok(())
337    }
338
339    /// Check if drain mode is enabled
340    pub async fn is_drain_mode(&self) -> Result<bool> {
341        let row: Option<(String,)> = sqlx::query_as(
342            r#"
343            SELECT config_value
344            FROM celers_queue_config
345            WHERE queue_name = ? AND config_key = 'drain_mode'
346            "#,
347        )
348        .bind(&self.queue_name)
349        .fetch_optional(&self.pool)
350        .await
351        .map_err(|e| CelersError::Other(format!("Failed to check drain mode: {}", e)))?;
352
353        Ok(row.map(|(val,)| val == "true").unwrap_or(false))
354    }
355
356    /// Register a worker and update its heartbeat
357    ///
358    /// # Arguments
359    ///
360    /// * `worker_id` - Unique identifier for the worker
361    /// * `status` - Current worker status
362    /// * `capabilities` - Optional JSON object describing worker capabilities
363    ///
364    /// # Examples
365    ///
366    /// ```no_run
367    /// # use celers_broker_sql::{MysqlBroker, WorkerStatus};
368    /// # use serde_json::json;
369    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
370    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
371    /// broker.register_worker(
372    ///     "worker-1",
373    ///     WorkerStatus::Active,
374    ///     Some(json!({"cpu_cores": 4, "memory_gb": 8}))
375    /// ).await?;
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub async fn register_worker(
380        &self,
381        worker_id: &str,
382        status: WorkerStatus,
383        capabilities: Option<serde_json::Value>,
384    ) -> Result<()> {
385        let capabilities_json = capabilities
386            .as_ref()
387            .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
388            .unwrap_or_else(|| "null".to_string());
389
390        sqlx::query(
391            r#"
392            INSERT INTO celers_worker_heartbeat
393                (worker_id, queue_name, last_heartbeat, status, capabilities, task_count, updated_at)
394            VALUES (?, ?, NOW(), ?, ?, 0, NOW())
395            ON DUPLICATE KEY UPDATE
396                last_heartbeat = NOW(),
397                status = VALUES(status),
398                capabilities = VALUES(capabilities),
399                updated_at = NOW()
400            "#,
401        )
402        .bind(worker_id)
403        .bind(&self.queue_name)
404        .bind(status.to_string())
405        .bind(capabilities_json)
406        .execute(&self.pool)
407        .await
408        .map_err(|e| {
409            CelersError::Other(format!("Failed to register worker: {}", e))
410        })?;
411
412        Ok(())
413    }
414
415    /// Update worker heartbeat to indicate it's still alive
416    pub async fn update_worker_heartbeat(
417        &self,
418        worker_id: &str,
419        status: WorkerStatus,
420    ) -> Result<()> {
421        let rows_affected = sqlx::query(
422            r#"
423            UPDATE celers_worker_heartbeat
424            SET last_heartbeat = NOW(), status = ?, updated_at = NOW()
425            WHERE worker_id = ? AND queue_name = ?
426            "#,
427        )
428        .bind(status.to_string())
429        .bind(worker_id)
430        .bind(&self.queue_name)
431        .execute(&self.pool)
432        .await
433        .map_err(|e| CelersError::Other(format!("Failed to update worker heartbeat: {}", e)))?
434        .rows_affected();
435
436        if rows_affected == 0 {
437            return Err(CelersError::Other(format!(
438                "Worker {} not found",
439                worker_id
440            )));
441        }
442
443        Ok(())
444    }
445
446    /// Get heartbeat information for all workers
447    ///
448    /// # Arguments
449    ///
450    /// * `stale_threshold_secs` - Seconds after which a worker is considered stale/offline
451    ///
452    /// # Returns
453    ///
454    /// Vector of worker heartbeat information
455    ///
456    /// # Examples
457    ///
458    /// ```no_run
459    /// # use celers_broker_sql::MysqlBroker;
460    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
461    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
462    /// let workers = broker.get_all_worker_heartbeats(60).await?;
463    /// for worker in workers {
464    ///     println!("Worker {} status: {}", worker.worker_id, worker.status);
465    /// }
466    /// # Ok(())
467    /// # }
468    /// ```
469    pub async fn get_all_worker_heartbeats(
470        &self,
471        stale_threshold_secs: i64,
472    ) -> Result<Vec<WorkerHeartbeat>> {
473        let rows: Vec<(String, DateTime<Utc>, String, i64, String)> = sqlx::query_as(
474            r#"
475            SELECT
476                worker_id,
477                last_heartbeat,
478                CASE
479                    WHEN TIMESTAMPDIFF(SECOND, last_heartbeat, NOW()) > ? THEN 'offline'
480                    ELSE status
481                END as status,
482                task_count,
483                COALESCE(capabilities, 'null') as capabilities
484            FROM celers_worker_heartbeat
485            WHERE queue_name = ?
486            ORDER BY last_heartbeat DESC
487            "#,
488        )
489        .bind(stale_threshold_secs)
490        .bind(&self.queue_name)
491        .fetch_all(&self.pool)
492        .await
493        .map_err(|e| CelersError::Other(format!("Failed to fetch worker heartbeats: {}", e)))?;
494
495        rows.into_iter()
496            .map(
497                |(worker_id, last_heartbeat, status, task_count, capabilities)| {
498                    let status = match status.as_str() {
499                        "active" => WorkerStatus::Active,
500                        "idle" => WorkerStatus::Idle,
501                        "busy" => WorkerStatus::Busy,
502                        _ => WorkerStatus::Offline,
503                    };
504                    let capabilities = serde_json::from_str(&capabilities).ok();
505                    Ok(WorkerHeartbeat {
506                        worker_id,
507                        last_heartbeat,
508                        status,
509                        task_count,
510                        capabilities,
511                    })
512                },
513            )
514            .collect()
515    }
516
517    /// Enqueue a group of related tasks
518    ///
519    /// # Arguments
520    ///
521    /// * `group_id` - Unique identifier for the task group
522    /// * `tasks` - Vector of tasks to enqueue
523    /// * `metadata` - Optional metadata for the group
524    ///
525    /// # Returns
526    ///
527    /// Vector of task IDs that were enqueued
528    ///
529    /// # Examples
530    ///
531    /// ```no_run
532    /// # use celers_broker_sql::MysqlBroker;
533    /// # use celers_core::SerializedTask;
534    /// # use serde_json::json;
535    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
536    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
537    /// # let task1 = SerializedTask::new("test".to_string(), vec![])
538    /// #     .with_priority(0)
539    /// #     .with_max_retries(3);
540    /// # let task2 = SerializedTask::new("test".to_string(), vec![])
541    /// #     .with_priority(0)
542    /// #     .with_max_retries(3);
543    /// let task_ids = broker.enqueue_group(
544    ///     "batch-123",
545    ///     vec![task1, task2],
546    ///     Some(json!({"batch_type": "data_processing"}))
547    /// ).await?;
548    /// println!("Enqueued group with {} tasks", task_ids.len());
549    /// # Ok(())
550    /// # }
551    /// ```
552    pub async fn enqueue_group(
553        &self,
554        group_id: &str,
555        tasks: Vec<SerializedTask>,
556        metadata: Option<serde_json::Value>,
557    ) -> Result<Vec<TaskId>> {
558        if tasks.is_empty() {
559            return Ok(Vec::new());
560        }
561
562        let mut tx = self
563            .pool
564            .begin()
565            .await
566            .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
567
568        let mut task_ids = Vec::new();
569
570        for task in tasks {
571            let task_id = Uuid::new_v4();
572
573            sqlx::query(
574                r#"
575                INSERT INTO celers_tasks
576                    (id, task_name, payload, state, priority, retry_count, max_retries, created_at, scheduled_at, metadata)
577                VALUES (?, ?, ?, 'pending', ?, 0, ?, NOW(), NOW(), ?)
578                "#,
579            )
580            .bind(task_id.to_string())
581            .bind(&task.metadata.name)
582            .bind(&task.payload)
583            .bind(task.metadata.priority)
584            .bind(task.metadata.max_retries as i32)
585            .bind(serde_json::to_string(&json!({"group_id": group_id})).unwrap_or_else(|_| "{}".to_string()))
586            .execute(&mut *tx)
587            .await
588            .map_err(|e| {
589                CelersError::Other(format!("Failed to insert task: {}", e))
590            })?;
591
592            task_ids.push(task_id);
593        }
594
595        // Store group metadata
596        let metadata_json = metadata
597            .as_ref()
598            .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
599            .unwrap_or_else(|| "null".to_string());
600
601        sqlx::query(
602            r#"
603            INSERT INTO celers_task_groups
604                (group_id, queue_name, task_count, created_at, metadata)
605            VALUES (?, ?, ?, NOW(), ?)
606            "#,
607        )
608        .bind(group_id)
609        .bind(&self.queue_name)
610        .bind(task_ids.len() as i64)
611        .bind(metadata_json)
612        .execute(&mut *tx)
613        .await
614        .map_err(|e| CelersError::Other(format!("Failed to insert task group: {}", e)))?;
615
616        tx.commit()
617            .await
618            .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
619
620        Ok(task_ids)
621    }
622
623    /// Get status of a task group
624    ///
625    /// # Arguments
626    ///
627    /// * `group_id` - The task group identifier
628    ///
629    /// # Returns
630    ///
631    /// Task group status with counts by state
632    ///
633    /// # Examples
634    ///
635    /// ```no_run
636    /// # use celers_broker_sql::MysqlBroker;
637    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
638    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
639    /// let status = broker.get_group_status("batch-123").await?;
640    /// println!("Group has {} completed tasks out of {} total",
641    ///     status.completed_tasks, status.total_tasks);
642    /// # Ok(())
643    /// # }
644    /// ```
645    pub async fn get_group_status(&self, group_id: &str) -> Result<TaskGroupStatus> {
646        let row: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(
647            r#"
648            SELECT
649                COUNT(*) as total_tasks,
650                SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending_tasks,
651                SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing_tasks,
652                SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
653                SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
654                SUM(CASE WHEN state = 'cancelled' THEN 1 ELSE 0 END) as cancelled_tasks
655            FROM celers_tasks
656            WHERE JSON_UNQUOTE(JSON_EXTRACT(metadata, '$.group_id')) = ?
657            "#,
658        )
659        .bind(group_id)
660        .fetch_one(&self.pool)
661        .await
662        .map_err(|e| CelersError::Other(format!("Failed to fetch group status: {}", e)))?;
663
664        Ok(TaskGroupStatus {
665            group_id: group_id.to_string(),
666            total_tasks: row.0,
667            pending_tasks: row.1,
668            processing_tasks: row.2,
669            completed_tasks: row.3,
670            failed_tasks: row.4,
671            cancelled_tasks: row.5,
672        })
673    }
674
675    /// Check if connection pool is healthy and can handle current load
676    ///
677    /// Performs a comprehensive health check of the connection pool including:
678    /// - Connection availability (can acquire a connection)
679    /// - Pool utilization (not over-utilized)
680    /// - Database responsiveness (simple query performance)
681    ///
682    /// # Returns
683    ///
684    /// `Ok(true)` if healthy, `Ok(false)` if degraded, `Err` if critical failure
685    ///
686    /// # Examples
687    ///
688    /// ```no_run
689    /// # use celers_broker_sql::MysqlBroker;
690    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
691    /// let broker = MysqlBroker::new("mysql://localhost/db").await?;
692    ///
693    /// match broker.check_connection_health().await {
694    ///     Ok(true) => println!("Connection pool is healthy"),
695    ///     Ok(false) => println!("Connection pool is degraded"),
696    ///     Err(e) => println!("Connection pool has critical issues: {}", e),
697    /// }
698    /// # Ok(())
699    /// # }
700    /// ```
701    pub async fn check_connection_health(&self) -> Result<bool> {
702        // Try to acquire a connection with timeout
703        let start = std::time::Instant::now();
704        let conn_result =
705            tokio::time::timeout(std::time::Duration::from_secs(5), self.pool.acquire()).await;
706
707        match conn_result {
708            Err(_) => {
709                tracing::error!("Connection pool timeout: failed to acquire connection within 5s");
710                Err(CelersError::Other(
711                    "Connection pool exhausted: timeout acquiring connection".to_string(),
712                ))
713            }
714            Ok(Err(e)) => {
715                tracing::error!(error = %e, "Connection pool error");
716                Err(CelersError::Other(format!("Connection pool error: {}", e)))
717            }
718            Ok(Ok(mut conn)) => {
719                let acquire_time = start.elapsed();
720
721                // Warn if connection acquisition took too long
722                if acquire_time > std::time::Duration::from_secs(1) {
723                    tracing::warn!(
724                        acquire_time_ms = acquire_time.as_millis(),
725                        "Slow connection acquisition indicates pool pressure"
726                    );
727                }
728
729                // Test database responsiveness with simple query
730                let query_start = std::time::Instant::now();
731                let result = sqlx::query_scalar::<_, i64>("SELECT 1")
732                    .fetch_one(&mut *conn)
733                    .await;
734
735                match result {
736                    Err(e) => {
737                        tracing::error!(error = %e, "Database health check query failed");
738                        Err(CelersError::Other(format!("Database unresponsive: {}", e)))
739                    }
740                    Ok(_) => {
741                        let query_time = query_start.elapsed();
742
743                        // Warn if database is responding slowly
744                        if query_time > std::time::Duration::from_millis(100) {
745                            tracing::warn!(
746                                query_time_ms = query_time.as_millis(),
747                                "Slow database response indicates potential issues"
748                            );
749                            return Ok(false); // Degraded
750                        }
751
752                        // Check pool utilization
753                        let pool_size = self.pool.size();
754                        let idle_conns = self.pool.num_idle() as u32;
755                        let active_conns = pool_size.saturating_sub(idle_conns);
756                        let utilization = (active_conns as f64 / pool_size as f64) * 100.0;
757
758                        if utilization > 90.0 {
759                            tracing::warn!(
760                                utilization_percent = utilization,
761                                pool_size = pool_size,
762                                active = active_conns,
763                                idle = idle_conns,
764                                "High connection pool utilization"
765                            );
766                            return Ok(false); // Degraded
767                        }
768
769                        tracing::debug!(
770                            acquire_time_ms = acquire_time.as_millis(),
771                            query_time_ms = query_time.as_millis(),
772                            utilization_percent = utilization,
773                            "Connection pool health check passed"
774                        );
775
776                        Ok(true) // Healthy
777                    }
778                }
779            }
780        }
781    }
782
783    /// Batch replay tasks from DLQ with filtering
784    ///
785    /// Requeues multiple tasks from the dead letter queue based on filter criteria.
786    /// This is useful for recovering from systematic failures or replaying tasks
787    /// after fixing bugs.
788    ///
789    /// # Arguments
790    ///
791    /// * `task_name_filter` - Optional task name pattern to match (None = all tasks)
792    /// * `min_retry_count` - Minimum retry count to include (for filtering partial failures)
793    /// * `limit` - Maximum number of tasks to replay
794    ///
795    /// # Returns
796    ///
797    /// Number of tasks successfully requeued from DLQ
798    ///
799    /// # Examples
800    ///
801    /// ```
802    /// # use celers_broker_sql::MysqlBroker;
803    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
804    /// # let broker = MysqlBroker::new("mysql://localhost/test").await?;
805    /// // Replay all tasks with "payment" in the name that failed with 3+ retries
806    /// let count = broker.replay_dlq_batch(Some("payment"), Some(3), 100).await?;
807    /// println!("Replayed {} payment tasks from DLQ", count);
808    ///
809    /// // Replay all failed tasks (no filter)
810    /// let count = broker.replay_dlq_batch(None, None, 1000).await?;
811    /// println!("Replayed {} tasks from DLQ", count);
812    /// # Ok(())
813    /// # }
814    /// ```
815    pub async fn replay_dlq_batch(
816        &self,
817        task_name_filter: Option<&str>,
818        min_retry_count: Option<i32>,
819        limit: i64,
820    ) -> Result<u64> {
821        let mut query = String::from("SELECT id FROM celers_dead_letter_queue WHERE 1=1");
822
823        if task_name_filter.is_some() {
824            query.push_str(" AND task_name LIKE ?");
825        }
826
827        if min_retry_count.is_some() {
828            query.push_str(" AND retry_count >= ?");
829        }
830
831        query.push_str(" ORDER BY failed_at ASC LIMIT ?");
832
833        let mut q = sqlx::query_scalar::<_, String>(&query);
834
835        if let Some(filter) = task_name_filter {
836            q = q.bind(format!("%{}%", filter));
837        }
838
839        if let Some(min_retries) = min_retry_count {
840            q = q.bind(min_retries);
841        }
842
843        q = q.bind(limit);
844
845        let dlq_ids = q
846            .fetch_all(&self.pool)
847            .await
848            .map_err(|e| CelersError::Other(format!("Failed to fetch DLQ IDs: {}", e)))?;
849
850        let mut replayed = 0u64;
851        for dlq_id in dlq_ids {
852            match Uuid::parse_str(&dlq_id) {
853                Ok(id) => {
854                    if self.requeue_from_dlq(&id).await.is_ok() {
855                        replayed += 1;
856                    }
857                }
858                Err(e) => {
859                    tracing::warn!(dlq_id = %dlq_id, error = %e, "Failed to parse DLQ ID");
860                }
861            }
862        }
863
864        tracing::info!(
865            replayed = replayed,
866            task_filter = ?task_name_filter,
867            min_retries = ?min_retry_count,
868            "Batch replay from DLQ completed"
869        );
870
871        Ok(replayed)
872    }
873}