pub mod approval;
pub mod concurrency;
pub mod execution_lock;
pub mod execution_queue;
use std::sync::Arc;
use sqlx::SqlitePool;
use crate::config::{ApprovalConfig, ConcurrencyConfig, ConcurrencyMode};
use crate::executor::resolve_executor;
use crate::models::trigger_attempt::TriggerAttemptStatus;
use crate::models::{ExecutionStatus, execution};
use crate::server::AppState;
pub enum BarrierOutcome {
Proceed,
Reject {
status: TriggerAttemptStatus,
reason: String,
},
Defer {
execution_id: String,
status: ExecutionStatus,
reason: String,
},
}
pub async fn on_execution_complete(
state: &Arc<AppState>,
hook_slug: &str,
concurrency: Option<ConcurrencyConfig>,
approval: Option<ApprovalConfig>,
) {
let pool = state.db.pool();
let Some(config) = &concurrency else {
let _ = execution_lock::release(pool, hook_slug).await;
return;
};
if config.mode != ConcurrencyMode::Queue {
let _ = execution_lock::release(pool, hook_slug).await;
return;
}
let next = execution_queue::peek_next(pool, hook_slug)
.await
.ok()
.flatten();
match next {
None => {
let _ = execution_lock::release(pool, hook_slug).await;
}
Some(queued) => {
if let Err(e) = execution_lock::hand_off(pool, hook_slug, &queued.execution_id).await {
tracing::warn!(hook_slug = %hook_slug, "failed to hand off lock: {e}");
return;
}
let _ = execution_queue::mark_ready(pool, &queued.id).await;
if approval::requires_approval(approval.as_ref()) {
if let Err(e) = execution::mark_pending_approval(pool, &queued.execution_id).await {
tracing::warn!(
execution_id = %queued.execution_id,
"failed to transition dequeued execution to pending_approval: {e}"
);
}
tracing::info!(
execution_id = %queued.execution_id,
hook_slug = %hook_slug,
"dequeued execution is awaiting approval"
);
return;
}
spawn_dequeued_task(
Arc::clone(state),
hook_slug.to_owned(),
queued.execution_id.clone(),
concurrency,
approval,
);
}
}
}
fn spawn_dequeued_task(
state: Arc<AppState>,
hook_slug: String,
execution_id: String,
concurrency: Option<ConcurrencyConfig>,
approval: Option<ApprovalConfig>,
) {
tokio::spawn(run_dequeued(
state,
hook_slug,
execution_id,
concurrency,
approval,
));
}
async fn run_dequeued(
state: Arc<AppState>,
hook_slug: String,
execution_id: String,
concurrency: Option<ConcurrencyConfig>,
approval: Option<ApprovalConfig>,
) {
let pool = state.db.pool();
let exec = match execution::get_by_id(pool, &execution_id).await {
Ok(e) => e,
Err(e) => {
tracing::warn!(execution_id = %execution_id, "failed to fetch dequeued execution: {e}");
let _ = execution_lock::release(pool, &hook_slug).await;
return;
}
};
let app_config = state.config.load();
let hook = match app_config.hooks.iter().find(|h| h.slug == hook_slug) {
Some(h) => h,
None => {
tracing::warn!(
hook_slug = %hook_slug,
"hook not found in config after dequeue, releasing lock"
);
let _ = execution_lock::release(pool, &hook_slug).await;
return;
}
};
let timeout = hook.timeout.unwrap_or(app_config.defaults.timeout);
let resolved_executor = resolve_executor(&hook.executor, &exec.request_payload);
let notification_config = hook.notification.clone();
let hook_snapshot = hook.clone();
let ctx = crate::executor::ExecutionContext {
execution_id: exec.id.clone(),
hook_slug: exec.hook_slug.clone(),
executor: resolved_executor,
env: hook.env.clone(),
cwd: hook.cwd.clone(),
timeout,
logs_dir: app_config.logs.dir.clone(),
payload_json: exec.request_payload.clone(),
http_client: Some(state.http_client.clone()),
};
let retry_config = crate::retry::resolve_retry_config(hook, &app_config.defaults.retries);
let pool_clone = pool.clone();
let execution_id_clone = exec.id.clone();
let result = crate::retry::run_with_retries(&pool_clone, ctx, &retry_config).await;
tracing::info!(
log_dir = %result.log_dir,
status = %result.status,
"dequeued execution completed"
);
if let Some(ref nc) = notification_config
&& let Ok(exec_record) =
crate::models::execution::get_by_id(&pool_clone, &execution_id_clone).await
{
crate::notification::send_notification(
&state.http_client,
nc,
&hook_snapshot,
&result,
&exec_record,
)
.await;
}
on_execution_complete(&state, &hook_slug, concurrency, approval).await;
}
pub async fn recover_barriers(pool: &SqlitePool) {
let now = crate::timestamp::now_utc();
match sqlx::query(
"UPDATE executions SET status = 'failed', completed_at = ? WHERE status = 'running'",
)
.bind(&now)
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => {
tracing::info!(
count = r.rows_affected(),
"recovered stuck running executions"
);
}
Err(e) => tracing::warn!("failed to recover running executions: {e}"),
_ => {}
}
match sqlx::query(
"DELETE FROM execution_locks WHERE execution_id IN \
(SELECT id FROM executions WHERE status IN ('success', 'failed', 'timed_out', 'rejected', 'expired'))",
)
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => {
tracing::info!(count = r.rows_affected(), "cleaned orphaned execution locks");
}
Err(e) => tracing::warn!("failed to clean orphaned locks: {e}"),
_ => {}
}
match sqlx::query(
"UPDATE execution_queue SET status = 'expired' \
WHERE status = 'waiting' AND execution_id IN \
(SELECT id FROM executions WHERE status IN ('rejected', 'expired', 'failed'))",
)
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => {
tracing::info!(count = r.rows_affected(), "expired stale queue entries");
}
Err(e) => tracing::warn!("failed to expire stale queue entries: {e}"),
_ => {}
}
}