1use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use crate::types::{Action, ActionParams, WorkerId};
29
30use std::sync::Arc;
31
32use super::batch::DecisionResponse;
33use super::manager::{
34 BatchDecisionRequest, ManagementDecision, ManagerAgent, ManagerId, WorkerDecisionRequest,
35};
36use super::worker::{FixedScopeStrategy, Guidance, ScopeStrategy, WorkerScope};
37use crate::context::{
38 ContextResolver, ContextStore, GlobalContext, ManagerContext, TaskContext,
39 WorkerContext as WorkerCtx,
40};
41
42#[derive(Clone)]
48pub struct DefaultManagerConfig {
49 pub process_interval_ticks: u64,
51 pub immediate_on_escalation: bool,
53 pub confidence_threshold: f64,
55 pub scope_strategy: Arc<dyn ScopeStrategy>,
57}
58
59impl std::fmt::Debug for DefaultManagerConfig {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("DefaultManagerConfig")
62 .field("process_interval_ticks", &self.process_interval_ticks)
63 .field("immediate_on_escalation", &self.immediate_on_escalation)
64 .field("confidence_threshold", &self.confidence_threshold)
65 .field("scope_strategy", &"<dyn ScopeStrategy>")
66 .finish()
67 }
68}
69
70impl Default for DefaultManagerConfig {
71 fn default() -> Self {
72 Self {
73 process_interval_ticks: 5,
74 immediate_on_escalation: true,
75 confidence_threshold: 0.3,
76 scope_strategy: Arc::new(FixedScopeStrategy::minimal()),
77 }
78 }
79}
80
81pub struct DefaultBatchManagerAgent {
93 id: ManagerId,
94 name: String,
95 config: DefaultManagerConfig,
96 last_process_tick: AtomicU64,
97}
98
99impl DefaultBatchManagerAgent {
100 pub fn new(id: ManagerId) -> Self {
102 Self {
103 id,
104 name: format!("DefaultManager_{}", id.0),
105 config: DefaultManagerConfig::default(),
106 last_process_tick: AtomicU64::new(0),
107 }
108 }
109
110 pub fn with_name(mut self, name: impl Into<String>) -> Self {
112 self.name = name.into();
113 self
114 }
115
116 pub fn with_config(mut self, config: DefaultManagerConfig) -> Self {
118 self.config = config;
119 self
120 }
121
122 pub fn with_interval(mut self, ticks: u64) -> Self {
124 self.config.process_interval_ticks = ticks;
125 self
126 }
127
128 fn build_context_store(&self, context: &TaskContext) -> ContextStore {
137 let candidates = self.build_candidates(context);
138
139 let mut global = GlobalContext::new(context.tick)
141 .with_max_ticks(context.get_i64("max_ticks").unwrap_or(100) as u64)
142 .with_progress(context.progress)
143 .with_success_rate(context.success_rate);
144
145 if let Some(task) = context.get_str("task") {
146 global = global.with_task(task);
147 }
148 if let Some(hint) = context.get_str("hint") {
149 global = global.with_hint(hint);
150 }
151
152 let mut store = ContextStore::new(context.tick);
154 store.global = global;
155
156 for (&worker_id, summary) in &context.workers {
158 let mut worker_ctx = WorkerCtx::new(worker_id)
159 .with_failures(summary.consecutive_failures)
160 .with_history_len(summary.history_len)
161 .with_escalation(summary.has_escalation)
162 .with_candidates(candidates.clone());
163
164 if let Some(ref action) = summary.last_action {
165 worker_ctx =
166 worker_ctx.with_last_action(action, summary.last_success.unwrap_or(false));
167 }
168
169 if let Some(ref output) = summary.last_output {
171 worker_ctx.metadata.insert(
172 "last_output".to_string(),
173 serde_json::Value::String(output.clone()),
174 );
175 }
176
177 store.workers.insert(worker_id, worker_ctx);
178 }
179
180 store.managers.insert(
182 self.id,
183 ManagerContext::new(self.id)
184 .with_name(&self.name)
185 .with_last_tick(self.last_process_tick.load(Ordering::Relaxed)),
186 );
187
188 for (worker_id, escalation) in &context.escalations {
190 store.escalations.push((*worker_id, escalation.clone()));
191 }
192
193 if let Some(ref actions) = context.available_actions {
195 store.actions = Some(actions.clone());
196 }
197
198 if let Some(task) = context.get_str("task") {
200 store = store.insert("task", task);
201 }
202 if let Some(hint) = context.get_str("hint") {
203 store = store.insert("hint", hint);
204 }
205
206 store
207 }
208 fn should_process(&self, context: &TaskContext) -> bool {
214 let tick = context.tick;
215 let last_tick = self.last_process_tick.load(Ordering::Relaxed);
216
217 if self.config.immediate_on_escalation && context.has_escalations() {
219 return true;
220 }
221
222 tick >= last_tick + self.config.process_interval_ticks
224 }
225
226 fn build_candidates(&self, context: &TaskContext) -> Vec<String> {
228 let all_actions = context
229 .available_actions
230 .as_ref()
231 .map(|cfg| cfg.all_action_names())
232 .unwrap_or_else(|| vec!["Continue".to_string()]);
233
234 let excluded_names: std::collections::HashSet<String> = context
236 .excluded_actions
237 .iter()
238 .filter_map(|s| s.split('(').next().map(|n| n.to_string()))
239 .collect();
240
241 let filtered: Vec<String> = all_actions
243 .into_iter()
244 .filter(|name| !excluded_names.contains(name))
245 .collect();
246
247 if filtered.is_empty() {
249 vec!["Continue".to_string()]
250 } else {
251 filtered
252 }
253 }
254
255 fn response_to_guidance(&self, response: &DecisionResponse) -> Guidance {
257 let action_name = if response.confidence < self.config.confidence_threshold {
259 "Continue"
260 } else {
261 &response.tool
262 };
263
264 let action = Action {
265 name: action_name.to_string(),
266 params: ActionParams {
267 target: if response.target.is_empty() {
268 None
269 } else {
270 Some(response.target.clone())
271 },
272 args: response.args.clone(),
273 data: Vec::new(),
274 },
275 };
276
277 Guidance {
278 actions: vec![action],
279 content: response.reasoning.clone(),
280 props: HashMap::new(),
281 exploration_target: None,
282 scope: WorkerScope::default(),
283 }
284 }
285
286 fn default_guidance(&self) -> Guidance {
288 Guidance {
289 actions: vec![Action {
290 name: "Continue".to_string(),
291 params: ActionParams::default(),
292 }],
293 content: None,
294 props: HashMap::new(),
295 exploration_target: None,
296 scope: WorkerScope::default(),
297 }
298 }
299}
300
301impl ManagerAgent for DefaultBatchManagerAgent {
302 fn prepare(&self, context: &TaskContext) -> BatchDecisionRequest {
303 if context.v2_guidances.is_some() {
305 return BatchDecisionRequest {
306 manager_id: self.id,
307 requests: vec![],
308 };
309 }
310
311 if !self.should_process(context) {
313 return BatchDecisionRequest {
314 manager_id: self.id,
315 requests: vec![],
316 };
317 }
318
319 let store = self.build_context_store(context);
321
322 let worker_ids: Vec<WorkerId> = context
324 .worker_ids()
325 .into_iter()
326 .filter(|id| !context.done_workers.contains(id))
327 .collect();
328
329 if worker_ids.is_empty() {
331 return BatchDecisionRequest {
332 manager_id: self.id,
333 requests: vec![],
334 };
335 }
336
337 let task_goal = context
339 .get_str("task")
340 .unwrap_or("Continue current work")
341 .to_string();
342
343 let requests: Vec<WorkerDecisionRequest> = worker_ids
345 .iter()
346 .map(|&worker_id| {
347 let scope = self
349 .config
350 .scope_strategy
351 .determine_scope(context, worker_id);
352
353 let mut resolved = ContextResolver::resolve_with_scope(&store, worker_id, &scope);
356
357 let mut instruction = super::worker::ManagerInstruction::new();
359
360 if let Some(prev_guidance) = context.previous_guidances.get(&worker_id) {
362 instruction = super::worker::ManagerInstruction::from_guidance(prev_guidance);
363 }
364
365 if instruction.has_content() {
366 resolved.manager_instruction = Some(instruction);
367 }
368
369 WorkerDecisionRequest {
370 worker_id,
371 query: task_goal.clone(),
372 context: resolved,
373 lora: None,
374 }
375 })
376 .collect();
377
378 BatchDecisionRequest {
379 manager_id: self.id,
380 requests,
381 }
382 }
383
384 fn finalize(
385 &self,
386 context: &TaskContext,
387 responses: Vec<(WorkerId, DecisionResponse)>,
388 ) -> ManagementDecision {
389 let tick = context.tick;
390
391 if let Some(ref v2_guidances) = context.v2_guidances {
393 let worker_ids = context.worker_ids();
394 let mut guidances = HashMap::new();
395
396 for (i, worker_id) in worker_ids.iter().enumerate() {
397 if context.done_workers.contains(worker_id) {
398 continue;
399 }
400
401 let mut guidance = v2_guidances
403 .get(i)
404 .cloned()
405 .unwrap_or_else(|| self.default_guidance());
406
407 guidance.scope = self
409 .config
410 .scope_strategy
411 .determine_scope(context, *worker_id);
412
413 guidances.insert(*worker_id, guidance);
414 }
415
416 return ManagementDecision {
417 guidances,
418 strategy_update: None,
419 async_tasks: vec![],
420 };
421 }
422
423 if responses.is_empty() {
425 let mut guidances = HashMap::new();
426 for worker_id in context.worker_ids().iter() {
427 let mut guidance = self.default_guidance();
428
429 guidance.scope = self
431 .config
432 .scope_strategy
433 .determine_scope(context, *worker_id);
434
435 guidances.insert(*worker_id, guidance);
436 }
437
438 return ManagementDecision {
439 guidances,
440 strategy_update: None,
441 async_tasks: vec![],
442 };
443 }
444
445 self.last_process_tick.store(tick, Ordering::Relaxed);
447
448 let mut guidances = HashMap::new();
450 for (worker_id, response) in responses.iter() {
451 let mut guidance = self.response_to_guidance(response);
452
453 guidance.scope = self
455 .config
456 .scope_strategy
457 .determine_scope(context, *worker_id);
458
459 guidances.insert(*worker_id, guidance);
460 }
461
462 ManagementDecision {
463 guidances,
464 strategy_update: None,
465 async_tasks: vec![],
466 }
467 }
468
469 fn id(&self) -> ManagerId {
470 self.id
471 }
472
473 fn name(&self) -> &str {
474 &self.name
475 }
476}
477
478pub struct DefaultBatchManagerAgentBuilder {
484 id: ManagerId,
485 name: Option<String>,
486 config: DefaultManagerConfig,
487}
488
489impl DefaultBatchManagerAgentBuilder {
490 pub fn new(id: ManagerId) -> Self {
491 Self {
492 id,
493 name: None,
494 config: DefaultManagerConfig::default(),
495 }
496 }
497
498 pub fn name(mut self, name: impl Into<String>) -> Self {
499 self.name = Some(name.into());
500 self
501 }
502
503 pub fn config(mut self, config: DefaultManagerConfig) -> Self {
504 self.config = config;
505 self
506 }
507
508 pub fn interval(mut self, ticks: u64) -> Self {
509 self.config.process_interval_ticks = ticks;
510 self
511 }
512
513 pub fn immediate_on_escalation(mut self, enabled: bool) -> Self {
514 self.config.immediate_on_escalation = enabled;
515 self
516 }
517
518 pub fn confidence_threshold(mut self, threshold: f64) -> Self {
519 self.config.confidence_threshold = threshold;
520 self
521 }
522
523 pub fn build(self) -> DefaultBatchManagerAgent {
524 let mut agent = DefaultBatchManagerAgent::new(self.id).with_config(self.config);
525
526 if let Some(name) = self.name {
527 agent = agent.with_name(name);
528 }
529
530 agent
531 }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::WorkerSummary;

    /// Context at tick 10 with two workers, the second of which has an
    /// escalation, an 0.8 success rate and 0.5 progress.
    fn sample_context() -> TaskContext {
        let plain_worker = WorkerSummary::new(WorkerId(0));
        let escalated_worker = WorkerSummary::new(WorkerId(1)).with_escalation(true);

        TaskContext::new(10)
            .with_worker(plain_worker)
            .with_worker(escalated_worker)
            .with_success_rate(0.8)
            .with_progress(0.5)
    }

    /// A `Read` response with the given target, reasoning and confidence.
    fn read_response(target: &str, reasoning: Option<&str>, confidence: f64) -> DecisionResponse {
        DecisionResponse {
            tool: "Read".to_string(),
            target: target.to_string(),
            args: HashMap::new(),
            reasoning: reasoning.map(str::to_string),
            confidence,
            prompt: None,
            raw_response: None,
        }
    }

    #[test]
    fn test_default_manager_new() {
        let agent = DefaultBatchManagerAgent::new(ManagerId(0));

        assert_eq!(agent.id(), ManagerId(0));
        assert_eq!(agent.name(), "DefaultManager_0");
    }

    #[test]
    fn test_default_manager_with_name() {
        let agent = DefaultBatchManagerAgent::new(ManagerId(1)).with_name("TestManager");

        assert_eq!(agent.name(), "TestManager");
    }

    #[test]
    fn test_prepare_with_context() {
        let agent = DefaultBatchManagerAgent::new(ManagerId(0));
        let batch = agent.prepare(&sample_context());

        assert_eq!(batch.manager_id, ManagerId(0));
        // One request per worker in the sample context.
        assert_eq!(batch.requests.len(), 2);
    }

    #[test]
    fn test_finalize_empty_responses() {
        let agent = DefaultBatchManagerAgent::new(ManagerId(0));
        let outcome = agent.finalize(&sample_context(), vec![]);

        // With no responses every worker gets the default `Continue` guidance.
        assert_eq!(outcome.guidances.len(), 2);
        for guidance in outcome.guidances.values() {
            assert_eq!(guidance.actions.len(), 1);
            assert_eq!(guidance.actions[0].name, "Continue");
        }
    }

    #[test]
    fn test_response_to_guidance() {
        let agent = DefaultBatchManagerAgent::new(ManagerId(0));
        let confident = read_response("/path/to/file", Some("Need to read file"), 0.8);

        let guidance = agent.response_to_guidance(&confident);

        assert_eq!(guidance.actions.len(), 1);
        assert_eq!(guidance.actions[0].name, "Read");
        assert_eq!(
            guidance.actions[0].params.target,
            Some("/path/to/file".to_string())
        );
    }

    #[test]
    fn test_low_confidence_falls_back_to_continue() {
        let agent = DefaultBatchManagerAgent::new(ManagerId(0));
        // 0.1 is below the default confidence threshold of 0.3.
        let unsure = read_response("/path", None, 0.1);

        let guidance = agent.response_to_guidance(&unsure);

        assert_eq!(guidance.actions[0].name, "Continue");
    }

    #[test]
    fn test_builder() {
        let agent = DefaultBatchManagerAgentBuilder::new(ManagerId(2))
            .name("CustomManager")
            .interval(10)
            .confidence_threshold(0.5)
            .build();

        assert_eq!(agent.id(), ManagerId(2));
        assert_eq!(agent.name(), "CustomManager");
        assert_eq!(agent.config.process_interval_ticks, 10);
        assert!((agent.config.confidence_threshold - 0.5).abs() < 0.001);
    }
}