mod adapter;
mod builder;
mod config;
mod execution;
mod lifecycle;
mod manager;
mod merge;
pub mod termination;
pub use adapter::WorkResultAdapter;
pub use builder::OrchestratorBuilder;
pub use config::{SwarmConfig, SwarmResult};
pub use termination::{TerminationConfig, TerminationJudge, TerminationVerdict};
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, info};
use crate::actions::ActionDef;
use crate::agent::{Analyzer, BatchInvoker, DefaultAnalyzer, ManagerAgent, WorkerAgent};
use crate::async_task::AsyncTaskSystem;
use crate::error::SwarmError;
use crate::events::{ActionEventPublisher, LearningEventChannel, LifecycleHook};
use crate::exploration::{
ActionNodeData, AdaptiveOperatorProvider, ConfigurableSpace, DependencyGraphProvider,
MapNodeState, NodeRules, OperatorProvider,
};
use crate::state::SwarmState;
use crate::types::{SwarmTask, WorkerId};
/// Central driver of a swarm run.
///
/// Owns the worker and manager agents together with the shared state, and
/// drives them tick by tick in [`Orchestrator::run`]. Most fields are
/// `pub(crate)` so that sibling modules can operate on them directly.
pub struct Orchestrator {
/// Swarm state; `run` reads and advances `state.shared.tick` and records
/// tick events into `state.shared.stats`.
pub(crate) state: SwarmState,
/// Worker agents. Their count sizes `state` and `termination_judge` in `new`.
pub(crate) workers: Vec<Box<dyn WorkerAgent>>,
/// Manager agents registered through `add_manager`; empty after `new`.
pub(crate) managers: Vec<Box<dyn ManagerAgent>>,
/// Analyzer; `new` installs `DefaultAnalyzer`, `with_analyzer` replaces it.
pub(crate) analyzer: Box<dyn Analyzer>,
/// Optional batch invoker, set through `with_batch_invoker`.
pub(crate) batch_invoker: Option<Box<dyn BatchInvoker>>,
/// Optional dependency-graph provider, set through `with_dependency_provider`.
pub(crate) dependency_provider: Option<Box<dyn DependencyGraphProvider>>,
/// Async task system built from the tokio runtime handle passed to `new`.
pub(crate) async_system: AsyncTaskSystem,
/// Run configuration; `new` reads `max_ticks`, `run` reads `tick_duration`.
pub(crate) config: SwarmConfig,
/// Termination judge; `run` feeds it the current tick on every iteration.
pub(crate) termination_judge: TerminationJudge,
/// Per-manager tick bookkeeping, maintained outside this section of the file.
/// NOTE(review): presumably the last tick each manager ran on — confirm.
pub(crate) last_manager_ticks: std::collections::HashMap<crate::agent::ManagerId, u64>,
/// Per-worker guidance, maintained outside this section of the file.
/// NOTE(review): presumably the most recent guidance per worker — confirm.
pub(crate) current_guidances: std::collections::HashMap<WorkerId, Arc<crate::agent::Guidance>>,
/// Manager-to-workers mapping; `None` until `enable_partitioning` fills it.
pub(crate) worker_assignments:
Option<std::collections::HashMap<crate::agent::ManagerId, Vec<WorkerId>>>,
/// Exploration space; `None` after `new`. `run_task` seeds it when present.
pub(crate) space_v2: Option<ConfigurableSpace<NodeRules>>,
/// Operator provider; `new` installs `AdaptiveOperatorProvider::default()`.
pub(crate) operator_provider: Box<dyn OperatorProvider<NodeRules>>,
/// When set, `run` forwards its `tick_start` / `tick_end` events to it.
pub(crate) action_collector: Option<ActionEventPublisher>,
/// Optional learned provider, exposed read-only through `learned_provider()`.
pub(crate) learned_provider: Option<crate::learn::SharedLearnedProvider>,
/// When set, `run` calls `on_start` before the first tick and
/// `on_terminate` after the loop ends.
pub(crate) lifecycle_hook: Option<Box<dyn LifecycleHook>>,
}
impl Orchestrator {
pub fn new(
workers: Vec<Box<dyn WorkerAgent>>,
config: SwarmConfig,
runtime: tokio::runtime::Handle,
) -> Self {
let agent_count = workers.len();
let termination_config = TerminationConfig::with_max_ticks(config.max_ticks);
Self {
state: SwarmState::new(agent_count),
workers,
managers: Vec::new(),
analyzer: Box::new(DefaultAnalyzer::new()),
batch_invoker: None,
dependency_provider: None,
async_system: AsyncTaskSystem::new(runtime),
config,
termination_judge: TerminationJudge::new(termination_config, agent_count),
last_manager_ticks: std::collections::HashMap::new(),
current_guidances: std::collections::HashMap::new(),
worker_assignments: None,
space_v2: None,
operator_provider: Box::new(AdaptiveOperatorProvider::default()),
action_collector: None,
learned_provider: None,
lifecycle_hook: None,
}
}
/// Builder-style setter: replaces the analyzer and returns the orchestrator.
pub fn with_analyzer(self, analyzer: Box<dyn Analyzer>) -> Self {
    let mut orchestrator = self;
    orchestrator.analyzer = analyzer;
    orchestrator
}
/// Builder-style setter: appends `manager` to the list of managers and
/// returns the orchestrator.
pub fn add_manager(self, manager: Box<dyn ManagerAgent>) -> Self {
    let mut orchestrator = self;
    orchestrator.managers.push(manager);
    orchestrator
}
/// Builder-style setter: installs `invoker` as the batch invoker and returns
/// the orchestrator.
pub fn with_batch_invoker(self, invoker: Box<dyn BatchInvoker>) -> Self {
    let mut orchestrator = self;
    orchestrator.batch_invoker = Some(invoker);
    orchestrator
}
/// Builder-style setter: installs `provider` as the dependency-graph provider
/// and returns the orchestrator.
pub fn with_dependency_provider(self, provider: Box<dyn DependencyGraphProvider>) -> Self {
    let mut orchestrator = self;
    orchestrator.dependency_provider = Some(provider);
    orchestrator
}
/// Splits the workers into contiguous ranges, one per manager, and stores the
/// resulting mapping in `worker_assignments`.
///
/// Managers are served in registration order and each receives up to
/// `ceil(worker_count / manager_count)` workers. Once the workers run out,
/// the remaining managers are assigned an empty list. Does nothing when no
/// manager is registered.
pub fn enable_partitioning(&mut self) {
    if self.managers.is_empty() {
        return;
    }
    let worker_count = self.workers.len();
    let manager_count = self.managers.len();
    let workers_per_manager = worker_count.div_ceil(manager_count);
    let all_worker_ids: Vec<WorkerId> = (0..worker_count).map(WorkerId).collect();
    let mut assignments = std::collections::HashMap::with_capacity(manager_count);
    for (i, manager) in self.managers.iter().enumerate() {
        // Clamp BOTH bounds. Because the chunk size is rounded up, a late
        // manager's start can overshoot the worker count (e.g. 5 workers and
        // 4 managers: chunk = 2, manager 3 would start at 6 but end at 5);
        // an unclamped start would make the slice below panic.
        let start = (i * workers_per_manager).min(worker_count);
        let end = ((i + 1) * workers_per_manager).min(worker_count);
        assignments.insert(manager.id(), all_worker_ids[start..end].to_vec());
    }
    self.worker_assignments = Some(assignments);
}
/// Returns a copy of the worker ids assigned to `manager_id`.
///
/// Yields `None` when no assignment map has been stored yet or when the map
/// has no entry for this manager.
pub(crate) fn get_assigned_workers(
    &self,
    manager_id: crate::agent::ManagerId,
) -> Option<Vec<WorkerId>> {
    let assignments = self.worker_assignments.as_ref()?;
    assignments.get(&manager_id).cloned()
}
/// Returns the dependency graph held by the exploration space, or `None`
/// when there is no exploration space or it reports no graph.
pub fn dependency_graph(&self) -> Option<&crate::exploration::DependencyGraph> {
    let space = self.space_v2.as_ref()?;
    space.dependency_graph()
}
pub fn run_task(&mut self, task: SwarmTask) -> Result<SwarmResult, SwarmError> {
self.state.shared.extensions.insert(task.clone());
let actions: Vec<ActionDef> = self
.state
.shared
.extensions
.get::<crate::actions::ActionsConfig>()
.map(|cfg| cfg.all_actions().cloned().collect())
.unwrap_or_default();
self.ensure_exploration_space(&task.goal, &actions)?;
let initial_contexts: Vec<String> = task
.context
.get("initial_context")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_else(|| {
task.context
.get("target_service")
.and_then(|v| v.as_str())
.map(|s| vec![s.to_string()])
.unwrap_or_default()
});
if let Some(ref mut space_v2) = self.space_v2 {
let root_id = space_v2.create_root(ActionNodeData::new("root"));
debug!(root_id = ?root_id, "ExplorationSpaceV2 root node created");
if !initial_contexts.is_empty() {
let ctx_refs: Vec<&str> = initial_contexts.iter().map(|s| s.as_str()).collect();
let results = space_v2.initialize(&ctx_refs);
info!(
initial_contexts = ?initial_contexts,
expanded_nodes = results.len(),
"ExplorationSpaceV2: initial nodes expanded via Rules"
);
if !results.is_empty() {
space_v2.map_mut().set_state(root_id, MapNodeState::Closed);
debug!(root_id = ?root_id, "ExplorationSpaceV2 root node closed after initialization");
}
}
}
Ok(self.run())
}
pub fn run(&mut self) -> SwarmResult {
let start = Instant::now();
let worker_count = self.workers.len();
info!(worker_count = worker_count, "system_start");
if let Some(ref mut hook) = self.lifecycle_hook {
hook.on_start(worker_count);
}
loop {
let tick_start = Instant::now();
let current_tick = self.state.shared.tick;
self.termination_judge.set_tick(current_tick);
info!(tick = current_tick, "tick_start");
LearningEventChannel::global().set_tick(current_tick);
{
let event = crate::events::ActionEventBuilder::new(
current_tick,
crate::types::WorkerId::MANAGER,
"tick_start",
)
.result(crate::events::ActionEventResult::success())
.build();
self.state.shared.stats.record(&event);
if let Some(ref collector) = self.action_collector {
collector.record(event);
}
}
self.collect_async_results();
if self.should_run_manager() {
let _ = self.run_manager();
} else {
self.generate_exploration_guidances();
}
let results = self.execute_workers();
self.merge_results(&results);
self.state.shared.shared_data.cleanup_env_entries();
self.state.advance_tick();
self.termination_judge.set_tick(self.state.shared.tick);
let elapsed = tick_start.elapsed();
if elapsed < self.config.tick_duration {
std::thread::sleep(self.config.tick_duration - elapsed);
}
let current_ns = elapsed.as_nanos() as u64;
let prev_avg = self.state.shared.avg_tick_duration_ns;
self.state.shared.avg_tick_duration_ns = if prev_avg == 0 {
current_ns } else {
(current_ns + 9 * prev_avg) / 10
};
let _ = LearningEventChannel::global().drain_sync();
{
let event = crate::events::ActionEventBuilder::new(
current_tick,
crate::types::WorkerId::MANAGER,
"tick_end",
)
.duration(elapsed)
.result(crate::events::ActionEventResult::success())
.context(
crate::events::ActionContext::new()
.with_metadata("duration_ns", elapsed.as_nanos().to_string()),
)
.build();
self.state.shared.stats.record(&event);
if let Some(ref collector) = self.action_collector {
collector.record(event);
}
}
info!(
tick = current_tick,
duration_ns = elapsed.as_nanos() as u64,
total_actions = self.state.shared.stats.total_visits(),
successful_actions = self.state.shared.stats.total_successes(),
failed_actions = self.state.shared.stats.total_failures(),
active_workers = self.workers.len() as u64,
"tick_complete"
);
if self.should_terminate() {
break;
}
}
let total_duration = start.elapsed();
info!(
total_ticks = self.state.shared.tick,
total_duration_ms = total_duration.as_millis() as u64,
"system_stop"
);
let result = SwarmResult {
total_ticks: self.state.shared.tick,
total_duration,
completed: true,
};
if let Some(ref mut hook) = self.lifecycle_hook {
hook.on_terminate(&self.state, &result);
}
result
}
/// Asks the termination judge to terminate, giving "External request" as the
/// reason.
pub fn request_terminate(&mut self) {
    const REASON: &str = "External request";
    self.termination_judge.request_terminate(REASON);
}
/// Returns a shared reference to the termination judge.
pub fn termination_judge(&self) -> &TerminationJudge {
&self.termination_judge
}
/// Returns a mutable reference to the termination judge.
pub fn termination_judge_mut(&mut self) -> &mut TerminationJudge {
&mut self.termination_judge
}
/// Returns a shared reference to the swarm state.
pub fn state(&self) -> &SwarmState {
&self.state
}
/// Returns a shared reference to the async task system.
pub fn async_system(&self) -> &AsyncTaskSystem {
&self.async_system
}
/// Returns the learned provider, or `None` when none has been set.
pub fn learned_provider(&self) -> Option<&crate::learn::SharedLearnedProvider> {
self.learned_provider.as_ref()
}
}