1use std::sync::Arc;
9
10use chrono::{DateTime, Utc};
11use dashmap::DashMap;
12use tokio::sync::{Semaphore, watch};
13use tokio::task::JoinHandle;
14use tracing::{error, info, warn};
15
16use punch_memory::MemorySubstrate;
17use punch_runtime::{
18 FighterLoopParams, FighterLoopResult, LlmDriver, run_fighter_loop, tools_for_capabilities,
19};
20use punch_types::{
21 FighterId, FighterManifest, GorillaId, GorillaManifest, ModelConfig, PunchResult, WeightClass,
22};
23
/// Number of permits the executor's LLM semaphore is created with, i.e. the
/// maximum number of gorilla ticks that may hold a permit at the same time.
const DEFAULT_LLM_CONCURRENCY: usize = 3;
26
/// Bookkeeping for one spawned gorilla background task.
struct GorillaTask {
    /// Join handle of the spawned task; used to abort the task when it is stopped.
    handle: JoinHandle<()>,
    /// UTC timestamp taken when the task was registered. Not read anywhere in
    /// this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    started_at: DateTime<Utc>,
}
33
/// Runs gorillas as periodic background tasks on the tokio runtime.
///
/// Each started gorilla gets its own spawned task that wakes up on the
/// gorilla's schedule and runs one tick, with the number of concurrent ticks
/// bounded by a shared semaphore.
pub struct BackgroundExecutor {
    /// Registered tasks keyed by gorilla id.
    tasks: DashMap<GorillaId, GorillaTask>,
    /// Shared limiter; a permit is acquired before every tick.
    llm_semaphore: Arc<Semaphore>,
    /// Sender half of the shutdown channel. Never sent on in this file; it is
    /// stored so the executor keeps a sender alive alongside its receiver.
    _shutdown_tx: watch::Sender<bool>,
    /// Receiver half of the shutdown channel; cloned into every spawned task,
    /// which stops once it observes `true`.
    shutdown_rx: watch::Receiver<bool>,
}
45
46pub fn fighter_manifest_from_gorilla(
49 manifest: &GorillaManifest,
50 default_model: &ModelConfig,
51) -> FighterManifest {
52 let model = manifest
53 .model
54 .clone()
55 .unwrap_or_else(|| default_model.clone());
56 let capabilities = manifest.effective_capabilities();
57 let weight_class = manifest.weight_class.unwrap_or(WeightClass::Middleweight);
58 let system_prompt = manifest.effective_system_prompt();
59
60 FighterManifest {
61 name: manifest.name.clone(),
62 description: format!("Autonomous gorilla: {}", manifest.name),
63 model,
64 system_prompt,
65 capabilities,
66 weight_class,
67 tenant_id: None,
68 }
69}
70
71pub async fn run_gorilla_tick(
74 gorilla_id: GorillaId,
75 manifest: &GorillaManifest,
76 default_model: &ModelConfig,
77 memory: &Arc<MemorySubstrate>,
78 driver: &Arc<dyn LlmDriver>,
79) -> PunchResult<FighterLoopResult> {
80 let fighter_manifest = fighter_manifest_from_gorilla(manifest, default_model);
81 let gorilla_name = &manifest.name;
82 let system_prompt = fighter_manifest.system_prompt.clone();
83
84 let autonomous_prompt = format!(
86 "[AUTONOMOUS TICK] You are {}. Review your memory, check your goals, and take the next action. {}",
87 gorilla_name, system_prompt
88 );
89
90 let fighter_id = FighterId::new();
92
93 if let Err(e) = memory
95 .save_fighter(
96 &fighter_id,
97 &fighter_manifest,
98 punch_types::FighterStatus::Idle,
99 )
100 .await
101 {
102 warn!(gorilla_id = %gorilla_id, error = %e, "failed to persist gorilla fighter");
103 }
104
105 let bout_id = memory.create_bout(&fighter_id).await?;
107
108 let available_tools = tools_for_capabilities(&fighter_manifest.capabilities);
109
110 let params = FighterLoopParams {
111 manifest: fighter_manifest,
112 user_message: autonomous_prompt,
113 bout_id,
114 fighter_id,
115 memory: Arc::clone(memory),
116 driver: Arc::clone(driver),
117 available_tools,
118 mcp_tools: Vec::new(),
119 max_iterations: Some(10),
120 context_window: None,
121 tool_timeout_secs: None,
122 coordinator: None,
123 approval_engine: None,
124 sandbox: None,
125 mcp_clients: None,
126 model_routing: None,
127 channel_notifier: None,
128 user_content_parts: vec![],
129 eco_mode: false,
130 };
131
132 run_fighter_loop(params).await
133}
134
135impl BackgroundExecutor {
136 pub fn new() -> Self {
138 let (shutdown_tx, shutdown_rx) = watch::channel(false);
139 Self {
140 tasks: DashMap::new(),
141 llm_semaphore: Arc::new(Semaphore::new(DEFAULT_LLM_CONCURRENCY)),
142 _shutdown_tx: shutdown_tx,
143 shutdown_rx,
144 }
145 }
146
147 pub fn with_shutdown(
149 shutdown_tx: watch::Sender<bool>,
150 shutdown_rx: watch::Receiver<bool>,
151 ) -> Self {
152 Self {
153 tasks: DashMap::new(),
154 llm_semaphore: Arc::new(Semaphore::new(DEFAULT_LLM_CONCURRENCY)),
155 _shutdown_tx: shutdown_tx,
156 shutdown_rx,
157 }
158 }
159
160 pub fn parse_schedule(schedule: &str) -> Option<std::time::Duration> {
167 let s = schedule.trim().to_lowercase();
168
169 if let Some(duration) = Self::parse_human_schedule(&s) {
171 return Some(duration);
172 }
173
174 if let Some(duration) = Self::parse_cron_schedule(&s) {
176 return Some(duration);
177 }
178
179 s.parse::<u64>().ok().map(std::time::Duration::from_secs)
181 }
182
183 fn parse_human_schedule(s: &str) -> Option<std::time::Duration> {
185 let s = s.strip_prefix("every ").unwrap_or(s);
186 let s = s.trim();
187
188 if let Some(num_str) = s.strip_suffix('s') {
189 num_str
190 .trim()
191 .parse::<u64>()
192 .ok()
193 .map(std::time::Duration::from_secs)
194 } else if let Some(num_str) = s.strip_suffix('m') {
195 num_str
196 .trim()
197 .parse::<u64>()
198 .ok()
199 .map(|m| std::time::Duration::from_secs(m * 60))
200 } else if let Some(num_str) = s.strip_suffix('h') {
201 num_str
202 .trim()
203 .parse::<u64>()
204 .ok()
205 .map(|h| std::time::Duration::from_secs(h * 3600))
206 } else if let Some(num_str) = s.strip_suffix('d') {
207 num_str
208 .trim()
209 .parse::<u64>()
210 .ok()
211 .map(|d| std::time::Duration::from_secs(d * 86400))
212 } else {
213 None
214 }
215 }
216
217 fn parse_cron_schedule(s: &str) -> Option<std::time::Duration> {
225 let fields: Vec<&str> = s.split_whitespace().collect();
226 if fields.len() != 5 {
227 return None;
228 }
229
230 let (minute, hour, day, _month, _dow) =
231 (fields[0], fields[1], fields[2], fields[3], fields[4]);
232
233 if let Some(step) = minute.strip_prefix("*/")
235 && hour == "*"
236 && day == "*"
237 && let Ok(n) = step.parse::<u64>()
238 {
239 return Some(std::time::Duration::from_secs(n * 60));
240 }
241
242 if minute == "0"
244 && let Some(step) = hour.strip_prefix("*/")
245 && day == "*"
246 && let Ok(n) = step.parse::<u64>()
247 {
248 return Some(std::time::Duration::from_secs(n * 3600));
249 }
250
251 if minute == "0"
253 && hour == "0"
254 && let Some(step) = day.strip_prefix("*/")
255 && let Ok(n) = step.parse::<u64>()
256 {
257 return Some(std::time::Duration::from_secs(n * 86400));
258 }
259
260 if minute == "0" && hour == "0" && day == "*" {
262 return Some(std::time::Duration::from_secs(86400));
263 }
264
265 if minute == "0" && day == "*" && hour.parse::<u64>().is_ok() {
267 return Some(std::time::Duration::from_secs(86400));
268 }
269
270 None
271 }
272
273 pub fn start_gorilla(
282 &self,
283 id: GorillaId,
284 manifest: GorillaManifest,
285 default_model: ModelConfig,
286 memory: Arc<MemorySubstrate>,
287 driver: Arc<dyn LlmDriver>,
288 ) -> PunchResult<()> {
289 if self.tasks.contains_key(&id) {
290 return Err(punch_types::PunchError::Gorilla(format!(
291 "gorilla {} is already running",
292 id
293 )));
294 }
295
296 let interval = Self::parse_schedule(&manifest.schedule).unwrap_or_else(|| {
297 warn!(
298 gorilla_id = %id,
299 schedule = %manifest.schedule,
300 "could not parse schedule, defaulting to 5m"
301 );
302 std::time::Duration::from_secs(300)
303 });
304
305 let semaphore = Arc::clone(&self.llm_semaphore);
306 let mut shutdown_rx = self.shutdown_rx.clone();
307 let gorilla_name = manifest.name.clone();
308
309 let handle = tokio::spawn(async move {
310 info!(
311 gorilla_id = %id,
312 name = %gorilla_name,
313 interval_secs = interval.as_secs(),
314 "gorilla background task started"
315 );
316
317 let mut tasks_completed: u64 = 0;
318 let mut error_count: u64 = 0;
319
320 loop {
321 tokio::select! {
323 _ = tokio::time::sleep(interval) => {},
324 _ = shutdown_rx.changed() => {
325 if *shutdown_rx.borrow() {
326 info!(gorilla_id = %id, "gorilla received shutdown signal");
327 break;
328 }
329 }
330 }
331
332 if *shutdown_rx.borrow() {
334 break;
335 }
336
337 let _permit = match semaphore.acquire().await {
339 Ok(permit) => permit,
340 Err(_) => {
341 warn!(gorilla_id = %id, "semaphore closed, stopping gorilla");
342 break;
343 }
344 };
345
346 match run_gorilla_tick(id, &manifest, &default_model, &memory, &driver).await {
347 Ok(result) => {
348 tasks_completed += 1;
349 info!(
350 gorilla_id = %id,
351 tasks_completed,
352 tokens = result.usage.total(),
353 "gorilla tick completed successfully"
354 );
355 }
356 Err(e) => {
357 error_count += 1;
358 error!(
359 gorilla_id = %id,
360 error = %e,
361 error_count,
362 "gorilla tick failed"
363 );
364 }
365 }
366 }
367
368 info!(
369 gorilla_id = %id,
370 tasks_completed,
371 "gorilla background task stopped"
372 );
373 });
374
375 self.tasks.insert(
376 id,
377 GorillaTask {
378 handle,
379 started_at: Utc::now(),
380 },
381 );
382
383 Ok(())
384 }
385
386 pub fn stop_gorilla(&self, id: &GorillaId) -> bool {
388 if let Some((_, task)) = self.tasks.remove(id) {
389 task.handle.abort();
390 info!(gorilla_id = %id, "gorilla task stopped");
391 true
392 } else {
393 false
394 }
395 }
396
    /// Returns `true` if a task is registered for `id`.
    ///
    /// This only consults the task map. Entries are removed by `stop_gorilla`
    /// and `shutdown_all`, so a task whose loop has ended on its own is still
    /// reported as running until one of those is called.
    pub fn is_running(&self, id: &GorillaId) -> bool {
        self.tasks.contains_key(id)
    }
401
    /// Returns the ids of all registered gorilla tasks, in the map's iteration
    /// order.
    pub fn list_running(&self) -> Vec<GorillaId> {
        self.tasks.iter().map(|entry| *entry.key()).collect()
    }
406
407 pub fn shutdown_all(&self) {
409 let ids: Vec<GorillaId> = self.tasks.iter().map(|e| *e.key()).collect();
410 for id in &ids {
411 if let Some((_, task)) = self.tasks.remove(id) {
412 task.handle.abort();
413 }
414 }
415 info!(count = ids.len(), "all gorilla tasks shut down");
416 }
417
    /// Returns the number of registered gorilla tasks (the size of the task map).
    pub fn running_count(&self) -> usize {
        self.tasks.len()
    }
422}
423
/// The default executor is the one produced by [`BackgroundExecutor::new`].
impl Default for BackgroundExecutor {
    /// Equivalent to calling [`BackgroundExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}
429
#[cfg(test)]
mod tests {
    use super::*;

    // ----- helpers ---------------------------------------------------------

    /// Asserts that `schedule` parses to exactly `expected_secs` seconds.
    #[track_caller]
    fn assert_parses_to(schedule: &str, expected_secs: u64) {
        assert_eq!(
            BackgroundExecutor::parse_schedule(schedule),
            Some(std::time::Duration::from_secs(expected_secs))
        );
    }

    /// Registers a never-completing task under `id`, bypassing `start_gorilla`
    /// (which needs a memory substrate and an LLM driver).
    fn register_pending(executor: &BackgroundExecutor, id: GorillaId) {
        let handle = tokio::spawn(futures::future::pending::<()>());
        executor.tasks.insert(
            id,
            GorillaTask {
                handle,
                started_at: Utc::now(),
            },
        );
    }

    /// Like [`register_pending`], but with a freshly generated id.
    fn register_new_pending(executor: &BackgroundExecutor) -> GorillaId {
        let id = GorillaId::new();
        register_pending(executor, id);
        id
    }

    /// A gorilla manifest with every optional field unset.
    fn bare_manifest(name: &str, description: &str, schedule: &str) -> GorillaManifest {
        GorillaManifest {
            name: name.to_string(),
            description: description.to_string(),
            schedule: schedule.to_string(),
            moves_required: Vec::new(),
            settings_schema: None,
            dashboard_metrics: Vec::new(),
            system_prompt: None,
            model: None,
            capabilities: Vec::new(),
            weight_class: None,
        }
    }

    /// The default model configuration used by the manifest-conversion tests.
    fn default_model_config() -> ModelConfig {
        ModelConfig {
            provider: punch_types::Provider::Anthropic,
            model: "claude-sonnet-4-20250514".to_string(),
            api_key_env: None,
            base_url: None,
            max_tokens: Some(4096),
            temperature: Some(0.7),
        }
    }

    // ----- schedule parsing: human form -------------------------------------

    #[test]
    fn parse_schedule_seconds() {
        assert_parses_to("every 30s", 30);
    }

    #[test]
    fn parse_schedule_minutes() {
        assert_parses_to("every 5m", 300);
    }

    #[test]
    fn parse_schedule_hours() {
        assert_parses_to("every 1h", 3600);
    }

    #[test]
    fn parse_schedule_days() {
        assert_parses_to("every 1d", 86400);
    }

    #[test]
    fn parse_schedule_invalid() {
        assert_eq!(BackgroundExecutor::parse_schedule("invalid"), None);
    }

    // ----- schedule parsing: cron form --------------------------------------

    #[test]
    fn parse_schedule_cron_every_30_minutes() {
        assert_parses_to("*/30 * * * *", 1800);
    }

    #[test]
    fn parse_schedule_cron_every_6_hours() {
        assert_parses_to("0 */6 * * *", 21600);
    }

    #[test]
    fn parse_schedule_cron_every_2_hours() {
        assert_parses_to("0 */2 * * *", 7200);
    }

    #[test]
    fn parse_schedule_cron_every_2_days() {
        assert_parses_to("0 0 */2 * *", 172800);
    }

    #[test]
    fn parse_schedule_cron_daily() {
        assert_parses_to("0 0 * * *", 86400);
    }

    #[test]
    fn parse_schedule_cron_every_3_hours() {
        assert_parses_to("0 */3 * * *", 10800);
    }

    #[test]
    fn parse_schedule_cron_every_4_hours() {
        assert_parses_to("0 */4 * * *", 14400);
    }

    // ----- task registry ----------------------------------------------------

    #[tokio::test]
    async fn start_and_stop_gorilla() {
        let executor = BackgroundExecutor::new();
        let id = GorillaId::new();
        let _manifest = bare_manifest("test-gorilla", "test", "every 30s");

        register_pending(&executor, id);

        assert_eq!(executor.running_count(), 1);
        assert!(executor.list_running().contains(&id));

        assert!(executor.stop_gorilla(&id));
        assert_eq!(executor.running_count(), 0);
    }

    #[tokio::test]
    async fn shutdown_all_stops_everything() {
        let executor = BackgroundExecutor::new();

        for _ in 0..3 {
            register_new_pending(&executor);
        }

        assert_eq!(executor.running_count(), 3);
        executor.shutdown_all();
        assert_eq!(executor.running_count(), 0);
    }

    #[tokio::test]
    async fn stop_nonexistent_gorilla_returns_false() {
        let executor = BackgroundExecutor::new();
        let id = GorillaId::new();
        assert!(!executor.stop_gorilla(&id));
    }

    // ----- schedule parsing: edge cases -------------------------------------

    #[test]
    fn parse_schedule_raw_seconds() {
        assert_parses_to("60", 60);
    }

    #[test]
    fn parse_schedule_with_whitespace() {
        assert_parses_to(" every 10s ", 10);
    }

    #[test]
    fn parse_schedule_case_insensitive() {
        assert_parses_to("Every 2H", 7200);
    }

    #[test]
    fn parse_schedule_empty_string() {
        assert_eq!(BackgroundExecutor::parse_schedule(""), None);
    }

    #[test]
    fn parse_schedule_just_prefix() {
        assert_eq!(BackgroundExecutor::parse_schedule("every "), None);
    }

    // ----- executor state ---------------------------------------------------

    #[test]
    fn default_creates_executor() {
        let executor = BackgroundExecutor::default();
        assert_eq!(executor.running_count(), 0);
        assert!(executor.list_running().is_empty());
    }

    #[tokio::test]
    async fn is_running_returns_correct_state() {
        let executor = BackgroundExecutor::new();
        let id = GorillaId::new();

        assert!(!executor.is_running(&id));

        register_pending(&executor, id);

        assert!(executor.is_running(&id));
        executor.stop_gorilla(&id);
        assert!(!executor.is_running(&id));
    }

    #[tokio::test]
    async fn multiple_gorillas_tracked_independently() {
        let executor = BackgroundExecutor::new();
        let ids: Vec<GorillaId> = (0..5).map(|_| GorillaId::new()).collect();

        for &id in &ids {
            register_pending(&executor, id);
        }

        assert_eq!(executor.running_count(), 5);

        executor.stop_gorilla(&ids[0]);
        executor.stop_gorilla(&ids[1]);
        assert_eq!(executor.running_count(), 3);

        for &id in &ids[2..] {
            assert!(executor.is_running(&id));
        }

        executor.shutdown_all();
        assert_eq!(executor.running_count(), 0);
    }

    #[tokio::test]
    async fn with_shutdown_receives_shutdown_signal() {
        let (tx, rx) = watch::channel(false);
        let executor = BackgroundExecutor::with_shutdown(tx.clone(), rx);

        register_new_pending(&executor);

        assert_eq!(executor.running_count(), 1);
        executor.shutdown_all();
        assert_eq!(executor.running_count(), 0);
    }

    // ----- manifest conversion ----------------------------------------------

    #[test]
    fn fighter_manifest_from_gorilla_uses_default_model() {
        let manifest = GorillaManifest {
            system_prompt: Some("Custom prompt".to_string()),
            ..bare_manifest("test-gorilla", "A test gorilla", "every 30s")
        };

        let fighter = fighter_manifest_from_gorilla(&manifest, &default_model_config());

        assert_eq!(fighter.name, "test-gorilla");
        assert_eq!(fighter.model.model, "claude-sonnet-4-20250514");
        assert_eq!(fighter.system_prompt, "Custom prompt");
        assert_eq!(fighter.weight_class, punch_types::WeightClass::Middleweight);
    }

    #[test]
    fn fighter_manifest_from_gorilla_uses_gorilla_model_if_set() {
        let gorilla_model = ModelConfig {
            provider: punch_types::Provider::OpenAI,
            model: "gpt-4o".to_string(),
            api_key_env: None,
            base_url: None,
            max_tokens: Some(8192),
            temperature: Some(0.5),
        };

        let manifest = GorillaManifest {
            model: Some(gorilla_model),
            weight_class: Some(punch_types::WeightClass::Heavyweight),
            ..bare_manifest("smart-gorilla", "Uses its own model", "every 1h")
        };

        let fighter = fighter_manifest_from_gorilla(&manifest, &default_model_config());

        assert_eq!(fighter.model.model, "gpt-4o");
        assert_eq!(fighter.weight_class, punch_types::WeightClass::Heavyweight);
        assert_eq!(fighter.system_prompt, "Uses its own model");
    }

    #[tokio::test]
    async fn list_running_returns_all_ids() {
        let executor = BackgroundExecutor::new();
        let expected_ids: Vec<GorillaId> = (0..3)
            .map(|_| register_new_pending(&executor))
            .collect();

        let running = executor.list_running();
        assert_eq!(running.len(), 3);
        for id in &expected_ids {
            assert!(running.contains(id));
        }

        executor.shutdown_all();
    }
}