1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
//! Manager フェーズの処理
//!
//! - should_run_manager(): Manager 起動判定
//! - run_manager(): Manager 実行(Batch 対応、複数Manager並列処理)
//! - generate_exploration_guidances(): Manager なしでの Guidance 自動生成
use std::sync::Arc;
use tracing::debug;
use crate::agent::{DecisionResponse, Guidance, ManagementStrategy};
use crate::exploration::{GraphMap, ProviderContext};
use crate::types::WorkerId;
use super::Orchestrator;
impl Orchestrator {
/// Manager 起動判定
pub(super) fn should_run_manager(&self) -> bool {
if self.managers.is_empty() {
return false;
}
let current_tick = self.state.shared.tick;
match &self.config.management_strategy {
ManagementStrategy::EveryTick => true,
ManagementStrategy::FixedInterval { interval } => {
current_tick > 0 && current_tick.is_multiple_of(*interval)
}
ManagementStrategy::CompletionBased { max_wait_ticks } => {
// Note: Worker の状態は WorkResult で Orchestrator が管理するため、
// ここでは履歴の最新エントリから判定する(idle アクションの検出)
let all_idle = self.state.workers.iter().all(|ctx| {
ctx.history
.latest()
.map(|e| e.action_name.ends_with(":idle"))
.unwrap_or(true) // 履歴がない場合は idle 扱い
});
// 全Managerの最小last_tickを使用
let min_last_tick = self
.managers
.iter()
.map(|m| self.last_manager_ticks.get(&m.id()).copied().unwrap_or(0))
.min()
.unwrap_or(0);
all_idle || (current_tick - min_last_tick) >= *max_wait_ticks
}
ManagementStrategy::Hybrid {
preferred_interval,
force_after_ticks,
} => {
// 全Managerの最小last_tickを使用
let min_last_tick = self
.managers
.iter()
.map(|m| self.last_manager_ticks.get(&m.id()).copied().unwrap_or(0))
.min()
.unwrap_or(0);
let ticks_since_last = current_tick - min_last_tick;
ticks_since_last >= *preferred_interval || ticks_since_last >= *force_after_ticks
}
ManagementStrategy::EscalationBased {
interval,
immediate_on_escalation,
} => {
// 定期起動チェック
let periodic = current_tick > 0 && current_tick.is_multiple_of(*interval);
// Escalation チェック
let has_escalation = if *immediate_on_escalation {
self.state
.workers
.iter()
.any(|ctx| ctx.escalation.is_some())
} else {
false
};
periodic || has_escalation
}
}
}
/// Manager を実行(Batch 対応、複数Manager並列処理)
///
/// Note: should_run_manager() で起動判定済みの前提
/// Returns: ManagerPhaseSnapshot for recording
///
/// # フロー
///
/// ```text
/// 1. Analyzer.analyze(state) → TaskContext
/// 2. Manager.prepare(context) → BatchDecisionRequest
/// 3. BatchInvoker.invoke(request) → Response
/// 4. Manager.finalize(context, responses) → ManagementDecision
/// ```
pub(super) fn run_manager(&mut self) -> crate::state::ManagerPhaseSnapshot {
let current_tick = self.state.shared.tick;
// Phase 0: Analyzer で TaskContext を生成
let mut context = self.analyzer.analyze(&self.state);
// ========================================================================
// ExplorationSpaceV2 でのノード選択(新アーキテクチャ)
// ========================================================================
let worker_count = self.state.workers.len();
// AdaptiveProvider: Selection を動的に切り替え
// エラー率に応じて UCB1 → Greedy/Thompson を自動選択
if let Some(ref mut space_v2) = self.space_v2 {
// SharedState.stats を使用(Single Source of Truth)
let empty_map = GraphMap::new();
let ctx = ProviderContext::new(&empty_map, &self.state.shared.stats);
self.operator_provider
.reevaluate(space_v2.operator_mut(), &ctx);
}
let v2_guidances: Vec<crate::agent::Guidance> = if let Some(ref space_v2) = self.space_v2 {
// select_nodes() で MapNode を直接取得(SwarmStats を渡す)
let nodes = space_v2.select_nodes(worker_count, &self.state.shared.stats);
// MapNode → Guidance に直接変換
let guidances: Vec<crate::agent::Guidance> = nodes
.iter()
.map(|node| crate::agent::Guidance::from(*node))
.collect();
debug!(
selected_count = nodes.len(),
actions = ?guidances.iter().map(|g| g.actions.first().map(|a| &a.name)).collect::<Vec<_>>(),
frontiers = space_v2.frontiers().len(),
operator = space_v2.operator_name(),
"ExplorationSpaceV2: nodes selected and converted to Guidance"
);
// 完了判定
if space_v2.is_complete() {
debug!(
exhausted = space_v2.is_exhausted(),
completed = space_v2.has_completed(),
"ExplorationSpaceV2: exploration complete"
);
}
guidances
} else {
vec![]
};
// V2 Guidance が生成されていれば、context に設定
if !v2_guidances.is_empty() {
context.v2_guidances = Some(v2_guidances);
}
// 前回の Guidance を TaskContext に注入
// Manager.prepare() でこれを使って ResolvedContext に ManagerInstruction を埋め込む
if !self.current_guidances.is_empty() {
context.previous_guidances = self.current_guidances.clone();
debug!(
count = context.previous_guidances.len(),
"Injected previous guidances into TaskContext"
);
}
// 全 Manager から Guidance を収集してマージ
let mut merged_guidances: std::collections::HashMap<WorkerId, Guidance> =
std::collections::HashMap::new();
// スナップショット用:最後のManagerのデータを記録
let mut last_batch_request = crate::agent::BatchDecisionRequest {
manager_id: crate::agent::ManagerId(0),
requests: vec![],
};
let mut last_responses: Vec<(WorkerId, DecisionResponse)> = vec![];
let mut llm_errors = 0u64;
for manager in &self.managers {
// Partitioning: この Manager に割り当てられた Worker のみの context を作成
let manager_context =
if let Some(assigned_workers) = self.get_assigned_workers(manager.id()) {
context.filter_for_workers(&assigned_workers)
} else {
context.clone()
};
// Phase 1: TaskContext を見て Batch リクエスト生成
let batch_request = manager.prepare(&manager_context);
// Phase 2: LLM Batch Call
let (responses, batch_llm_errors): (Vec<(WorkerId, DecisionResponse)>, u64) =
if let Some(invoker) = &self.batch_invoker {
// BatchInvoker を使用(Extensions を渡す)
let mut error_count = 0u64;
// Worker ID -> query のマッピングを作成(エラー時のprompt保持用)
let query_map: std::collections::HashMap<WorkerId, String> = batch_request
.requests
.iter()
.map(|req| (req.worker_id, req.query.clone()))
.collect();
let responses: Vec<(WorkerId, DecisionResponse)> = invoker
.invoke(batch_request.clone(), &self.state.shared.extensions)
.into_iter()
.map(|(worker_id, result)| {
let response = result.unwrap_or_else(|e| {
error_count += 1;
eprintln!(
"[LLM Error] W{}: Batch invoke error: {}",
worker_id.0, e
);
// エラー時もpromptを保持(スナップショット表示用)
DecisionResponse {
prompt: query_map.get(&worker_id).cloned(),
..Default::default()
}
});
(worker_id, response)
})
.collect();
// LLM 呼び出しを ActionEvent として記録
let current_tick = self.state.shared.tick;
for (worker_id, response) in &responses {
// エラー時は raw_response が None(DecisionResponse::default())
let success = response.raw_response.is_some();
let result = if success {
crate::events::ActionEventResult::success()
} else {
crate::events::ActionEventResult::failure("llm_error")
};
// LLM 呼び出し結果の概要を metadata に追加
// Note: prompt/response の全文が必要な場合は LlmDebugChannel を使用
let context = crate::events::ActionContext::new()
.with_metadata("tool", response.tool.clone())
.with_metadata("target", response.target.clone())
.with_metadata("confidence", response.confidence.to_string());
let event = crate::events::ActionEventBuilder::new(
current_tick,
*worker_id, // Worker ID を使用(どの Worker 向けの LLM 呼び出しか)
"llm_invoke",
)
.result(result)
.context(context)
.build();
self.state.shared.stats.record(&event);
if let Some(ref collector) = self.action_collector {
collector.record(event);
}
}
(responses, error_count)
} else {
// BatchInvoker なし: デフォルトレスポンス
let responses = batch_request
.requests
.iter()
.map(|req| (req.worker_id, DecisionResponse::default()))
.collect();
(responses, 0)
};
llm_errors += batch_llm_errors;
// スナップショット用に保存
last_batch_request = batch_request;
last_responses = responses.clone();
// Phase 3: レスポンス処理(TaskContext と responses を渡す)
let decision = manager.finalize(&manager_context, responses);
// Guidance をマージ(後のManagerが優先、または actions を結合)
for (worker_id, guidance) in decision.guidances {
merged_guidances
.entry(worker_id)
.and_modify(|existing| {
// 既存の Guidance に新しい actions を追加
existing.actions.extend(guidance.actions.clone());
// content は後のManagerで上書き
if guidance.content.is_some() {
existing.content = guidance.content.clone();
}
// props はマージ
existing.props.extend(guidance.props.clone());
})
.or_insert(guidance);
}
// 戦略更新(最後のManagerの決定が優先)
if let Some(new_strategy) = decision.strategy_update {
self.config.management_strategy = new_strategy;
}
// Manager の last_tick を更新
self.last_manager_ticks.insert(manager.id(), current_tick);
}
// マージした Guidance を Arc でラップして保存(クローン時のディープコピー回避)
self.current_guidances = merged_guidances
.iter()
.map(|(id, g)| (*id, Arc::new(g.clone())))
.collect();
// スナップショットを返す
crate::state::ManagerPhaseSnapshot {
batch_request: last_batch_request,
responses: last_responses,
guidances: merged_guidances,
llm_errors,
}
}
/// Manager なしで ExplorationSpaceV2 から Guidance を自動生成
///
/// Manager が起動しない Tick でも、ExplorationSpaceV2 があれば
/// 探索ノードから Guidance を生成して Worker に割り当てる。
/// これにより LLM 呼び出しなしで探索を継続できる。
pub(super) fn generate_exploration_guidances(&mut self) {
// Skip guidance generation if termination is pending
if self.termination_judge.should_skip_guidance() {
debug!("Skipping guidance generation: termination pending");
return;
}
let worker_count = self.state.workers.len();
// ExplorationSpaceV2 がない場合は何もしない
let Some(ref mut space_v2) = self.space_v2 else {
return;
};
// AdaptiveProvider: Selection を動的に切り替え
// SharedState.stats を使用(Single Source of Truth)
let empty_map = GraphMap::new();
let ctx = ProviderContext::new(&empty_map, &self.state.shared.stats);
self.operator_provider
.reevaluate(space_v2.operator_mut(), &ctx);
// select_nodes() で MapNode を取得し、Guidance に変換(SwarmStats を渡す)
let nodes = space_v2.select_nodes(worker_count, &self.state.shared.stats);
let guidances: Vec<Guidance> = nodes.iter().map(|node| Guidance::from(*node)).collect();
if guidances.is_empty() {
return;
}
debug!(
selected_count = nodes.len(),
actions = ?guidances.iter().map(|g| g.actions.first().map(|a| &a.name)).collect::<Vec<_>>(),
"ExplorationSpaceV2: auto-generated guidances (no Manager)"
);
// Worker に Guidance を割り当て
// Manager の場合と同様に、各 Worker に1つずつ割り当て
let mut new_guidances = std::collections::HashMap::new();
for (i, guidance) in guidances.into_iter().enumerate() {
if i < worker_count {
new_guidances.insert(WorkerId(i), Arc::new(guidance));
}
}
// current_guidances を更新
self.current_guidances = new_guidances;
}
}