use tracing::debug;
use crate::agent::WorkResult;
use crate::state::CompletedAsyncTask;
use crate::types::WorkerId;
use super::Orchestrator;
impl Orchestrator {
pub(super) fn collect_async_results(&mut self) {
let results = self.async_system.collect_results();
let current_tick = self.state.shared.tick;
for result in results {
debug!(
"Async task {} ({}) completed: {:?}",
result.task_id.0, result.task_type, result.metadata.status
);
let mut worker_id = None;
for (idx, ctx) in self.state.workers.iter_mut().enumerate() {
if ctx.pending_tasks.contains(&result.task_id) {
worker_id = Some(WorkerId(idx));
ctx.complete_task(result.task_id);
break;
}
}
let completed = CompletedAsyncTask {
task_id: result.task_id,
worker_id,
task_type: result.task_type,
completed_at_tick: current_tick,
status: result.metadata.status,
error: result.metadata.error.map(|e| e.message),
};
self.state
.shared
.shared_data
.completed_async_tasks
.push(completed);
if let Some(payload) = result.payload {
if let Some(s) = payload.downcast_ref::<String>() {
let key = format!("async_payload:{}", result.task_id.0);
self.state
.shared
.shared_data
.kv
.insert(key, s.as_bytes().to_vec());
}
}
}
}
pub(super) fn execute_workers(&self) -> Vec<(usize, WorkResult)> {
use rayon::prelude::*;
self.workers
.par_iter()
.enumerate()
.map(|(i, worker)| {
let guidance = self.current_guidances.get(&worker.id());
let result = worker.think_and_act(&self.state, guidance.map(|g| g.as_ref()));
(i, result)
})
.collect()
}
}