use anyhow::Result;
use tracing::{info, warn};
use crate::runtime::events::{Event, EventBuilder, EventKind, TaskId, WorkerId};
use crate::runtime::scheduler::runner::TeamRunner;
use crate::runtime::scheduler::task::{Task, TaskState};
use crate::runtime::worker::WorkerSpec;
/// Reads the `extra` entry stored under `key` as a list of strings.
///
/// Returns an empty vector when the key is absent or its value is not an
/// array. Array items that are not strings are skipped rather than treated
/// as an error.
fn task_extra_string_list(task: &Task, key: &str) -> Vec<String> {
    let Some(entries) = task.extra.get(key).and_then(|raw| raw.as_array()) else {
        return Vec::new();
    };

    entries
        .iter()
        .filter_map(|entry| entry.as_str())
        .map(String::from)
        .collect()
}
/// Reads the `extra` entry stored under `key` as an unsigned integer.
///
/// Returns `None` when the key is absent or when the stored value's
/// `as_u64` accessor yields nothing.
fn task_extra_u64(task: &Task, key: &str) -> Option<u64> {
    let raw = task.extra.get(key)?;
    raw.as_u64()
}
/// Builds the free-form context text that accompanies a dispatched task.
///
/// Up to three lines are produced, always in this order: the `budget_secs`
/// extra (when present), the read set (when non-empty) and the write set
/// (when non-empty). The lines are joined with `\n`. Returns `None` when
/// none of the three applies.
fn task_dispatch_context(task: &Task) -> Option<String> {
    let budget_line =
        task_extra_u64(task, "budget_secs").map(|secs| format!("Budget: {secs} seconds"));
    let read_line = (!task.read_set.is_empty())
        .then(|| format!("Read set: {}", task.read_set.join(", ")));
    let write_line = (!task.write_set.is_empty())
        .then(|| format!("Write set: {}", task.write_set.join(", ")));

    // Each optional line contributes zero or one element, preserving order.
    let sections: Vec<String> = budget_line
        .into_iter()
        .chain(read_line)
        .chain(write_line)
        .collect();

    if sections.is_empty() {
        None
    } else {
        Some(sections.join("\n"))
    }
}
impl TeamRunner {
    /// Seeds the run with a single task that uses the fixed id `task-1`.
    ///
    /// The task is recorded in both the claim store and the manifest.
    pub fn seed_task(&mut self, description: &str) {
        self.insert_seed_task("task-1", description);
    }

    /// Seeds one task per `(task_id, description)` pair, in the order given.
    ///
    /// Each task is recorded in both the claim store and the manifest.
    pub fn seed_tasks(&mut self, descriptions: Vec<(String, String)>) {
        for (task_id, description) in descriptions {
            self.insert_seed_task(&task_id, &description);
        }
    }

    /// Builds a `"seed"` task and records it in the claim store and the manifest.
    fn insert_seed_task(&mut self, task_id: &str, description: &str) {
        let task = Task::new(task_id, "seed").with_description(description);
        // The claim store receives its own copy; the manifest keeps the original.
        self.claim_store.insert(task.clone());
        self.manifest.tasks.push(task);
    }

    /// Hands ready tasks to idle workers, at most one task per worker per call.
    ///
    /// A worker is considered idle when it owns no task in the `Claimed` or
    /// `Running` state and is not listed in `dead_workers`. For every ready
    /// task a non-conflicting worker is picked, the task is claimed, the
    /// claim and start events are appended to the event log, the task is
    /// marked as started and finally sent to the worker.
    ///
    /// Tasks for which no worker is free of ownership conflicts, or whose
    /// claim is refused by the claim store, are skipped for this pass.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while building or appending an event,
    /// or while sending the task to the worker. Nothing in this function
    /// rolls back a claim after such a failure, and remaining ready tasks are
    /// not processed in that call.
    pub async fn dispatch_to_workers(&mut self, worker_specs: &[WorkerSpec]) -> Result<()> {
        // Lease length reported when the claimed task carries no usable expiry.
        const DEFAULT_LEASE_SECS: u64 = 300;

        let busy_workers: std::collections::HashSet<String> = self
            .claim_store
            .tasks()
            .values()
            .filter(|t| matches!(t.state, TaskState::Claimed | TaskState::Running))
            .filter_map(|t| t.owner.clone())
            .collect();
        let mut available_workers: Vec<&WorkerSpec> = worker_specs
            .iter()
            .filter(|w| !busy_workers.contains(&w.name))
            .filter(|w| !self.dead_workers.contains(&w.name))
            .collect();
        if available_workers.is_empty() {
            return Ok(());
        }

        // Snapshot the ids first so the claim store can be mutated inside the loop.
        let ready_ids: Vec<String> = {
            let ready = self.claim_store.ready_tasks();
            ready.iter().map(|t| t.id.clone()).collect()
        };

        for task_id in ready_ids {
            if available_workers.is_empty() {
                break;
            }
            let Some(task) = self.claim_store.get(&task_id).cloned() else {
                continue;
            };
            let Some(idx) = self.pick_worker_for_task(&task, &available_workers) else {
                warn!(
                    task = %task.id,
                    write_set = ?task.write_set,
                    "Task dispatch blocked by active ownership conflicts"
                );
                continue;
            };

            // Claim before taking the worker out of the pool: a refused claim
            // must not cost the remaining ready tasks an idle worker.
            let worker = available_workers[idx];
            if !self.claim_store.claim(&task_id, &worker.name) {
                continue;
            }
            available_workers.remove(idx);

            self.stale_task_owners.remove(&task_id);
            if let Some(task_ref) = self.claim_store.get(&task_id) {
                self.ownership.register_task(task_ref);
            }

            // `lease_expires` is an absolute point in time, while the claim
            // event carries a duration, so report the seconds remaining until
            // expiry instead of the raw epoch timestamp.
            let now_epoch_secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map_or(0, |elapsed| elapsed.as_secs());
            let lease_secs = self
                .claim_store
                .tasks()
                .get(&task_id)
                .and_then(|t| t.lease_expires)
                .and_then(|expires| u64::try_from(expires.timestamp()).ok())
                .map(|expires_epoch| expires_epoch.saturating_sub(now_epoch_secs))
                .unwrap_or(DEFAULT_LEASE_SECS);

            let claimed_event = EventBuilder::new(self.run_id.clone()).task_claimed(
                TaskId(task_id.clone()),
                WorkerId(worker.name.clone()),
                lease_secs,
            )?;
            self.event_writer.append(&claimed_event).await?;

            // Looked up once; used by both the start event and the worker payload.
            let budget_secs = task_extra_u64(&task, "budget_secs");
            let started_event = Event::new(self.run_id.clone(), EventKind::TaskStarted)
                .with_actor(&worker.name)
                .with_payload(serde_json::json!({
                    "task_id": task_id,
                    "worker_id": worker.name,
                    "budget_secs": budget_secs,
                }))?;
            self.event_writer.append(&started_event).await?;
            self.claim_store.start(&task_id, &worker.name);

            let worker_task = crate::runtime::worker::WorkerTask {
                id: task_id.clone(),
                task: task.description.clone(),
                acceptance_criteria: task_extra_string_list(&task, "acceptance"),
                context: task_dispatch_context(&task),
                budget_secs,
            };
            worker.send_task(&worker_task).await?;
            info!(task = %task_id, worker = %worker.name, "Dispatched task to worker");
        }
        Ok(())
    }

    /// Chooses the index in `available_workers` of the worker that should run `task`.
    ///
    /// Workers other than the task's recorded stale owner are preferred. The
    /// stale owner is only considered when no other worker is free of
    /// ownership conflicts. Returns `None` when every candidate conflicts.
    fn pick_worker_for_task(
        &self,
        task: &Task,
        available_workers: &[&WorkerSpec],
    ) -> Option<usize> {
        let stale_owner = self.stale_task_owners.get(&task.id);
        self.pick_non_conflicting_worker(task, available_workers, stale_owner, false)
            .or_else(|| {
                // Without a stale owner the second pass would repeat the first
                // one exactly, duplicating every conflict warning.
                stale_owner?;
                self.pick_non_conflicting_worker(task, available_workers, stale_owner, true)
            })
    }

    /// Returns the index of the first worker whose ownership does not conflict with `task`.
    ///
    /// When `allow_stale_owner` is `false`, the worker named by `stale_owner`
    /// is skipped without being checked. Every conflicting candidate is
    /// logged at warn level.
    fn pick_non_conflicting_worker(
        &self,
        task: &Task,
        available_workers: &[&WorkerSpec],
        stale_owner: Option<&String>,
        allow_stale_owner: bool,
    ) -> Option<usize> {
        for (idx, worker) in available_workers.iter().enumerate() {
            if !allow_stale_owner && stale_owner.is_some_and(|owner| owner == &worker.name) {
                continue;
            }
            let conflicts = self.ownership.would_conflict(task, &worker.name);
            if conflicts.is_empty() {
                return Some(idx);
            }
            warn!(
                task = %task.id,
                worker = %worker.name,
                conflicts = ?conflicts,
                "Ownership conflict detected"
            );
        }
        None
    }
}