apalis-postgres 1.0.0-rc.7

Background task processing for Rust using apalis and Postgres
Documentation
use apalis_core::backend::{BackendExt, ListQueues, QueueInfo};
use serde_json::Value;
use ulid::Ulid;

use crate::{CompactType, PgContext, PostgresStorage};

/// Queue listing for the Postgres-backed storage.
///
/// Implemented for any `PostgresStorage` whose backend uses `PgContext`,
/// `CompactType`, `Ulid` ids and `sqlx::Error` as its error type.
impl<Args, D, F> ListQueues for PostgresStorage<Args, CompactType, D, F>
where
    PostgresStorage<Args, CompactType, D, F>:
        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
{
    /// Runs `queries/backend/list_queues.sql` and converts every returned row
    /// into a [`QueueInfo`].
    ///
    /// Every column is treated as nullable. A `NULL` name becomes an empty
    /// string; a `NULL` or undecodable `stats`, `workers` or `activity` value
    /// becomes that field's `Default` instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced while executing the query.
    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
        // Clone the pool handle up front so the returned future owns it and
        // does not capture `self`.
        let pool = self.pool.clone();

        /// Raw row shape produced by `list_queues.sql`; all columns nullable.
        struct QueueInfoRow {
            name: Option<String>,
            stats: Option<Value>,
            workers: Option<Value>,
            activity: Option<Value>,
        }

        async move {
            let rows = sqlx::query_file_as!(QueueInfoRow, "queries/backend/list_queues.sql")
                .fetch_all(&pool)
                .await?;

            Ok(rows
                .into_iter()
                .map(|row| QueueInfo {
                    name: row.name.unwrap_or_default(),
                    // A missing (NULL) JSON column is handled the same way as a
                    // JSON value that fails to decode: fall back to the default.
                    stats: row
                        .stats
                        .and_then(|value| serde_json::from_value(value).ok())
                        .unwrap_or_default(),
                    workers: row
                        .workers
                        .and_then(|value| serde_json::from_value(value).ok())
                        .unwrap_or_default(),
                    activity: row
                        .activity
                        .and_then(|value| serde_json::from_value(value).ok())
                        .unwrap_or_default(),
                })
                .collect())
        }
    }
}