graphile_worker_lifecycle_hooks 0.3.0

Lifecycle hooks for graphile_worker, a high-performance Rust/PostgreSQL job queue
Documentation
mod context;
mod event;
mod events;
mod plugin;
mod registry;
mod result;

use std::any::{Any, TypeId};
use std::collections::HashMap;

use futures::future::BoxFuture;

pub use context::*;
pub use event::{Event, HookOutput, Interceptable};
pub use events::*;
pub use plugin::Plugin;
pub use registry::HookRegistry;
pub use result::*;

type HandlerVec<Ctx, Out> = Vec<Box<dyn Fn(Ctx) -> BoxFuture<'static, Out> + Send + Sync>>;

#[derive(Default)]
#[doc(hidden)]
pub struct TypeErasedHooks {
    handlers: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl TypeErasedHooks {
    pub fn is_empty(&self) -> bool {
        self.handlers.is_empty()
    }

    pub fn get_handlers<E: Event>(&self) -> Option<&HandlerVec<E::Context, E::Output>> {
        self.handlers
            .get(&TypeId::of::<E>())
            .and_then(|h| h.downcast_ref())
    }

    pub fn get_handlers_mut<E: Event>(&mut self) -> &mut HandlerVec<E::Context, E::Output> {
        self.handlers
            .entry(TypeId::of::<E>())
            .or_insert_with(|| Box::new(HandlerVec::<E::Context, E::Output>::new()))
            .downcast_mut()
            .expect("Handler type mismatch")
    }

    pub async fn emit<C: Emittable>(&self, ctx: C) {
        if self.is_empty() {
            return;
        }

        ctx.emit_to(self).await
    }

    pub async fn intercept<C: event::Interceptable>(&self, ctx: C) -> C::Output {
        ctx.intercept_with(self).await
    }
}

impl HookRegistry {
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    pub async fn emit<C: Emittable>(&self, ctx: C) {
        self.inner.emit(ctx).await
    }

    pub async fn intercept<C: event::Interceptable>(&self, ctx: C) -> C::Output {
        self.inner.intercept(ctx).await
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use graphile_worker_job::Job;
    use serde_json::json;

    use super::*;

    #[test]
    fn emit_on_empty_registry_returns() {
        futures::executor::block_on(async {
            TypeErasedHooks::default()
                .emit(JobCompleteContext {
                    job: Arc::new(Job::builder().build()),
                    worker_id: "worker".to_string(),
                    duration: Duration::ZERO,
                })
                .await;
        });
    }

    #[test]
    fn hook_output_defaults_and_registry_registration_paths() {
        assert!(!<() as HookOutput>::is_terminal(&()));
        assert_eq!(<() as HookOutput>::chain_value(&()), None);
        let _: () = <() as HookOutput>::default_with_value(());

        assert!(!HookResult::Continue.is_terminal());
        assert!(HookResult::Skip.is_terminal());
        assert!(HookResult::Fail("failed".to_string()).is_terminal());

        let value = json!({ "ok": true });
        let schedule_result = JobScheduleResult::Continue(value.clone());
        assert!(!schedule_result.is_terminal());
        assert_eq!(schedule_result.chain_value(), Some(value.clone()));
        assert!(JobScheduleResult::Skip.is_terminal());
        assert!(JobScheduleResult::Fail("failed".to_string()).is_terminal());
        match JobScheduleResult::default_with_value(value.clone()) {
            JobScheduleResult::Continue(actual) => assert_eq!(actual, value),
            _ => panic!("expected continue"),
        }

        let mut registry = HookRegistry::new();
        registry.on(JobComplete, |_ctx| async {});
        assert!(!registry.is_empty());
    }
}