1use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono;
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use tokio::sync::Mutex;
16use tokio::sync::watch;
17use tokio::task::JoinHandle;
18use tracing::{info, instrument, warn};
19
20use punch_memory::{BoutId, MemorySubstrate};
21use punch_runtime::{FighterLoopParams, FighterLoopResult, LlmDriver, McpClient, run_fighter_loop};
22use punch_types::{
23 AgentCoordinator, AgentInfo, AgentMessageResult, CoordinationStrategy, FighterId,
24 FighterManifest, FighterStatus, GorillaId, GorillaManifest, GorillaMetrics, GorillaStatus,
25 PunchConfig, PunchError, PunchEvent, PunchResult, TenantId, TenantStatus, Troop, TroopId,
26};
27
28use punch_skills::{SkillMarketplace, builtin_skills};
29
30use crate::agent_messaging::MessageRouter;
31use crate::background::BackgroundExecutor;
32use crate::budget::{BudgetEnforcer, BudgetLimit};
33use crate::event_bus::EventBus;
34use crate::heartbeat_scheduler::HeartbeatScheduler;
35use crate::metering::MeteringEngine;
36use crate::metrics::{self, MetricsRegistry};
37use crate::scheduler::{QuotaConfig, Scheduler};
38use crate::swarm::SwarmCoordinator;
39use crate::tenant_registry::TenantRegistry;
40use crate::triggers::{Trigger, TriggerEngine, TriggerId, TriggerSummary};
41use crate::troop::TroopManager;
42use crate::workflow::{Workflow, WorkflowEngine, WorkflowId, WorkflowRunId};
43
/// Registry record for a fighter spawned into the [`Ring`].
///
/// Cloned out of the registry by `Ring::get_fighter`, so callers receive a
/// snapshot rather than a live view.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FighterEntry {
    /// Manifest the fighter was spawned with (name, model, capabilities, tenant).
    pub manifest: FighterManifest,
    /// Current status; `Idle` on spawn, `Fighting` while a message is being
    /// processed, `Resting` when rate/budget limited, `KnockedOut` after a failed run.
    pub status: FighterStatus,
    /// Bout that new messages are attached to; `None` until the first message
    /// resolves (or creates) one.
    pub current_bout: Option<BoutId>,
}
58
/// Registry record for a gorilla (background agent) registered with the Ring.
pub struct GorillaEntry {
    /// Manifest the gorilla was registered with.
    pub manifest: GorillaManifest,
    /// Lifecycle status; `Caged` on registration, `Unleashed` once started.
    pub status: GorillaStatus,
    /// Accumulated run metrics; starts at `GorillaMetrics::default()`.
    pub metrics: GorillaMetrics,
    /// Optional handle to a task driving this gorilla; aborted when the gorilla
    /// is caged.
    /// NOTE(review): registration initialises this to `None` and no visible code
    /// sets it to `Some` — confirm whether it is still needed.
    task_handle: Option<JoinHandle<()>>,
}
73
74impl std::fmt::Debug for GorillaEntry {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("GorillaEntry")
78 .field("manifest", &self.manifest)
79 .field("status", &self.status)
80 .field("metrics", &self.metrics)
81 .field("has_task", &self.task_handle.is_some())
82 .finish()
83 }
84}
85
/// Central runtime registry: owns the fighters and gorillas plus every shared
/// subsystem (memory, LLM driver, event bus, schedulers, engines) they use.
pub struct Ring {
    /// Spawned fighters, keyed by id.
    fighters: DashMap<FighterId, FighterEntry>,
    /// Registered gorillas; each entry sits behind a `tokio::sync::Mutex`
    /// (locked with `.lock().await`).
    gorillas: DashMap<GorillaId, Mutex<GorillaEntry>>,
    /// Persistent memory substrate shared with spawned tasks.
    memory: Arc<MemorySubstrate>,
    /// LLM driver handed to fighter loops, heartbeats, gorillas and workflows.
    driver: Arc<dyn LlmDriver>,
    /// Bus on which lifecycle and message events are published.
    event_bus: EventBus,
    /// Per-fighter quota checks and usage accounting.
    scheduler: Scheduler,
    /// Configuration the Ring was built with.
    config: PunchConfig,
    /// Executor that runs unleashed gorillas.
    background: BackgroundExecutor,
    /// Per-fighter heartbeat monitoring, started on spawn and stopped on kill.
    heartbeat_scheduler: HeartbeatScheduler,
    /// Optional notifier installed via `set_channel_notifier`; read when
    /// heartbeat monitoring starts.
    channel_notifier: Arc<std::sync::RwLock<Option<Arc<dyn punch_types::ChannelNotifier>>>>,
    /// Registered workflows and their execution.
    workflow_engine: WorkflowEngine,
    /// Token/cost usage recording.
    metering: MeteringEngine,
    /// Budget checks consulted before each fighter message.
    budget_enforcer: Arc<BudgetEnforcer>,
    /// Registered triggers.
    trigger_engine: TriggerEngine,
    /// Troop (fighter group) management.
    troop_manager: TroopManager,
    /// Swarm coordination.
    swarm_coordinator: SwarmCoordinator,
    /// Inter-agent message routing.
    message_router: MessageRouter,
    /// Metrics registry (counters and gauges).
    metrics: Arc<MetricsRegistry>,
    /// Tenants and their quotas.
    tenant_registry: TenantRegistry,
    /// Skill marketplace, pre-populated with the builtin skills.
    marketplace: SkillMarketplace,
    /// Running MCP server clients, keyed by configured server name.
    mcp_clients: Arc<DashMap<String, Arc<McpClient>>>,
    /// Sender side of the shutdown signal (`true` means shutting down).
    shutdown_tx: watch::Sender<bool>,
    /// Receiver kept alive alongside the sender; presumably so the channel
    /// always has a receiver and `send` does not fail — TODO confirm.
    _shutdown_rx: watch::Receiver<bool>,
}
142
143fn apply_budget_config(config: &PunchConfig, enforcer: &BudgetEnforcer) {
148 if !config.budget.has_any_limit() {
149 return;
150 }
151 let daily_from_config = config.budget.daily_cost_limit_usd;
152 let daily_from_monthly = config.budget.monthly_cost_limit_usd.map(|m| m / 30.0);
153 let max_cost_per_day_usd = match (daily_from_config, daily_from_monthly) {
154 (Some(d), Some(m)) => Some(d.min(m)),
155 (Some(d), None) => Some(d),
156 (None, Some(m)) => Some(m),
157 (None, None) => None,
158 };
159 let limit = BudgetLimit {
160 warning_threshold_percent: config.budget.eco_mode_threshold_percent,
161 max_cost_per_day_usd,
162 ..Default::default()
163 };
164 enforcer.set_global_limit(limit);
165 info!("budget limits loaded from config");
166}
167
168impl Ring {
169 pub fn new(
175 config: PunchConfig,
176 memory: Arc<MemorySubstrate>,
177 driver: Arc<dyn LlmDriver>,
178 ) -> Self {
179 let (shutdown_tx, shutdown_rx) = watch::channel(false);
180 let background =
181 BackgroundExecutor::with_shutdown(shutdown_tx.clone(), shutdown_rx.clone());
182 let heartbeat_scheduler =
183 HeartbeatScheduler::with_shutdown(shutdown_tx.clone(), shutdown_rx.clone());
184 let metering = MeteringEngine::new(Arc::clone(&memory));
185 let metering_arc = Arc::new(MeteringEngine::new(Arc::clone(&memory)));
186 let budget_enforcer = Arc::new(BudgetEnforcer::new(Arc::clone(&metering_arc)));
187
188 apply_budget_config(&config, &budget_enforcer);
190
191 let metrics_registry = Arc::new(MetricsRegistry::new());
192 metrics::register_default_metrics(&metrics_registry);
193
194 let marketplace = SkillMarketplace::new();
195 for listing in builtin_skills() {
196 marketplace.publish(listing);
197 }
198
199 let mcp_clients = Arc::new(DashMap::new());
200
201 Self {
202 fighters: DashMap::new(),
203 gorillas: DashMap::new(),
204 memory,
205 driver,
206 event_bus: EventBus::new(),
207 scheduler: Scheduler::new(QuotaConfig::default()),
208 config,
209 background,
210 heartbeat_scheduler,
211 channel_notifier: Arc::new(std::sync::RwLock::new(None)),
212 workflow_engine: WorkflowEngine::new(),
213 metering,
214 budget_enforcer,
215 trigger_engine: TriggerEngine::new(),
216 troop_manager: TroopManager::new(),
217 swarm_coordinator: SwarmCoordinator::new(),
218 message_router: MessageRouter::new(),
219 metrics: metrics_registry,
220 tenant_registry: TenantRegistry::new(),
221 marketplace,
222 mcp_clients,
223 shutdown_tx,
224 _shutdown_rx: shutdown_rx,
225 }
226 }
227
228 pub fn with_quota_config(
230 config: PunchConfig,
231 memory: Arc<MemorySubstrate>,
232 driver: Arc<dyn LlmDriver>,
233 quota_config: QuotaConfig,
234 ) -> Self {
235 let (shutdown_tx, shutdown_rx) = watch::channel(false);
236 let background =
237 BackgroundExecutor::with_shutdown(shutdown_tx.clone(), shutdown_rx.clone());
238 let heartbeat_scheduler =
239 HeartbeatScheduler::with_shutdown(shutdown_tx.clone(), shutdown_rx.clone());
240 let metering = MeteringEngine::new(Arc::clone(&memory));
241 let metering_arc = Arc::new(MeteringEngine::new(Arc::clone(&memory)));
242 let budget_enforcer = Arc::new(BudgetEnforcer::new(Arc::clone(&metering_arc)));
243
244 apply_budget_config(&config, &budget_enforcer);
246
247 let metrics_registry = Arc::new(MetricsRegistry::new());
248 metrics::register_default_metrics(&metrics_registry);
249
250 let marketplace = SkillMarketplace::new();
251 for listing in builtin_skills() {
252 marketplace.publish(listing);
253 }
254
255 let mcp_clients = Arc::new(DashMap::new());
256
257 Self {
258 fighters: DashMap::new(),
259 gorillas: DashMap::new(),
260 memory,
261 driver,
262 event_bus: EventBus::new(),
263 scheduler: Scheduler::new(quota_config),
264 config,
265 background,
266 heartbeat_scheduler,
267 channel_notifier: Arc::new(std::sync::RwLock::new(None)),
268 workflow_engine: WorkflowEngine::new(),
269 metering,
270 budget_enforcer,
271 trigger_engine: TriggerEngine::new(),
272 troop_manager: TroopManager::new(),
273 swarm_coordinator: SwarmCoordinator::new(),
274 message_router: MessageRouter::new(),
275 metrics: metrics_registry,
276 tenant_registry: TenantRegistry::new(),
277 marketplace,
278 mcp_clients,
279 shutdown_tx,
280 _shutdown_rx: shutdown_rx,
281 }
282 }
283
284 pub fn event_bus(&self) -> &EventBus {
288 &self.event_bus
289 }
290
291 pub fn scheduler(&self) -> &Scheduler {
293 &self.scheduler
294 }
295
296 pub fn memory(&self) -> &Arc<MemorySubstrate> {
298 &self.memory
299 }
300
301 pub fn config(&self) -> &PunchConfig {
303 &self.config
304 }
305
306 pub fn background(&self) -> &BackgroundExecutor {
308 &self.background
309 }
310
311 pub fn workflow_engine(&self) -> &WorkflowEngine {
313 &self.workflow_engine
314 }
315
316 pub fn metering(&self) -> &MeteringEngine {
318 &self.metering
319 }
320
321 pub fn budget_enforcer(&self) -> &Arc<BudgetEnforcer> {
323 &self.budget_enforcer
324 }
325
326 pub fn trigger_engine(&self) -> &TriggerEngine {
328 &self.trigger_engine
329 }
330
331 pub fn metrics(&self) -> &Arc<MetricsRegistry> {
333 &self.metrics
334 }
335
336 pub fn tenant_registry(&self) -> &TenantRegistry {
338 &self.tenant_registry
339 }
340
341 pub fn marketplace(&self) -> &SkillMarketplace {
343 &self.marketplace
344 }
345
346 pub fn mcp_clients(&self) -> &Arc<DashMap<String, Arc<McpClient>>> {
348 &self.mcp_clients
349 }
350
351 pub async fn spawn_mcp_servers(&self) {
359 for (name, server_config) in &self.config.mcp_servers {
360 info!(server = %name, command = %server_config.command, "spawning MCP server");
361
362 match McpClient::spawn(
363 name.clone(),
364 &server_config.command,
365 &server_config.args,
366 &server_config.env,
367 )
368 .await
369 {
370 Ok(client) => {
371 if let Err(e) = client.initialize().await {
372 warn!(server = %name, error = %e, "MCP server initialization failed, skipping");
373 let _ = client.shutdown().await;
374 continue;
375 }
376
377 match client.list_tools().await {
378 Ok(tools) => {
379 info!(
380 server = %name,
381 tool_count = tools.len(),
382 "MCP server ready with {} tools",
383 tools.len()
384 );
385 }
386 Err(e) => {
387 warn!(server = %name, error = %e, "failed to list MCP tools");
388 }
389 }
390
391 self.mcp_clients.insert(name.clone(), Arc::new(client));
392
393 self.event_bus.publish(PunchEvent::McpServerStarted {
394 server_name: name.clone(),
395 });
396 }
397 Err(e) => {
398 warn!(server = %name, error = %e, "failed to spawn MCP server, skipping");
399 }
400 }
401 }
402 }
403
404 pub async fn shutdown_mcp_servers(&self) {
406 for entry in self.mcp_clients.iter() {
407 let name = entry.key().clone();
408 info!(server = %name, "shutting down MCP server");
409 if let Err(e) = entry.value().shutdown().await {
410 warn!(server = %name, error = %e, "MCP server shutdown error");
411 }
412 }
413 self.mcp_clients.clear();
414 }
415
416 pub async fn mcp_tools(&self) -> Vec<punch_types::ToolDefinition> {
418 let mut tools = Vec::new();
419 for entry in self.mcp_clients.iter() {
420 match entry.value().list_tools().await {
421 Ok(server_tools) => tools.extend(server_tools),
422 Err(e) => {
423 warn!(
424 server = %entry.key(),
425 error = %e,
426 "failed to list tools from MCP server"
427 );
428 }
429 }
430 }
431 tools
432 }
433
434 #[instrument(skip(self, manifest), fields(fighter_name = %manifest.name))]
441 pub async fn spawn_fighter_for_tenant(
442 &self,
443 tenant_id: &TenantId,
444 mut manifest: FighterManifest,
445 ) -> PunchResult<FighterId> {
446 let tenant = self
448 .tenant_registry
449 .get_tenant(tenant_id)
450 .ok_or_else(|| PunchError::Tenant(format!("tenant {} not found", tenant_id)))?;
451
452 if tenant.status == TenantStatus::Suspended {
453 return Err(PunchError::Tenant(format!(
454 "tenant {} is suspended",
455 tenant_id
456 )));
457 }
458
459 let current_count = self
461 .fighters
462 .iter()
463 .filter(|e| e.value().manifest.tenant_id.as_ref() == Some(tenant_id))
464 .count();
465
466 if current_count >= tenant.quota.max_fighters {
467 return Err(PunchError::QuotaExceeded(format!(
468 "tenant {} has reached max fighters limit ({})",
469 tenant_id, tenant.quota.max_fighters
470 )));
471 }
472
473 manifest.tenant_id = Some(*tenant_id);
475 Ok(self.spawn_fighter(manifest).await)
476 }
477
478 pub fn list_fighters_for_tenant(
480 &self,
481 tenant_id: &TenantId,
482 ) -> Vec<(FighterId, FighterManifest, FighterStatus)> {
483 self.fighters
484 .iter()
485 .filter(|entry| entry.value().manifest.tenant_id.as_ref() == Some(tenant_id))
486 .map(|entry| {
487 let id = *entry.key();
488 let e = entry.value();
489 (id, e.manifest.clone(), e.status)
490 })
491 .collect()
492 }
493
494 #[instrument(skip(self), fields(%fighter_id, %tenant_id))]
498 pub fn kill_fighter_for_tenant(
499 &self,
500 fighter_id: &FighterId,
501 tenant_id: &TenantId,
502 ) -> PunchResult<()> {
503 let entry = self
504 .fighters
505 .get(fighter_id)
506 .ok_or_else(|| PunchError::Fighter(format!("fighter {} not found", fighter_id)))?;
507
508 if entry.manifest.tenant_id.as_ref() != Some(tenant_id) {
509 return Err(PunchError::Auth(format!(
510 "fighter {} does not belong to tenant {}",
511 fighter_id, tenant_id
512 )));
513 }
514
515 drop(entry);
516 self.kill_fighter(fighter_id);
517 Ok(())
518 }
519
520 pub fn check_tenant_tool_access(&self, tenant_id: &TenantId, tool_name: &str) -> bool {
525 match self.tenant_registry.get_tenant(tenant_id) {
526 Some(tenant) => {
527 tenant.quota.max_tools.is_empty()
528 || tenant.quota.max_tools.iter().any(|t| t == tool_name)
529 }
530 None => false,
531 }
532 }
533
534 pub fn register_trigger(&self, trigger: Trigger) -> TriggerId {
538 self.trigger_engine.register_trigger(trigger)
539 }
540
541 pub fn remove_trigger(&self, id: &TriggerId) {
543 self.trigger_engine.remove_trigger(id);
544 }
545
546 pub fn list_triggers(&self) -> Vec<(TriggerId, TriggerSummary)> {
548 self.trigger_engine.list_triggers()
549 }
550
551 #[instrument(skip(self, manifest), fields(fighter_name = %manifest.name))]
558 pub async fn spawn_fighter(&self, manifest: FighterManifest) -> FighterId {
559 let id = FighterId::new();
560 let name = manifest.name.clone();
561
562 if let Err(e) = self
564 .memory
565 .save_fighter(&id, &manifest, FighterStatus::Idle)
566 .await
567 {
568 warn!(error = %e, "failed to persist fighter to database (continuing in-memory only)");
569 }
570
571 let entry = FighterEntry {
572 manifest,
573 status: FighterStatus::Idle,
574 current_bout: None,
575 };
576
577 self.fighters.insert(id, entry);
578
579 self.metrics.counter_inc(metrics::FIGHTER_SPAWNS_TOTAL);
581 self.metrics
582 .gauge_set(metrics::ACTIVE_FIGHTERS, self.fighters.len() as i64);
583
584 self.event_bus.publish(PunchEvent::FighterSpawned {
585 fighter_id: id,
586 name: name.clone(),
587 });
588
589 info!(%id, name, "fighter spawned");
590
591 {
595 let memory = Arc::clone(&self.memory);
596 let creed_name = name.clone();
597 let fid = id;
598 let manifest_for_creed = self.fighters.get(&id).map(|e| e.value().manifest.clone());
600 tokio::spawn(async move {
601 match memory.load_creed_by_name(&creed_name).await {
604 Ok(None) if manifest_for_creed.is_some() => {
605 let manifest = manifest_for_creed.as_ref().unwrap();
606 let creed =
607 punch_types::Creed::new(&creed_name).with_self_awareness(manifest);
608 if let Err(e) = memory.save_creed(&creed).await {
609 warn!(error = %e, fighter = %creed_name, "failed to create default creed");
610 } else {
611 info!(fighter = %creed_name, "default creed created on spawn");
612 }
613 }
614 Ok(Some(mut creed)) if manifest_for_creed.is_some() => {
615 let manifest = manifest_for_creed.as_ref().unwrap();
618 let new_model = manifest.model.model.clone();
619 let new_provider = manifest.model.provider.to_string();
620 if creed.self_model.model_name != new_model
621 || creed.self_model.provider != new_provider
622 {
623 info!(
624 fighter = %creed_name,
625 old_model = %creed.self_model.model_name,
626 new_model = %new_model,
627 "updating creed self-model to match current config"
628 );
629 creed = creed.with_self_awareness(manifest);
630 if let Err(e) = memory.save_creed(&creed).await {
631 warn!(error = %e, fighter = %creed_name, "failed to update creed self-model");
632 }
633 }
634 }
635 Ok(_) => {}
636 Err(e) => {
637 warn!(error = %e, fighter = %creed_name, "failed to check creed on spawn");
638 }
639 }
640 if let Err(e) = memory.bind_creed_to_fighter(&creed_name, &fid).await {
642 warn!(error = %e, fighter = %creed_name, "failed to bind creed on spawn");
643 } else {
644 info!(fighter = %creed_name, id = %fid, "creed bound to fighter on spawn");
645 }
646 });
647 }
648
649 if let Some(entry) = self.fighters.get(&id) {
652 let manifest = entry.value().manifest.clone();
653 drop(entry); self.heartbeat_scheduler.start_monitoring(
655 crate::heartbeat_scheduler::HeartbeatStartConfig {
656 fighter_id: id,
657 fighter_name: name.clone(),
658 manifest,
659 memory: Arc::clone(&self.memory),
660 driver: Arc::clone(&self.driver),
661 event_bus: self.event_bus.clone(),
662 channel_notifier: self.channel_notifier.read().ok().and_then(|g| g.clone()),
663 model_routing: Some(self.config.model_routing.clone()),
664 chat_id_hint: None, },
666 );
667 }
668
669 id
670 }
671
672 pub async fn ensure_creed(&self, fighter_name: &str, manifest: &FighterManifest) {
675 match self.memory.load_creed_by_name(fighter_name).await {
676 Ok(Some(_)) => {
677 }
679 Ok(None) => {
680 let creed = punch_types::Creed::new(fighter_name).with_self_awareness(manifest);
682 if let Err(e) = self.memory.save_creed(&creed).await {
683 warn!(error = %e, "failed to create default creed");
684 } else {
685 info!(fighter = %fighter_name, "default creed created with self-awareness");
686 }
687 }
688 Err(e) => {
689 warn!(error = %e, "failed to check for existing creed");
690 }
691 }
692 }
693
694 #[instrument(skip(self, message), fields(%fighter_id))]
699 pub async fn send_message(
700 &self,
701 fighter_id: &FighterId,
702 message: String,
703 ) -> PunchResult<FighterLoopResult> {
704 self.send_message_with_coordinator(fighter_id, message, None, vec![])
705 .await
706 }
707
708 #[instrument(skip(self, message, coordinator, content_parts), fields(%fighter_id))]
717 pub async fn send_message_with_coordinator(
718 &self,
719 fighter_id: &FighterId,
720 message: String,
721 coordinator: Option<Arc<dyn AgentCoordinator>>,
722 content_parts: Vec<punch_types::ContentPart>,
723 ) -> PunchResult<FighterLoopResult> {
724 let mut entry = self
726 .fighters
727 .get_mut(fighter_id)
728 .ok_or_else(|| PunchError::Fighter(format!("fighter {} not found", fighter_id)))?;
729
730 if !self.scheduler.check_quota(fighter_id) {
732 entry.status = FighterStatus::Resting;
733 return Err(PunchError::RateLimited {
734 provider: "scheduler".to_string(),
735 retry_after_ms: 60_000,
736 });
737 }
738
739 let mut eco_mode = false;
742 match self.budget_enforcer.check_budget(fighter_id).await {
743 Ok(crate::budget::BudgetVerdict::Blocked {
744 reason,
745 retry_after_secs,
746 }) => {
747 entry.status = FighterStatus::Resting;
748 return Err(PunchError::RateLimited {
749 provider: format!("budget: {}", reason),
750 retry_after_ms: retry_after_secs * 1000,
751 });
752 }
753 Ok(crate::budget::BudgetVerdict::Warning { message, .. }) => {
754 info!(warning = %message, "budget warning — activating eco mode");
755 eco_mode = true;
756 }
757 Ok(crate::budget::BudgetVerdict::Allowed) => {}
758 Err(e) => {
759 warn!(error = %e, "budget check failed, allowing request");
760 }
761 }
762
763 let bout_id = match entry.current_bout {
765 Some(id) => id,
766 None => {
767 let id = match self.memory.latest_bout_for_fighter(fighter_id).await {
771 Ok(Some(existing)) => existing,
772 _ => {
773 let new_id =
775 self.memory.create_bout(fighter_id).await.map_err(|e| {
776 PunchError::Bout(format!("failed to create bout: {e}"))
777 })?;
778
779 self.event_bus.publish(PunchEvent::BoutStarted {
780 bout_id: new_id.0,
781 fighter_id: *fighter_id,
782 });
783
784 new_id
785 }
786 };
787 entry.current_bout = Some(id);
788
789 id
790 }
791 };
792
793 entry.status = FighterStatus::Fighting;
795 let manifest = entry.manifest.clone();
796 let fighter_name = manifest.name.clone();
797 drop(entry); let mut mcp_tools = Vec::new();
802 let has_mcp_access = manifest
803 .capabilities
804 .iter()
805 .any(|c| matches!(c, punch_types::Capability::McpAccess(_)));
806 if has_mcp_access && !self.mcp_clients.is_empty() {
807 for mcp_entry in self.mcp_clients.iter() {
808 let server_name = mcp_entry.key();
809 let can_access = manifest.capabilities.iter().any(|c| {
810 if let punch_types::Capability::McpAccess(pattern) = c {
811 pattern == "*" || pattern == server_name
812 } else {
813 false
814 }
815 });
816 if can_access {
817 match mcp_entry.value().list_tools().await {
818 Ok(tools) => mcp_tools.extend(tools),
819 Err(e) => {
820 warn!(
821 server = %server_name,
822 error = %e,
823 "failed to list MCP tools for fighter"
824 );
825 }
826 }
827 }
828 }
829 }
830
831 self.event_bus.publish(PunchEvent::FighterMessage {
833 fighter_id: *fighter_id,
834 bout_id: bout_id.0,
835 role: "user".to_string(),
836 content_preview: truncate_preview(&message, 120),
837 });
838
839 let params = FighterLoopParams {
843 manifest: manifest.clone(),
844 user_message: message,
845 bout_id,
846 fighter_id: *fighter_id,
847 memory: Arc::clone(&self.memory),
848 driver: Arc::clone(&self.driver),
849 available_tools: Vec::new(),
850 mcp_tools,
851 max_iterations: None,
852 context_window: None,
853 tool_timeout_secs: None,
854 coordinator,
855 approval_engine: None,
856 sandbox: None,
857 mcp_clients: if self.mcp_clients.is_empty() {
858 None
859 } else {
860 Some(Arc::clone(&self.mcp_clients))
861 },
862 model_routing: if self.config.model_routing.enabled {
863 Some(self.config.model_routing.clone())
864 } else {
865 None
866 },
867 channel_notifier: None,
868 user_content_parts: content_parts,
869 eco_mode,
870 };
871
872 self.metrics.counter_inc(metrics::MESSAGES_TOTAL);
874
875 let result = run_fighter_loop(params).await;
876
877 if let Some(mut entry) = self.fighters.get_mut(fighter_id) {
879 match &result {
880 Ok(loop_result) => {
881 entry.status = FighterStatus::Idle;
882 self.scheduler
883 .record_usage(fighter_id, loop_result.usage.total());
884
885 self.metrics
887 .counter_add(metrics::TOKENS_INPUT_TOTAL, loop_result.usage.input_tokens);
888 self.metrics.counter_add(
889 metrics::TOKENS_OUTPUT_TOTAL,
890 loop_result.usage.output_tokens,
891 );
892
893 if let Err(e) = self
895 .metering
896 .record_usage(
897 fighter_id,
898 &manifest.model.model,
899 loop_result.usage.input_tokens,
900 loop_result.usage.output_tokens,
901 )
902 .await
903 {
904 warn!(error = %e, "failed to record metering usage");
905 }
906
907 let preview = truncate_preview(&loop_result.response, 120);
909 self.event_bus.publish(PunchEvent::FighterMessage {
910 fighter_id: *fighter_id,
911 bout_id: bout_id.0,
912 role: "assistant".to_string(),
913 content_preview: preview,
914 });
915
916 self.event_bus.publish(PunchEvent::BoutEnded {
918 bout_id: bout_id.0,
919 fighter_id: *fighter_id,
920 messages_exchanged: loop_result.usage.total(),
921 });
922 }
923 Err(e) => {
924 entry.status = FighterStatus::KnockedOut;
925 self.metrics.counter_inc(metrics::ERRORS_TOTAL);
926
927 self.event_bus.publish(PunchEvent::Error {
929 source: fighter_name.clone(),
930 message: format!("{e}"),
931 });
932 }
933 }
934 }
935
936 result
937 }
938
939 pub fn heartbeat_scheduler(&self) -> &HeartbeatScheduler {
941 &self.heartbeat_scheduler
942 }
943
944 pub fn set_channel_notifier(&self, notifier: Arc<dyn punch_types::ChannelNotifier>) {
950 if let Ok(mut guard) = self.channel_notifier.write() {
951 *guard = Some(notifier);
952 }
953 }
954
955 #[instrument(skip(self), fields(%fighter_id))]
957 pub fn kill_fighter(&self, fighter_id: &FighterId) {
958 self.heartbeat_scheduler.stop_monitoring(fighter_id);
960
961 if let Some((_, entry)) = self.fighters.remove(fighter_id) {
962 self.scheduler.remove_fighter(fighter_id);
963 self.metrics
964 .gauge_set(metrics::ACTIVE_FIGHTERS, self.fighters.len() as i64);
965
966 self.event_bus.publish(PunchEvent::Error {
967 source: "ring".to_string(),
968 message: format!("Fighter '{}' killed", entry.manifest.name),
969 });
970
971 info!(name = %entry.manifest.name, "fighter killed");
972 } else {
973 warn!("attempted to kill unknown fighter");
974 }
975 }
976
977 pub fn list_fighters(&self) -> Vec<(FighterId, FighterManifest, FighterStatus)> {
979 self.fighters
980 .iter()
981 .map(|entry| {
982 let id = *entry.key();
983 let e = entry.value();
984 (id, e.manifest.clone(), e.status)
985 })
986 .collect()
987 }
988
989 pub fn get_fighter(&self, fighter_id: &FighterId) -> Option<FighterEntry> {
991 self.fighters.get(fighter_id).map(|e| e.value().clone())
992 }
993
994 #[instrument(skip(self, manifest), fields(gorilla_name = %manifest.name))]
1001 pub fn register_gorilla(&self, manifest: GorillaManifest) -> GorillaId {
1002 let id = GorillaId::new();
1003 let name = manifest.name.clone();
1004
1005 let entry = GorillaEntry {
1006 manifest,
1007 status: GorillaStatus::Caged,
1008 metrics: GorillaMetrics::default(),
1009 task_handle: None,
1010 };
1011
1012 self.gorillas.insert(id, Mutex::new(entry));
1013 info!(%id, name, "gorilla registered");
1014 id
1015 }
1016
1017 #[instrument(skip(self), fields(%gorilla_id))]
1022 pub async fn unleash_gorilla(&self, gorilla_id: &GorillaId) -> PunchResult<()> {
1023 let entry_ref = self
1024 .gorillas
1025 .get(gorilla_id)
1026 .ok_or_else(|| PunchError::Gorilla(format!("gorilla {} not found", gorilla_id)))?;
1027
1028 let mut entry = entry_ref.value().lock().await;
1029
1030 if entry.status == GorillaStatus::Unleashed || entry.status == GorillaStatus::Rampaging {
1031 return Err(PunchError::Gorilla(format!(
1032 "gorilla {} is already active",
1033 gorilla_id
1034 )));
1035 }
1036
1037 let gorilla_id_owned = *gorilla_id;
1038 let name = entry.manifest.name.clone();
1039 let manifest = entry.manifest.clone();
1040
1041 self.background.start_gorilla(
1043 gorilla_id_owned,
1044 manifest,
1045 self.config.default_model.clone(),
1046 Arc::clone(&self.memory),
1047 Arc::clone(&self.driver),
1048 )?;
1049
1050 entry.status = GorillaStatus::Unleashed;
1051 drop(entry);
1052 drop(entry_ref);
1053
1054 self.metrics.counter_inc(metrics::GORILLA_RUNS_TOTAL);
1056 self.metrics.gauge_inc(metrics::ACTIVE_GORILLAS);
1057
1058 self.event_bus.publish(PunchEvent::GorillaUnleashed {
1059 gorilla_id: gorilla_id_owned,
1060 name,
1061 });
1062
1063 Ok(())
1064 }
1065
1066 #[instrument(skip(self), fields(%gorilla_id))]
1068 pub async fn cage_gorilla(&self, gorilla_id: &GorillaId) -> PunchResult<()> {
1069 let entry_ref = self
1070 .gorillas
1071 .get(gorilla_id)
1072 .ok_or_else(|| PunchError::Gorilla(format!("gorilla {} not found", gorilla_id)))?;
1073
1074 let mut entry = entry_ref.value().lock().await;
1075
1076 self.background.stop_gorilla(gorilla_id);
1078
1079 if let Some(handle) = entry.task_handle.take() {
1081 handle.abort();
1082 }
1083
1084 let name = entry.manifest.name.clone();
1085 entry.status = GorillaStatus::Caged;
1086 drop(entry);
1087 drop(entry_ref);
1088
1089 self.metrics.gauge_dec(metrics::ACTIVE_GORILLAS);
1090
1091 self.event_bus.publish(PunchEvent::GorillaPaused {
1092 gorilla_id: *gorilla_id,
1093 reason: "manually caged".to_string(),
1094 });
1095
1096 info!(name, "gorilla caged");
1097 Ok(())
1098 }
1099
1100 pub async fn list_gorillas(
1102 &self,
1103 ) -> Vec<(GorillaId, GorillaManifest, GorillaStatus, GorillaMetrics)> {
1104 let mut result = Vec::new();
1105
1106 for entry in self.gorillas.iter() {
1107 let id = *entry.key();
1108 let inner = entry.value().lock().await;
1109 result.push((
1110 id,
1111 inner.manifest.clone(),
1112 inner.status,
1113 inner.metrics.clone(),
1114 ));
1115 }
1116
1117 result
1118 }
1119
1120 pub async fn get_gorilla_manifest(&self, gorilla_id: &GorillaId) -> Option<GorillaManifest> {
1122 let entry_ref = self.gorillas.get(gorilla_id)?;
1123 let entry = entry_ref.value().lock().await;
1124 Some(entry.manifest.clone())
1125 }
1126
1127 pub async fn find_gorilla_by_name(&self, name: &str) -> Option<GorillaId> {
1129 for entry in self.gorillas.iter() {
1130 let inner = entry.value().lock().await;
1131 if inner.manifest.name.eq_ignore_ascii_case(name) {
1132 return Some(*entry.key());
1133 }
1134 }
1135 None
1136 }
1137
1138 #[instrument(skip(self), fields(%gorilla_id))]
1143 pub async fn run_gorilla_tick(
1144 &self,
1145 gorilla_id: &GorillaId,
1146 ) -> PunchResult<punch_runtime::FighterLoopResult> {
1147 let entry_ref = self
1148 .gorillas
1149 .get(gorilla_id)
1150 .ok_or_else(|| PunchError::Gorilla(format!("gorilla {} not found", gorilla_id)))?;
1151
1152 let entry = entry_ref.value().lock().await;
1153 let manifest = entry.manifest.clone();
1154 drop(entry);
1155 drop(entry_ref);
1156
1157 crate::background::run_gorilla_tick(
1158 *gorilla_id,
1159 &manifest,
1160 &self.config.default_model,
1161 &self.memory,
1162 &self.driver,
1163 )
1164 .await
1165 }
1166
1167 pub fn driver(&self) -> &Arc<dyn LlmDriver> {
1169 &self.driver
1170 }
1171
1172 #[instrument(skip(self, message), fields(%source_id, %target_id))]
1181 pub async fn fighter_to_fighter(
1182 &self,
1183 source_id: &FighterId,
1184 target_id: &FighterId,
1185 message: String,
1186 ) -> PunchResult<FighterLoopResult> {
1187 let source_name = self
1189 .fighters
1190 .get(source_id)
1191 .map(|entry| entry.value().manifest.name.clone())
1192 .ok_or_else(|| {
1193 PunchError::Fighter(format!("source fighter {} not found", source_id))
1194 })?;
1195
1196 if self.fighters.get(target_id).is_none() {
1198 return Err(PunchError::Fighter(format!(
1199 "target fighter {} not found",
1200 target_id
1201 )));
1202 }
1203
1204 let enriched_message = format!(
1206 "[Message from fighter '{}' (id: {})]\n\n{}",
1207 source_name, source_id, message
1208 );
1209
1210 self.send_message(target_id, enriched_message).await
1212 }
1213
1214 pub fn find_fighter_by_name_sync(&self, name: &str) -> Option<(FighterId, FighterManifest)> {
1218 self.fighters.iter().find_map(|entry| {
1219 if entry.value().manifest.name.eq_ignore_ascii_case(name) {
1220 Some((*entry.key(), entry.value().manifest.clone()))
1221 } else {
1222 None
1223 }
1224 })
1225 }
1226
1227 pub async fn update_fighter_relationships(&self, fighter_a_name: &str, fighter_b_name: &str) {
1233 let memory = Arc::clone(&self.memory);
1234 let a_name = fighter_a_name.to_string();
1235 let b_name = fighter_b_name.to_string();
1236
1237 tokio::spawn(async move {
1239 if let Ok(Some(mut creed_a)) = memory.load_creed_by_name(&a_name).await {
1241 update_relationship(&mut creed_a, &b_name, None);
1242 if let Err(e) = memory.save_creed(&creed_a).await {
1243 warn!(error = %e, fighter = %a_name, "failed to save creed relationship update");
1244 }
1245 }
1246
1247 if let Ok(Some(mut creed_b)) = memory.load_creed_by_name(&b_name).await {
1249 update_relationship(&mut creed_b, &a_name, None);
1250 if let Err(e) = memory.save_creed(&creed_b).await {
1251 warn!(error = %e, fighter = %b_name, "failed to save creed relationship update");
1252 }
1253 }
1254 });
1255 }
1256
1257 pub fn troop_manager(&self) -> &TroopManager {
1261 &self.troop_manager
1262 }
1263
1264 pub fn swarm_coordinator(&self) -> &SwarmCoordinator {
1266 &self.swarm_coordinator
1267 }
1268
1269 pub fn message_router(&self) -> &MessageRouter {
1271 &self.message_router
1272 }
1273
1274 #[instrument(skip(self, members), fields(troop_name = %name))]
1278 pub fn form_troop(
1279 &self,
1280 name: String,
1281 leader: FighterId,
1282 members: Vec<FighterId>,
1283 strategy: CoordinationStrategy,
1284 ) -> PunchResult<TroopId> {
1285 if self.fighters.get(&leader).is_none() {
1287 return Err(PunchError::Troop(format!(
1288 "leader fighter {} not found",
1289 leader
1290 )));
1291 }
1292
1293 for member in &members {
1295 if self.fighters.get(member).is_none() {
1296 return Err(PunchError::Troop(format!(
1297 "member fighter {} not found",
1298 member
1299 )));
1300 }
1301 }
1302
1303 let member_count = members.len() + 1; let troop_id = self
1305 .troop_manager
1306 .form_troop(name.clone(), leader, members, strategy);
1307
1308 self.event_bus.publish(PunchEvent::TroopFormed {
1309 troop_id,
1310 name,
1311 member_count,
1312 });
1313
1314 Ok(troop_id)
1315 }
1316
1317 #[instrument(skip(self), fields(%troop_id))]
1319 pub fn disband_troop(&self, troop_id: &TroopId) -> PunchResult<()> {
1320 let name = self.troop_manager.disband_troop(troop_id)?;
1321
1322 self.event_bus.publish(PunchEvent::TroopDisbanded {
1323 troop_id: *troop_id,
1324 name,
1325 });
1326
1327 Ok(())
1328 }
1329
1330 pub fn assign_troop_task(
1332 &self,
1333 troop_id: &TroopId,
1334 task_description: &str,
1335 ) -> PunchResult<Vec<FighterId>> {
1336 self.troop_manager.assign_task(troop_id, task_description)
1337 }
1338
1339 pub async fn assign_troop_task_async(
1342 &self,
1343 troop_id: &TroopId,
1344 task: &str,
1345 ) -> PunchResult<crate::troop::TaskAssignmentResult> {
1346 self.troop_manager.assign_task_async(troop_id, task).await
1347 }
1348
1349 pub fn get_troop_status(&self, troop_id: &TroopId) -> Option<Troop> {
1351 self.troop_manager.get_troop(troop_id)
1352 }
1353
1354 pub fn list_troops(&self) -> Vec<Troop> {
1356 self.troop_manager.list_troops()
1357 }
1358
1359 pub fn recruit_to_troop(&self, troop_id: &TroopId, fighter_id: FighterId) -> PunchResult<()> {
1361 if self.fighters.get(&fighter_id).is_none() {
1363 return Err(PunchError::Troop(format!(
1364 "fighter {} not found",
1365 fighter_id
1366 )));
1367 }
1368 self.troop_manager.recruit(troop_id, fighter_id)
1369 }
1370
1371 pub fn dismiss_from_troop(
1373 &self,
1374 troop_id: &TroopId,
1375 fighter_id: &FighterId,
1376 ) -> PunchResult<()> {
1377 self.troop_manager.dismiss(troop_id, fighter_id)
1378 }
1379
1380 #[instrument(skip(self), fields(%fighter_id))]
1387 pub fn kill_fighter_safe(&self, fighter_id: &FighterId) {
1388 let troop_ids = self.troop_manager.get_fighter_troops(fighter_id);
1390 for troop_id in troop_ids {
1391 if let Err(e) = self.troop_manager.dismiss(&troop_id, fighter_id) {
1392 warn!(
1393 %troop_id,
1394 %fighter_id,
1395 error = %e,
1396 "failed to dismiss fighter from troop before kill"
1397 );
1398 }
1399 }
1400 self.kill_fighter(fighter_id);
1401 }
1402
1403 pub fn register_workflow(&self, workflow: Workflow) -> WorkflowId {
1407 self.workflow_engine.register_workflow(workflow)
1408 }
1409
1410 pub async fn execute_workflow(
1412 &self,
1413 workflow_id: &WorkflowId,
1414 input: String,
1415 ) -> PunchResult<WorkflowRunId> {
1416 self.workflow_engine
1417 .execute_workflow(
1418 workflow_id,
1419 input,
1420 Arc::clone(&self.memory),
1421 Arc::clone(&self.driver),
1422 &self.config.default_model,
1423 )
1424 .await
1425 }
1426
1427 pub fn shutdown(&self) {
1431 info!("Ring shutdown initiated");
1432
1433 let _ = self.shutdown_tx.send(true);
1435
1436 self.background.shutdown_all();
1438
1439 info!("Ring shutdown complete");
1440 }
1441}
1442
/// Shorten `s` for display so the result is at most `max_len` bytes,
/// including a trailing `"..."` when truncation happens.
///
/// Strings that already fit are returned unchanged. Truncation always lands
/// on a UTF-8 character boundary, so multibyte characters are never split.
/// When `max_len` is smaller than the ellipsis itself, the result is just
/// `"..."` (3 bytes), the shortest marker that still signals truncation.
fn truncate_preview(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    if s.len() <= max_len {
        return s.to_string();
    }

    // Bytes available for content once the ellipsis is reserved.
    let budget = max_len.saturating_sub(ELLIPSIS.len());

    // Keep every character whose *end* offset fits in the budget. Filtering
    // on the start offset would admit one character too many and overshoot
    // `max_len`.
    let end = s
        .char_indices()
        .map(|(i, c)| i + c.len_utf8())
        .take_while(|&char_end| char_end <= budget)
        .last()
        .unwrap_or(0);

    format!("{}{}", &s[..end], ELLIPSIS)
}
1461
1462fn update_relationship(creed: &mut punch_types::Creed, peer_name: &str, trust_nudge: Option<f64>) {
1464 if let Some(rel) = creed
1465 .relationships
1466 .iter_mut()
1467 .find(|r| r.entity == peer_name && r.entity_type == "fighter")
1468 {
1469 rel.interaction_count += 1;
1470 if let Some(nudge) = trust_nudge {
1471 rel.trust = (rel.trust * 0.9 + nudge * 0.1).clamp(0.0, 1.0);
1472 }
1473 rel.notes = format!(
1474 "Last interaction: {}",
1475 chrono::Utc::now().format("%Y-%m-%d %H:%M UTC")
1476 );
1477 } else {
1478 creed.relationships.push(punch_types::Relationship {
1479 entity: peer_name.to_string(),
1480 entity_type: "fighter".to_string(),
1481 nature: "peer".to_string(),
1482 trust: trust_nudge.unwrap_or(0.5),
1483 interaction_count: 1,
1484 notes: format!(
1485 "First interaction: {}",
1486 chrono::Utc::now().format("%Y-%m-%d %H:%M UTC")
1487 ),
1488 });
1489 }
1490 creed.updated_at = chrono::Utc::now();
1491 creed.version += 1;
1492}
1493
1494#[async_trait]
1499impl AgentCoordinator for Ring {
1500 async fn spawn_fighter(&self, manifest: FighterManifest) -> PunchResult<FighterId> {
1501 Ok(Ring::spawn_fighter(self, manifest).await)
1502 }
1503
1504 async fn send_message_to_agent(
1505 &self,
1506 target: &FighterId,
1507 message: String,
1508 ) -> PunchResult<AgentMessageResult> {
1509 let result = self.send_message(target, message).await?;
1510 Ok(AgentMessageResult {
1511 response: result.response,
1512 tokens_used: result.usage.total(),
1513 })
1514 }
1515
1516 async fn find_fighter_by_name(&self, name: &str) -> PunchResult<Option<FighterId>> {
1517 let found = self.fighters.iter().find_map(|entry| {
1518 if entry.value().manifest.name.eq_ignore_ascii_case(name) {
1519 Some(*entry.key())
1520 } else {
1521 None
1522 }
1523 });
1524 Ok(found)
1525 }
1526
1527 async fn list_fighters(&self) -> PunchResult<Vec<AgentInfo>> {
1528 let fighters = self
1529 .fighters
1530 .iter()
1531 .map(|entry| AgentInfo {
1532 id: *entry.key(),
1533 name: entry.value().manifest.name.clone(),
1534 status: entry.value().status,
1535 })
1536 .collect();
1537 Ok(fighters)
1538 }
1539}
1540
1541const _: () = {
1547 fn _assert_send<T: Send>() {}
1548 fn _assert_sync<T: Sync>() {}
1549 fn _assert() {
1550 _assert_send::<Ring>();
1551 _assert_sync::<Ring>();
1552 }
1553};