graphile_worker_ctx 0.6.1

Worker Context package for graphile_worker, a high-performance Rust/PostgreSQL job queue
Documentation
use graphile_worker_extensions::{Extensions, ReadOnlyExtensions};
use graphile_worker_job::Job;
use sqlx::{postgres::PgPoolOptions, PgPool};

use crate::{SharedTaskDetails, WorkerContext};

/// Builds the [`Job`] fixture shared by the tests in this file: id `1`,
/// task identifier `"test_task"`, and payload `{"test": "data"}`.
fn create_test_job() -> Job {
    let payload = serde_json::json!({"test": "data"});
    let task_identifier = String::from("test_task");

    Job::builder()
        .id(1)
        .payload(payload)
        .task_identifier(task_identifier)
        .build()
}

/// Returns a [`PgPool`] created with `connect_lazy` against a placeholder
/// local database URL.
///
/// The tests visible in this file only hand the pool to the context and never
/// issue a query through it.
///
/// # Panics
///
/// Panics if the lazy pool cannot be constructed.
fn create_test_pool() -> PgPool {
    let database_url = "postgres://test:test@localhost/test";
    let options = PgPoolOptions::new();

    options
        .connect_lazy(database_url)
        .expect("Failed to create lazy pool")
}

/// Wraps a default-constructed [`Extensions`] registry in its read-only view.
fn create_extensions() -> ReadOnlyExtensions {
    let empty_registry = Extensions::default();
    ReadOnlyExtensions::new(empty_registry)
}

/// Marker extension inserted into an [`Extensions`] registry so a test can
/// check that it is retrievable by type from a built context.
#[derive(Debug, Clone)]
struct TestExtension {
    /// Sentinel string compared by the assertions.
    value: &'static str,
}

/// Checks that values supplied to `WorkerContext::builder()` are readable back
/// through the context's accessors (payload, schema, worker id, local-time flag).
#[tokio::test]
async fn test_worker_context_builder() {
    // One value serves as both the builder input and the expected result.
    let expected_payload = serde_json::json!({"key": "value"});

    let context = WorkerContext::builder()
        .payload(expected_payload.clone())
        .pg_pool(create_test_pool())
        .schema("graphile_worker".to_string())
        .job(create_test_job())
        .worker_id("worker-1".to_string())
        .extensions(create_extensions())
        .task_details(SharedTaskDetails::default())
        .use_local_time(true)
        .build();

    assert_eq!(context.payload(), &expected_payload);
    assert_eq!(context.schema().escaped(), "graphile_worker");
    assert_eq!(context.worker_id(), "worker-1");
    assert!(context.use_local_time());
}

/// Checks that a context built without calling `use_local_time` on the builder
/// reports the flag as `false`.
#[tokio::test]
async fn test_worker_context_builder_use_local_time_default() {
    let empty_payload = serde_json::json!({});

    // `use_local_time` is deliberately left unset on the builder.
    let context = WorkerContext::builder()
        .payload(empty_payload)
        .pg_pool(create_test_pool())
        .schema("schema".to_string())
        .job(create_test_job())
        .worker_id("worker".to_string())
        .extensions(create_extensions())
        .task_details(SharedTaskDetails::default())
        .build();

    assert!(!context.use_local_time());
}

/// Checks that `WorkerContext::from_shared_job` exposes the shared job's
/// payload and id, the supplied schema, worker id and local-time flag, and the
/// extension registered before construction.
#[tokio::test]
async fn test_worker_context_from_shared_job_uses_job_payload() {
    let shared_job = std::sync::Arc::new(create_test_job());

    // Register one extension so the lookups below have something to find.
    let mut registry = Extensions::default();
    registry.insert(TestExtension { value: "present" });
    let read_only = ReadOnlyExtensions::new(registry);

    let context = WorkerContext::from_shared_job(
        std::sync::Arc::clone(&shared_job),
        create_test_pool(),
        "graphile_worker".to_string(),
        "worker-1".to_string(),
        read_only,
        SharedTaskDetails::default(),
        true,
    );

    // Job-derived data.
    assert_eq!(context.payload(), shared_job.payload());
    assert_eq!(context.job().id(), shared_job.id());

    // Values passed straight through.
    assert_eq!(context.schema().escaped(), "graphile_worker");
    assert_eq!(context.worker_id(), "worker-1");
    assert!(context.use_local_time());

    // The extension is looked up through both accessors.
    assert_eq!(
        context.extensions().get::<TestExtension>().unwrap().value,
        "present"
    );
    assert_eq!(context.get_ext::<TestExtension>().unwrap().value, "present");
}

/// Checks that a context built without an explicit `payload` reports the
/// job's own payload.
#[tokio::test]
async fn test_worker_context_builder_uses_job_payload_when_payload_missing() {
    let fixture_job = create_test_job();
    // Snapshot the payload before the job is moved into the builder.
    let job_payload = fixture_job.payload().clone();

    // `payload` is deliberately left unset on the builder.
    let context = WorkerContext::builder()
        .pg_pool(create_test_pool())
        .schema("schema".to_string())
        .job(fixture_job)
        .worker_id("worker".to_string())
        .extensions(create_extensions())
        .task_details(SharedTaskDetails::default())
        .build();

    assert_eq!(context.payload(), &job_payload);
}