use std::sync::Arc;
use crate::step::{StepDef, StepResult, StepStatus};
use super::{ExecCtx, step_exec::execute_step_with_handler};
#[tracing::instrument(skip_all, fields(steps = steps.len()))]
pub(crate) async fn run_queued(
steps: &[StepDef],
queue: &Arc<majra::queue::ManagedQueue<StepDef>>,
ctx: &ExecCtx<'_>,
) -> Vec<StepResult> {
let total = steps.len();
tracing::info!(count = total, "enqueuing steps into managed queue");
for step in steps {
let task_id = queue
.enqueue(majra::queue::Priority::Normal, step.clone(), None)
.await;
tracing::debug!(step = %step.name, %task_id, "step enqueued");
}
let pool = majra::queue::ResourcePool {
gpu_count: 0,
vram_mb: 0,
};
let mut results = Vec::with_capacity(total);
loop {
let item = match queue.dequeue(&pool).await {
Some(item) => item,
None => {
if queue.queued_count().await == 0 && queue.running_count() == 0 {
tracing::debug!("queue drained, worker loop exiting");
break;
}
tokio::task::yield_now().await;
continue;
}
};
let task_id = item.id;
let step = &item.payload;
tracing::info!(step = %step.name, %task_id, "dequeued step for execution");
let result = execute_step_with_handler(
step,
ctx.handler,
ctx.event_sink,
ctx.flow,
ctx.metrics,
ctx.step_type_metrics,
ctx.progress_sink,
)
.await;
match result.status {
StepStatus::Completed => {
if let Err(e) = queue.complete(task_id) {
tracing::warn!(%task_id, error = %e, "failed to mark queue item complete");
}
tracing::debug!(%task_id, "queue item marked complete");
}
_ => {
if let Err(e) = queue.fail(task_id) {
tracing::warn!(%task_id, error = %e, "failed to mark queue item failed");
}
tracing::debug!(
%task_id,
status = %result.status,
error = result.error.as_deref().unwrap_or("none"),
"queue item marked failed",
);
}
}
results.push(result);
if results.len() >= total {
tracing::info!(
processed = results.len(),
"all steps processed, exiting worker loop"
);
break;
}
}
results
}