// rexecutor-sqlx 0.1.1
//
// A robust job processing library
// Documentation
use chrono::Utc;
use rexecutor::backend::Query;
use rexecutor::pruner::{PruneBy, PruneSpec, Spec};
use sqlx::{Postgres, QueryBuilder};

use crate::types::JobStatus;

/// Crate-internal conversion of a value into a Postgres [`QueryBuilder`].
pub(crate) trait ToQuery {
    /// Builds the SQL statement (text plus bound parameters) described by `self`.
    ///
    /// The returned builder's lifetime is tied to `&self`, so it may hold values
    /// borrowed from `self` as bind arguments.
    fn query(&self) -> QueryBuilder<'_, Postgres>;
}

impl ToQuery for PruneSpec {
    /// Builds the `DELETE` statement that prunes jobs matching this spec.
    ///
    /// The statement deletes every row whose id is returned by a sub-select over
    /// `rexecutor_jobs` filtered by status and executor, and then narrowed by
    /// `prune_by`:
    ///
    /// - `PruneBy::MaxAge(time)` adds a `scheduled_at` bound: earlier than `now - time`
    ///   for complete/cancelled/discarded jobs, later than `now + time` for
    ///   scheduled/retryable/executing jobs.
    /// - `PruneBy::MaxLength(count)` orders by `scheduled_at` (descending for
    ///   complete/cancelled/discarded jobs, ascending otherwise) and skips the first
    ///   `count` rows via `OFFSET`, so only the rows after them are selected.
    ///
    /// Bind parameters are, in order: status, executor list, cutoff time or offset.
    fn query(&self) -> QueryBuilder<'_, Postgres> {
        let status: JobStatus = self.status.into();

        // Statuses of finished jobs look backwards in time; the others look forwards.
        let in_past = match status {
            JobStatus::Complete | JobStatus::Cancelled | JobStatus::Discarded => true,
            JobStatus::Scheduled | JobStatus::Retryable | JobStatus::Executing => false,
        };

        let mut sql = QueryBuilder::new(
            "DELETE FROM rexecutor_jobs WHERE id in (\
            SELECT id FROM rexecutor_jobs WHERE status = ",
        );
        sql.push_bind::<JobStatus>(status);

        // Both variants bind the same list; only the comparison differs.
        let (executor_filter, executors) = match &self.executors {
            Spec::Except(names) => (" AND executor != ALL(", names),
            Spec::Only(names) => (" AND executor = ANY(", names),
        };
        sql.push(executor_filter);
        sql.push_bind(executors);
        sql.push(")");

        match self.prune_by {
            PruneBy::MaxAge(time) => {
                let now = Utc::now();
                let (comparison, cutoff) = if in_past {
                    (" < ", now - time)
                } else {
                    (" > ", now + time)
                };
                sql.push(" AND scheduled_at")
                    .push(comparison)
                    .push_bind(cutoff);
            }
            PruneBy::MaxLength(count) => {
                let direction = if in_past { " DESC " } else { " ASC " };
                sql.push(" ORDER BY scheduled_at")
                    .push(direction)
                    .push("OFFSET ")
                    .push_bind(count as i64);
            }
        }

        // Closes the `id in (` sub-select.
        sql.push(")");
        sql
    }
}

impl ToQuery for Query<'_> {
    /// Builds a `SELECT` over `rexecutor_jobs` whose `WHERE` clause is generated from
    /// this query by `handle_query`, ordered by `inserted_at` descending.
    fn query(&self) -> QueryBuilder<'_, Postgres> {
        // Column list and table; the predicate is appended directly after `WHERE`.
        const SELECT_JOBS: &str = r#"SELECT 
                id,
                status,
                executor,
                data,
                metadata,
                attempt,
                max_attempts,
                priority,
                tags,
                errors,
                inserted_at,
                scheduled_at,
                attempted_at,
                completed_at,
                cancelled_at,
                discarded_at
            FROM rexecutor_jobs WHERE"#;

        let mut select = QueryBuilder::new(SELECT_JOBS);
        handle_query(&mut select, self);
        select.push(" ORDER BY inserted_at DESC");
        select
    }
}

/// Appends the SQL predicate for `query` to `builder`, binding every value as a
/// parameter rather than interpolating it into the SQL text.
///
/// Every fragment starts with a space so it can directly follow `WHERE`, `AND`,
/// `OR` or `NOT`.
///
/// `Not`, `And` and `Or` wrap their operands in parentheses so that the nesting of
/// the `Query` tree decides the grouping instead of SQL operator precedence
/// (`NOT` binds tighter than `AND`, which binds tighter than `OR`). An empty `And`
/// is rendered as `TRUE` and an empty `Or` as `FALSE`, so the resulting `WHERE`
/// clause is always syntactically complete.
fn handle_query<'a>(builder: &mut QueryBuilder<'a, Postgres>, query: &'a Query<'_>) {
    match query {
        Query::Not(inner) => {
            // Parenthesised so the negation covers the whole inner query.
            builder.push(" NOT (");
            handle_query(builder, inner);
            builder.push(")");
        }
        Query::And(queries) => push_group(builder, queries, " AND", " TRUE"),
        Query::Or(queries) => push_group(builder, queries, " OR", " FALSE"),
        Query::DataEquals(data) => {
            builder.push(" data = ");
            builder.push_bind(data);
        }
        Query::MetadataEquals(metadata) => {
            builder.push(" metadata = ");
            builder.push_bind(metadata);
        }
        Query::ExecutorEqual(executor) => {
            builder.push(" executor = ");
            builder.push_bind(executor);
        }
        Query::IdEquals(id) => {
            builder.push(" id = ");
            builder.push_bind(Into::<i32>::into(*id));
        }
        Query::IdIn(ids) => {
            let ids: Vec<i32> = ids
                .iter()
                .map(|job_id| Into::<i32>::into(*job_id))
                .collect();
            // Membership test: `= ANY` matches when the id equals at least one element,
            // whereas `= ALL` would require it to equal every element.
            builder.push(" id = ANY(");
            builder.push_bind(ids);
            builder.push(")");
        }
        Query::StatusEqual(status) => {
            builder.push(" status = ");
            builder.push_bind(JobStatus::from(*status));
        }
        Query::TagsAllOf(tags) => {
            // `@>`: the job's tags contain every requested tag.
            builder.push(" tags @> ");
            builder.push_bind(&tags[..]);
            builder.push("::varchar[]");
        }
        Query::TagsOneOf(tags) => {
            // `&&`: the job's tags overlap with the requested tags.
            builder.push(" tags && ");
            builder.push_bind(&tags[..]);
            builder.push("::varchar[]");
        }
        Query::ScheduledAtBefore(scheduled_at) => {
            builder.push(" scheduled_at < ");
            builder.push_bind(scheduled_at);
        }
        Query::ScheduledAtAfter(scheduled_at) => {
            builder.push(" scheduled_at > ");
            builder.push_bind(scheduled_at);
        }
        Query::ScheduledAtEqual(scheduled_at) => {
            builder.push(" scheduled_at = ");
            builder.push_bind(scheduled_at);
        }
    }
}

/// Appends `queries` joined by `separator` inside one pair of parentheses, or
/// `when_empty` if there are no queries to join.
fn push_group<'a>(
    builder: &mut QueryBuilder<'a, Postgres>,
    queries: &'a [Query<'_>],
    separator: &str,
    when_empty: &str,
) {
    match queries.split_last() {
        Some((last, rest)) => {
            builder.push(" (");
            for query in rest {
                handle_query(builder, query);
                builder.push(separator);
            }
            handle_query(builder, last);
            builder.push(")");
        }
        None => {
            builder.push(when_empty);
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use chrono::TimeDelta;

    /// Pins the exact SQL generated for each combination of executor filter
    /// (`Only` / `Except`) and pruning strategy (`MaxAge` / `MaxLength`).
    #[test]
    fn to_query_for_prune_spec_test() {
        let spec = PruneSpec {
            status: JobStatus::Complete.into(),
            prune_by: PruneBy::MaxAge(TimeDelta::hours(2)),
            executors: Spec::Only(vec!["simple_job"]),
        };

        assert_eq!(
            spec.query().into_sql(),
            "DELETE FROM rexecutor_jobs WHERE id in (SELECT id FROM rexecutor_jobs \
            WHERE status = $1 AND executor = ANY($2) AND scheduled_at < $3)"
        );

        let spec = PruneSpec {
            status: JobStatus::Complete.into(),
            prune_by: PruneBy::MaxLength(10),
            executors: Spec::Only(vec!["simple_job"]),
        };

        assert_eq!(
            spec.query().into_sql(),
            "DELETE FROM rexecutor_jobs WHERE id in (SELECT id FROM rexecutor_jobs \
            WHERE status = $1 AND executor = ANY($2) ORDER BY scheduled_at DESC OFFSET $3)"
        );

        let spec = PruneSpec {
            status: JobStatus::Complete.into(),
            prune_by: PruneBy::MaxLength(10),
            executors: Spec::Except(vec!["simple_job"]),
        };

        assert_eq!(
            spec.query().into_sql(),
            "DELETE FROM rexecutor_jobs WHERE id in (SELECT id FROM rexecutor_jobs \
            WHERE status = $1 AND executor != ALL($2) ORDER BY scheduled_at DESC OFFSET $3)"
        );

        let spec = PruneSpec {
            status: JobStatus::Complete.into(),
            prune_by: PruneBy::MaxAge(TimeDelta::days(1)),
            executors: Spec::Except(vec!["simple_job"]),
        };

        assert_eq!(
            spec.query().into_sql(),
            "DELETE FROM rexecutor_jobs WHERE id in (SELECT id FROM rexecutor_jobs \
            WHERE status = $1 AND executor != ALL($2) AND scheduled_at < $3)"
        );
    }

    /// Checks the predicate, placeholder numbering and ordering clause generated for
    /// a single-condition query and for a conjunction.
    #[test]
    fn to_query_for_query_test() {
        let tags = Query::TagsOneOf(&["tag1", "tag2"]);
        let sql = tags.query().into_sql();
        assert!(sql.starts_with("SELECT"));
        assert!(sql.ends_with(
            "FROM rexecutor_jobs WHERE tags && $1::varchar[] ORDER BY inserted_at DESC"
        ));

        let query = Query::And(vec![
            Query::ScheduledAtEqual(Utc::now()),
            Query::TagsOneOf(&["tag1", "tag2"]),
        ]);
        let sql = query.query().into_sql();
        // Only the predicates, their order and their placeholders are pinned here;
        // whether the conjunction is wrapped in parentheses is left unconstrained.
        assert!(sql.contains(" scheduled_at = $1 AND tags && $2::varchar[]"));
        assert!(sql.ends_with(" ORDER BY inserted_at DESC"));
    }
}