// nakadion 0.11.2
//
// A connector for the Nakadi Event Broker
// Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use nakadi::metrics::*;

pub struct CancellationTokenSource {
    cancellation_requested: Arc<AtomicBool>,
    cancelled: Arc<AtomicBool>,
    metrics_collector: Arc<MetricsCollector + Sync + Send + 'static>,
}

impl Drop for CancellationTokenSource {
    fn drop(&mut self) {
        self.request_cancellation()
    }
}

impl CancellationTokenSource {
    /// Creates a source that takes ownership of `metrics_collector`.
    ///
    /// The collector is wrapped in an `Arc` and handed to `new_arc`.
    pub fn new<M>(metrics_collector: M) -> Self
    where
        M: MetricsCollector + Sync + Send + 'static,
    {
        let shared_collector = Arc::new(metrics_collector);
        Self::new_arc(shared_collector)
    }

    /// Creates a source from an already shared metrics collector.
    ///
    /// Both flags start out as `false`: nothing has been requested and no
    /// token has reported itself as cancelled yet.
    pub fn new_arc<M>(metrics_collector: Arc<M>) -> Self
    where
        M: MetricsCollector + Sync + Send + 'static,
    {
        let cancellation_requested = Arc::new(AtomicBool::new(false));
        let cancelled = Arc::new(AtomicBool::new(false));

        Self {
            cancellation_requested,
            cancelled,
            metrics_collector,
        }
    }

    /// Raises the shared "cancellation requested" flag.
    ///
    /// The flag is only ever set to `true` here; it is never reset.
    pub fn request_cancellation(&self) {
        let requested: &AtomicBool = &self.cancellation_requested;
        requested.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once any token created from this source has marked
    /// itself as cancelled.
    pub fn is_any_cancelled(&self) -> bool {
        let cancelled: &AtomicBool = &self.cancelled;
        cancelled.load(Ordering::Relaxed)
    }

    /// Creates a new token that shares both flags and the metrics collector
    /// with this source.
    pub fn auto_token(&self) -> AutoCancellationToken {
        let cancellation_requested = Arc::clone(&self.cancellation_requested);
        let cancelled = Arc::clone(&self.cancelled);
        let metrics_collector = Arc::clone(&self.metrics_collector);

        AutoCancellationToken {
            cancellation_requested,
            cancelled,
            metrics_collector,
        }
    }
}

impl Default for CancellationTokenSource {
    fn default() -> Self {
        CancellationTokenSource {
            cancellation_requested: Arc::new(AtomicBool::new(false)),
            cancelled: Arc::new(AtomicBool::new(false)),
            metrics_collector: Arc::new(DevNullMetricsCollector),
        }
    }
}

/// The receiving side of a cooperative cancellation mechanism.
///
/// A holder of a token can ask whether cancellation was requested and can
/// report back that it has been cancelled.
pub trait CancellationToken {
    /// Returns `true` if cancellation has been requested.
    fn cancellation_requested(&self) -> bool;
    /// Records that the holder of this token has been cancelled.
    fn cancelled(&self);
}

#[derive(Clone)]
pub struct AutoCancellationToken {
    cancellation_requested: Arc<AtomicBool>,
    cancelled: Arc<AtomicBool>,
    metrics_collector: Arc<MetricsCollector + Send + Sync + 'static>,
}

impl Drop for AutoCancellationToken {
    fn drop(&mut self) {
        self.cancelled();
        if ::std::thread::panicking() {
            self.metrics_collector.other_panicked();
            error!(
                "Abnormal cancellation due to a panic on thread '{}'!",
                ::std::thread::current().name().unwrap_or("<unnamed>")
            );
        };
    }
}

impl CancellationToken for AutoCancellationToken {
    fn cancellation_requested(&self) -> bool {
        self.cancellation_requested.load(Ordering::Relaxed)
    }

    fn cancelled(&self) {
        self.cancelled.store(true, Ordering::Relaxed)
    }
}