use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Notify, Semaphore};
use tokio_util::sync::CancellationToken;
use super::core::Outbox;
use super::handler::{
Handler, MessageHandler, PerMessageAdapter, TransactionalHandler, TransactionalMessageHandler,
};
use super::manager::{OutboxBuilder, QueueDeclaration};
use super::stats::StatsRegistry;
use super::strategy::{DecoupledStrategy, TransactionalStrategy, generate_worker_id};
use super::taskward::{
BackoffConfig, Bulkhead, BulkheadConfig, ConcurrencyLimit, PanicPolicy, TracingListener,
WorkerBuilder,
};
use super::types::{Partitions, WorkerTuning};
use super::workers::processor::{PartitionProcessor, ProcessorReport};
use crate::Db;
pub struct SpawnContext {
pub pid: i64,
pub db: Db,
pub cancel: CancellationToken,
pub partition_notify: Arc<Notify>,
pub processor_sem: Arc<Semaphore>,
pub start_notify: Arc<Notify>,
#[allow(dead_code)]
pub outbox: Arc<Outbox>,
pub stats_registry: Option<Arc<std::sync::Mutex<StatsRegistry>>>,
pub tuning: WorkerTuning,
}
pub trait ProcessorFactory: Send {
fn spawn(&self, ctx: SpawnContext) -> (String, Pin<Box<dyn Future<Output = ()> + Send>>);
}
fn build_processor_worker<S: super::strategy::ProcessingStrategy + 'static>(
ctx: &SpawnContext,
strategy: S,
) -> (String, Pin<Box<dyn Future<Output = ()> + Send>>) {
let processor = PartitionProcessor::new(strategy, ctx.pid, ctx.tuning.clone(), ctx.db.clone());
let name = format!("processor-{}", ctx.pid);
let (poker_notify, _poker_handle) =
super::taskward::poker(ctx.tuning.idle_interval, ctx.cancel.clone());
let mut builder = WorkerBuilder::<ProcessorReport>::new(&name, ctx.cancel.clone())
.pacing(&ctx.tuning)
.notifier(poker_notify)
.notifier(Arc::clone(&ctx.partition_notify))
.notifier(Arc::clone(&ctx.start_notify))
.bulkhead(Bulkhead::new(
&name,
BulkheadConfig {
semaphore: ConcurrencyLimit::Fixed(Arc::clone(&ctx.processor_sem)),
backoff: BackoffConfig::default(),
},
))
.listener(TracingListener)
.on_panic(PanicPolicy::CatchAndRetry);
builder = super::manager::register_stats(
builder,
ctx.stats_registry.as_ref(),
"processor",
Box::new(|any| {
any.downcast_ref::<ProcessorReport>()
.map_or(0, |r| u64::from(r.messages_processed))
}),
);
let worker = builder.build(processor);
(name, Box::pin(worker.run()))
}
#[must_use = "a queue builder does nothing until a handler is registered via .transactional() or .decoupled()"]
pub struct QueueBuilder {
builder: OutboxBuilder,
name: String,
partitions: Partitions,
}
impl QueueBuilder {
pub(crate) fn new(builder: OutboxBuilder, name: String, partitions: Partitions) -> Self {
Self {
builder,
name,
partitions,
}
}
#[must_use]
pub fn transactional(
self,
handler: impl TransactionalMessageHandler + 'static,
) -> OutboxBuilder {
self.register_transactional(PerMessageAdapter::new(handler), true)
}
#[must_use]
pub fn decoupled(self, handler: impl MessageHandler + 'static) -> OutboxBuilder {
self.register_decoupled(PerMessageAdapter::new(handler), true, None)
}
#[must_use]
pub fn decoupled_with(
self,
handler: impl MessageHandler + 'static,
config: super::types::DecoupledConfig,
) -> OutboxBuilder {
let super::types::DecoupledConfig { lease_duration } = config;
self.register_decoupled(PerMessageAdapter::new(handler), true, Some(lease_duration))
}
#[must_use]
pub fn batch_transactional(
self,
handler: impl TransactionalHandler + 'static,
) -> OutboxBuilder {
self.register_transactional(handler, false)
}
fn register_transactional(
self,
handler: impl TransactionalHandler + 'static,
per_message: bool,
) -> OutboxBuilder {
let factory = TransactionalProcessorFactory {
handler: Arc::new(handler),
per_message,
};
let mut builder = self.builder;
builder.queue_declarations.push(QueueDeclaration {
name: self.name,
partitions: self.partitions,
factory: Box::new(factory),
});
builder
}
#[must_use]
pub fn batch_decoupled(self, handler: impl Handler + 'static) -> OutboxBuilder {
self.register_decoupled(handler, false, None)
}
fn register_decoupled(
self,
handler: impl Handler + 'static,
per_message: bool,
lease_duration_override: Option<Duration>,
) -> OutboxBuilder {
let factory = DecoupledProcessorFactory {
handler: Arc::new(handler),
queue_name: self.name.clone(),
per_message,
lease_duration_override,
};
let mut builder = self.builder;
builder.queue_declarations.push(QueueDeclaration {
name: self.name,
partitions: self.partitions,
factory: Box::new(factory),
});
builder
}
#[must_use]
pub fn batch_decoupled_with(
self,
handler: impl Handler + 'static,
config: super::types::DecoupledConfig,
) -> OutboxBuilder {
let super::types::DecoupledConfig { lease_duration } = config;
self.register_decoupled(handler, false, Some(lease_duration))
}
}
struct TransactionalProcessorFactory<H: TransactionalHandler> {
handler: Arc<H>,
per_message: bool,
}
impl<H: TransactionalHandler + 'static> ProcessorFactory for TransactionalProcessorFactory<H> {
fn spawn(&self, mut ctx: SpawnContext) -> (String, Pin<Box<dyn Future<Output = ()> + Send>>) {
if self.per_message {
ctx.tuning.batch_size = 1;
}
let strategy = TransactionalStrategy::new(Box::new(ArcTransactionalHandler(Arc::clone(
&self.handler,
))));
build_processor_worker(&ctx, strategy)
}
}
struct DecoupledProcessorFactory<H: Handler> {
handler: Arc<H>,
queue_name: String,
per_message: bool,
lease_duration_override: Option<Duration>,
}
impl<H: Handler + 'static> ProcessorFactory for DecoupledProcessorFactory<H> {
fn spawn(&self, mut ctx: SpawnContext) -> (String, Pin<Box<dyn Future<Output = ()> + Send>>) {
if self.per_message {
ctx.tuning.batch_size = 1;
}
if let Some(ld) = self.lease_duration_override {
ctx.tuning.lease_duration = ld;
}
let worker_id = generate_worker_id(&self.queue_name);
let strategy =
DecoupledStrategy::new(Box::new(ArcHandler(Arc::clone(&self.handler))), worker_id);
build_processor_worker(&ctx, strategy)
}
}
struct ArcTransactionalHandler<H: TransactionalHandler>(Arc<H>);
#[async_trait::async_trait]
impl<H: TransactionalHandler> TransactionalHandler for ArcTransactionalHandler<H> {
async fn handle(
&self,
txn: &dyn sea_orm::ConnectionTrait,
msgs: &[super::handler::OutboxMessage],
cancel: CancellationToken,
) -> super::handler::HandlerResult {
self.0.handle(txn, msgs, cancel).await
}
fn processed_count(&self) -> Option<usize> {
self.0.processed_count()
}
}
struct ArcHandler<H: Handler>(Arc<H>);
#[async_trait::async_trait]
impl<H: Handler> Handler for ArcHandler<H> {
async fn handle(
&self,
msgs: &[super::handler::OutboxMessage],
cancel: CancellationToken,
) -> super::handler::HandlerResult {
self.0.handle(msgs, cancel).await
}
fn processed_count(&self) -> Option<usize> {
self.0.processed_count()
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
#[test]
fn partitions_count() {
assert_eq!(Partitions::of(1).count(), 1);
assert_eq!(Partitions::of(2).count(), 2);
assert_eq!(Partitions::of(4).count(), 4);
assert_eq!(Partitions::of(8).count(), 8);
assert_eq!(Partitions::of(16).count(), 16);
assert_eq!(Partitions::of(32).count(), 32);
assert_eq!(Partitions::of(64).count(), 64);
}
}