use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use super::DurableProcessWorker;
use super::process::ProcessRegistry;
use crate::PluginError;
const PROCESS_WORK_POLL_INTERVAL: Duration = Duration::from_millis(400);
pub struct ProcessWorkRunner {
run_handle: Arc<dyn ProcessRunHandle>,
notify: Arc<Notify>,
}
impl ProcessWorkRunner {
pub fn new(run_handle: Arc<dyn ProcessRunHandle>) -> Self {
Self {
run_handle,
notify: Arc::new(Notify::new()),
}
}
pub fn inline(worker: DurableProcessWorker) -> Self {
Self::new(Arc::new(InlineProcessRunHandle::new(worker)))
}
pub fn poke_handle(&self) -> ProcessWorkPoke {
ProcessWorkPoke {
notify: Arc::clone(&self.notify),
}
}
pub fn spawn(self) -> ProcessWorkPoke {
let poke = self.poke_handle();
tokio::spawn(async move {
self.run().await;
});
poke
}
async fn run(self) {
self.drive("startup").await;
let mut poll = tokio::time::interval(PROCESS_WORK_POLL_INTERVAL);
poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = self.notify.notified() => {
self.drive("poke").await;
}
_ = poll.tick() => {
self.drive("poll").await;
}
}
}
}
async fn drive(&self, reason: &str) {
if let Err(err) = self.run_handle.claim_and_run_pending().await {
tracing::warn!("process work runner drive ({reason}) failed: {err}");
}
}
}
#[derive(Clone)]
pub struct ProcessWorkDriver {
registry: Arc<dyn ProcessRegistry>,
poke: ProcessWorkPoke,
}
impl ProcessWorkDriver {
pub fn new(registry: Arc<dyn ProcessRegistry>, poke: ProcessWorkPoke) -> Self {
Self { registry, poke }
}
pub fn process_registry(&self) -> Arc<dyn ProcessRegistry> {
Arc::clone(&self.registry)
}
pub fn poke_handle(&self) -> ProcessWorkPoke {
self.poke.clone()
}
}
#[derive(Clone)]
pub struct ProcessWorkPoke {
notify: Arc<Notify>,
}
impl ProcessWorkPoke {
pub fn poke(&self) {
self.notify.notify_one();
}
}
#[async_trait::async_trait]
pub trait ProcessRunHandle: Send + Sync {
async fn claim_and_run_pending(&self) -> Result<(), PluginError>;
}
pub struct InlineProcessRunHandle {
worker: DurableProcessWorker,
}
impl InlineProcessRunHandle {
pub fn new(worker: DurableProcessWorker) -> Self {
Self { worker }
}
}
#[async_trait::async_trait]
impl ProcessRunHandle for InlineProcessRunHandle {
async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
self.worker.drive_pending_processes().await
}
}