swarm_engine_core/orchestrator/
mod.rs1mod adapter;
35mod builder;
36mod config;
37mod execution;
38mod lifecycle;
39mod manager;
40mod merge;
41pub mod termination;
42
43pub use adapter::WorkResultAdapter;
44pub use builder::OrchestratorBuilder;
45pub use config::{SwarmConfig, SwarmResult};
46pub use termination::{TerminationConfig, TerminationJudge, TerminationVerdict};
47
48use std::sync::Arc;
49use std::time::Instant;
50
51use tracing::{debug, info};
52
53use crate::actions::ActionDef;
54use crate::agent::{Analyzer, BatchInvoker, DefaultAnalyzer, ManagerAgent, WorkerAgent};
55use crate::async_task::AsyncTaskSystem;
56use crate::error::SwarmError;
57use crate::events::{ActionEventPublisher, LearningEventChannel, LifecycleHook};
58use crate::exploration::{
59 ActionNodeData, AdaptiveOperatorProvider, ConfigurableSpace, DependencyGraphProvider,
60 MapNodeState, NodeRules, OperatorProvider,
61};
62use crate::state::SwarmState;
63use crate::types::{SwarmTask, WorkerId};
64
65pub struct Orchestrator {
67 pub(crate) state: SwarmState,
69 pub(crate) workers: Vec<Box<dyn WorkerAgent>>,
71 pub(crate) managers: Vec<Box<dyn ManagerAgent>>,
73 pub(crate) analyzer: Box<dyn Analyzer>,
75 pub(crate) batch_invoker: Option<Box<dyn BatchInvoker>>,
77 pub(crate) dependency_provider: Option<Box<dyn DependencyGraphProvider>>,
79 pub(crate) async_system: AsyncTaskSystem,
81 pub(crate) config: SwarmConfig,
83 pub(crate) termination_judge: TerminationJudge,
85 pub(crate) last_manager_ticks: std::collections::HashMap<crate::agent::ManagerId, u64>,
87 pub(crate) current_guidances: std::collections::HashMap<WorkerId, Arc<crate::agent::Guidance>>,
90 pub(crate) worker_assignments:
93 Option<std::collections::HashMap<crate::agent::ManagerId, Vec<WorkerId>>>,
94
95 pub(crate) space_v2: Option<ConfigurableSpace<NodeRules>>,
101
102 pub(crate) operator_provider: Box<dyn OperatorProvider<NodeRules>>,
107
108 pub(crate) action_collector: Option<ActionEventPublisher>,
110
111 pub(crate) learned_provider: Option<crate::learn::SharedLearnedProvider>,
113
114 pub(crate) lifecycle_hook: Option<Box<dyn LifecycleHook>>,
116}
117
118impl Orchestrator {
119 pub fn new(
121 workers: Vec<Box<dyn WorkerAgent>>,
122 config: SwarmConfig,
123 runtime: tokio::runtime::Handle,
124 ) -> Self {
125 let agent_count = workers.len();
126 let termination_config = TerminationConfig::with_max_ticks(config.max_ticks);
127 Self {
128 state: SwarmState::new(agent_count),
129 workers,
130 managers: Vec::new(),
131 analyzer: Box::new(DefaultAnalyzer::new()),
132 batch_invoker: None,
133 dependency_provider: None,
134 async_system: AsyncTaskSystem::new(runtime),
135 config,
136 termination_judge: TerminationJudge::new(termination_config, agent_count),
137 last_manager_ticks: std::collections::HashMap::new(),
138 current_guidances: std::collections::HashMap::new(),
139 worker_assignments: None,
140 space_v2: None,
141 operator_provider: Box::new(AdaptiveOperatorProvider::default()),
142 action_collector: None,
143 learned_provider: None,
144 lifecycle_hook: None,
145 }
146 }
147
148 pub fn with_analyzer(mut self, analyzer: Box<dyn Analyzer>) -> Self {
150 self.analyzer = analyzer;
151 self
152 }
153
154 pub fn add_manager(mut self, manager: Box<dyn ManagerAgent>) -> Self {
156 self.managers.push(manager);
157 self
158 }
159
160 pub fn with_batch_invoker(mut self, invoker: Box<dyn BatchInvoker>) -> Self {
162 self.batch_invoker = Some(invoker);
163 self
164 }
165
166 pub fn with_dependency_provider(mut self, provider: Box<dyn DependencyGraphProvider>) -> Self {
171 self.dependency_provider = Some(provider);
172 self
173 }
174
175 pub fn enable_partitioning(&mut self) {
185 if self.managers.is_empty() {
186 return;
187 }
188
189 let worker_count = self.workers.len();
190 let manager_count = self.managers.len();
191 let workers_per_manager = worker_count.div_ceil(manager_count);
192
193 let mut assignments = std::collections::HashMap::new();
194 let all_worker_ids: Vec<WorkerId> = (0..worker_count).map(WorkerId).collect();
195
196 for (i, manager) in self.managers.iter().enumerate() {
197 let start = i * workers_per_manager;
198 let end = ((i + 1) * workers_per_manager).min(worker_count);
199 let assigned: Vec<WorkerId> = all_worker_ids[start..end].to_vec();
200 assignments.insert(manager.id(), assigned);
201 }
202
203 self.worker_assignments = Some(assignments);
204 }
205
206 pub(crate) fn get_assigned_workers(
208 &self,
209 manager_id: crate::agent::ManagerId,
210 ) -> Option<Vec<WorkerId>> {
211 self.worker_assignments
212 .as_ref()
213 .and_then(|assignments| assignments.get(&manager_id).cloned())
214 }
215
216 pub fn dependency_graph(&self) -> Option<&crate::exploration::DependencyGraph> {
221 self.space_v2
222 .as_ref()
223 .and_then(|space| space.dependency_graph())
224 }
225
226 pub fn run_task(&mut self, task: SwarmTask) -> Result<SwarmResult, SwarmError> {
244 self.state.shared.extensions.insert(task.clone());
246
247 let actions: Vec<ActionDef> = self
249 .state
250 .shared
251 .extensions
252 .get::<crate::actions::ActionsConfig>()
253 .map(|cfg| cfg.all_actions().cloned().collect())
254 .unwrap_or_default();
255
256 self.ensure_exploration_space(&task.goal, &actions)?;
259
260 let initial_contexts: Vec<String> = task
263 .context
264 .get("initial_context")
265 .and_then(|v| v.as_array())
266 .map(|arr| {
267 arr.iter()
268 .filter_map(|v| v.as_str().map(|s| s.to_string()))
269 .collect()
270 })
271 .unwrap_or_else(|| {
272 task.context
274 .get("target_service")
275 .and_then(|v| v.as_str())
276 .map(|s| vec![s.to_string()])
277 .unwrap_or_default()
278 });
279
280 if let Some(ref mut space_v2) = self.space_v2 {
284 let root_id = space_v2.create_root(ActionNodeData::new("root"));
286 debug!(root_id = ?root_id, "ExplorationSpaceV2 root node created");
287
288 if !initial_contexts.is_empty() {
290 let ctx_refs: Vec<&str> = initial_contexts.iter().map(|s| s.as_str()).collect();
291 let results = space_v2.initialize(&ctx_refs);
292 info!(
293 initial_contexts = ?initial_contexts,
294 expanded_nodes = results.len(),
295 "ExplorationSpaceV2: initial nodes expanded via Rules"
296 );
297
298 if !results.is_empty() {
301 space_v2.map_mut().set_state(root_id, MapNodeState::Closed);
302 debug!(root_id = ?root_id, "ExplorationSpaceV2 root node closed after initialization");
303 }
304 }
305 }
306
307 Ok(self.run())
308 }
309
310 pub fn run(&mut self) -> SwarmResult {
312 let start = Instant::now();
313 let worker_count = self.workers.len();
314 info!(worker_count = worker_count, "system_start");
315
316 if let Some(ref mut hook) = self.lifecycle_hook {
318 hook.on_start(worker_count);
319 }
320
321 loop {
322 let tick_start = Instant::now();
323 let current_tick = self.state.shared.tick;
324
325 self.termination_judge.set_tick(current_tick);
327
328 info!(tick = current_tick, "tick_start");
330
331 LearningEventChannel::global().set_tick(current_tick);
333
334 {
336 let event = crate::events::ActionEventBuilder::new(
337 current_tick,
338 crate::types::WorkerId::MANAGER,
339 "tick_start",
340 )
341 .result(crate::events::ActionEventResult::success())
342 .build();
343 self.state.shared.stats.record(&event);
344 if let Some(ref collector) = self.action_collector {
345 collector.record(event);
346 }
347 }
348
349 self.collect_async_results();
351
352 if self.should_run_manager() {
356 let _ = self.run_manager();
357 } else {
358 self.generate_exploration_guidances();
361 }
362
363 let results = self.execute_workers();
365
366 self.merge_results(&results);
368
369 self.state.shared.shared_data.cleanup_env_entries();
371
372 self.state.advance_tick();
374 self.termination_judge.set_tick(self.state.shared.tick);
376
377 let elapsed = tick_start.elapsed();
379 if elapsed < self.config.tick_duration {
380 std::thread::sleep(self.config.tick_duration - elapsed);
381 }
382
383 let current_ns = elapsed.as_nanos() as u64;
386 let prev_avg = self.state.shared.avg_tick_duration_ns;
387 self.state.shared.avg_tick_duration_ns = if prev_avg == 0 {
388 current_ns } else {
390 (current_ns + 9 * prev_avg) / 10
391 };
392
393 let _ = LearningEventChannel::global().drain_sync();
396
397 {
399 let event = crate::events::ActionEventBuilder::new(
400 current_tick,
401 crate::types::WorkerId::MANAGER,
402 "tick_end",
403 )
404 .duration(elapsed)
405 .result(crate::events::ActionEventResult::success())
406 .context(
407 crate::events::ActionContext::new()
408 .with_metadata("duration_ns", elapsed.as_nanos().to_string()),
409 )
410 .build();
411 self.state.shared.stats.record(&event);
412 if let Some(ref collector) = self.action_collector {
413 collector.record(event);
414 }
415 }
416
417 info!(
419 tick = current_tick,
420 duration_ns = elapsed.as_nanos() as u64,
421 total_actions = self.state.shared.stats.total_visits(),
422 successful_actions = self.state.shared.stats.total_successes(),
423 failed_actions = self.state.shared.stats.total_failures(),
424 active_workers = self.workers.len() as u64,
425 "tick_complete"
426 );
427
428 if self.should_terminate() {
430 break;
431 }
432 }
433
434 let total_duration = start.elapsed();
435
436 info!(
438 total_ticks = self.state.shared.tick,
439 total_duration_ms = total_duration.as_millis() as u64,
440 "system_stop"
441 );
442
443 let result = SwarmResult {
444 total_ticks: self.state.shared.tick,
445 total_duration,
446 completed: true,
447 };
448
449 if let Some(ref mut hook) = self.lifecycle_hook {
451 hook.on_terminate(&self.state, &result);
452 }
453
454 result
455 }
456
457 pub fn request_terminate(&mut self) {
459 self.termination_judge.request_terminate("External request");
460 }
461
462 pub fn termination_judge(&self) -> &TerminationJudge {
464 &self.termination_judge
465 }
466
467 pub fn termination_judge_mut(&mut self) -> &mut TerminationJudge {
469 &mut self.termination_judge
470 }
471
472 pub fn state(&self) -> &SwarmState {
474 &self.state
475 }
476
477 pub fn async_system(&self) -> &AsyncTaskSystem {
479 &self.async_system
480 }
481
482 pub fn learned_provider(&self) -> Option<&crate::learn::SharedLearnedProvider> {
484 self.learned_provider.as_ref()
485 }
486}