use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
use crate::agent::{Task, TaskResult};
use crate::config::ClaudeConfig;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentMessage {
Coordination {
from_agent: String,
to_agent: String,
message_type: CoordinationType,
payload: serde_json::Value,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CoordinationType {
StatusUpdate,
TaskRequest,
SessionManagement,
}
use crate::identity::AgentRole;
use crate::session::session_pool::{PoolStatistics, SessionPool, SessionPoolConfig};
use crate::session::worktree_session::WorktreeSessionConfig;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SessionCoordinationMessage {
TaskRequest {
task: Task,
preferred_session: Option<String>,
batch_compatible: bool,
priority_boost: f64,
},
BatchTaskRequest {
tasks: Vec<Task>,
role: AgentRole,
deadline: Option<DateTime<Utc>>,
},
SessionStatusUpdate {
session_id: String,
agent_id: String,
load: f64,
tasks_completed: usize,
efficiency_metrics: EfficiencyMetrics,
},
SessionScalingRequest {
role: AgentRole,
min_sessions: usize,
reason: ScalingReason,
},
EfficiencyReport {
timestamp: DateTime<Utc>,
token_savings: TokenSavingsReport,
performance_metrics: PerformanceReport,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScalingReason {
HighLoad,
TaskBacklog,
PerformanceOptimization,
UserRequest,
PredictiveScaling,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EfficiencyMetrics {
pub sessions_reused: usize,
pub tokens_saved: usize,
pub context_continuity_score: f64,
pub average_response_time: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenSavingsReport {
pub total_tasks_processed: usize,
pub sessions_created: usize,
pub estimated_traditional_tokens: usize,
pub actual_tokens_used: usize,
pub savings_percentage: f64,
pub efficiency_score: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
pub total_execution_time: std::time::Duration,
pub average_task_time: std::time::Duration,
pub throughput_tasks_per_minute: f64,
pub error_rate_percentage: f64,
pub session_utilization: f64,
}
#[derive(Debug)]
pub struct SessionCoordinator {
session_pool: Arc<Mutex<SessionPool>>,
#[allow(dead_code)] coordination_bus: Arc<RwLock<CoordinationBus>>,
active_tasks: Arc<RwLock<HashMap<String, TaskExecution>>>,
performance_tracker: Arc<Mutex<PerformanceTracker>>,
config: SessionCoordinatorConfig,
}
#[derive(Debug)]
struct CoordinationBus {
#[allow(dead_code)] outbound_queue: Vec<AgentMessage>,
#[allow(dead_code)] coordination_dir: PathBuf,
}
#[allow(dead_code)] trait MessageHandler: Send + Sync {
fn handle_message_sync(&self, message: &AgentMessage) -> Result<Option<AgentMessage>>;
}
#[derive(Debug, Clone)]
struct TaskExecution {
#[allow(dead_code)] pub task: Task,
#[allow(dead_code)] pub assigned_session: String,
#[allow(dead_code)] pub started_at: DateTime<Utc>,
#[allow(dead_code)] pub estimated_completion: Option<DateTime<Utc>>,
pub status: TaskExecutionStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum TaskExecutionStatus {
Queued,
Executing,
Completed,
Failed(String),
Cancelled,
}
#[derive(Debug)]
struct PerformanceTracker {
#[allow(dead_code)] start_time: DateTime<Utc>,
total_tasks: usize,
total_sessions_created: usize,
total_tokens_saved: usize,
total_execution_time: std::time::Duration,
error_count: usize,
}
#[derive(Debug, Clone)]
pub struct SessionCoordinatorConfig {
pub coordination_dir: PathBuf,
pub reporting_interval: std::time::Duration,
pub efficiency_optimization: bool,
pub efficiency_baseline: f64,
pub batch_optimization: BatchOptimizationConfig,
}
#[derive(Debug, Clone)]
pub struct BatchOptimizationConfig {
pub enabled: bool,
pub max_batch_size: usize,
pub batch_timeout: std::time::Duration,
pub compatibility_threshold: f64,
}
impl Default for SessionCoordinatorConfig {
fn default() -> Self {
Self {
coordination_dir: PathBuf::from("coordination"),
reporting_interval: std::time::Duration::from_secs(60),
efficiency_optimization: true,
efficiency_baseline: 1.0, batch_optimization: BatchOptimizationConfig {
enabled: true,
max_batch_size: 10,
batch_timeout: std::time::Duration::from_secs(30),
compatibility_threshold: 0.8,
},
}
}
}
impl SessionCoordinator {
pub async fn new(
worktree_config: WorktreeSessionConfig,
pool_config: SessionPoolConfig,
coordinator_config: SessionCoordinatorConfig,
) -> Result<Self> {
let mut session_pool = SessionPool::new(worktree_config, pool_config).await?;
session_pool.start().await?;
let coordination_bus =
CoordinationBus::new(coordinator_config.coordination_dir.clone()).await?;
let performance_tracker = PerformanceTracker {
start_time: Utc::now(),
total_tasks: 0,
total_sessions_created: 0,
total_tokens_saved: 0,
total_execution_time: std::time::Duration::ZERO,
error_count: 0,
};
Ok(Self {
session_pool: Arc::new(Mutex::new(session_pool)),
coordination_bus: Arc::new(RwLock::new(coordination_bus)),
active_tasks: Arc::new(RwLock::new(HashMap::new())),
performance_tracker: Arc::new(Mutex::new(performance_tracker)),
config: coordinator_config,
})
}
pub async fn start(&mut self) -> Result<()> {
tracing::info!("Starting session coordinator");
if self.config.efficiency_optimization {
self.start_efficiency_monitoring().await;
}
self.start_coordination_processing().await;
tracing::info!("Session coordinator started successfully");
Ok(())
}
pub async fn execute_coordinated_task(
&self,
role: AgentRole,
task: Task,
claude_config: ClaudeConfig,
) -> Result<TaskResult> {
let task_id = task.id.clone();
let start_time = std::time::Instant::now();
if self.config.batch_optimization.enabled {
if let Some(batch_result) = self.try_batch_optimization(&role, &task).await? {
return Ok(batch_result);
}
}
let execution = TaskExecution {
task: task.clone(),
assigned_session: "pending".to_string(),
started_at: Utc::now(),
estimated_completion: None,
status: TaskExecutionStatus::Queued,
};
{
let mut active_tasks = self.active_tasks.write().await;
active_tasks.insert(task_id.clone(), execution);
}
let result = {
let session_pool = self.session_pool.lock().await;
session_pool
.execute_task(role.clone(), task, claude_config)
.await?
};
{
let mut tracker = self.performance_tracker.lock().await;
tracker.total_tasks += 1;
tracker.total_execution_time += start_time.elapsed();
if !result.success {
tracker.error_count += 1;
}
let estimated_savings = self.calculate_token_savings(&result).await;
tracker.total_tokens_saved += estimated_savings;
}
{
let mut active_tasks = self.active_tasks.write().await;
if let Some(mut execution) = active_tasks.remove(&task_id) {
execution.status = if result.success {
TaskExecutionStatus::Completed
} else {
TaskExecutionStatus::Failed(
result
.error
.as_deref()
.unwrap_or("Unknown error")
.to_string(),
)
};
}
}
self.send_coordination_update(&role, &result).await?;
Ok(result)
}
pub async fn execute_coordinated_batch(
&self,
role: AgentRole,
tasks: Vec<Task>,
claude_config: ClaudeConfig,
) -> Result<Vec<TaskResult>> {
if tasks.is_empty() {
return Ok(Vec::new());
}
tracing::info!(
"Executing coordinated batch of {} tasks for role: {}",
tasks.len(),
role.name()
);
let start_time = std::time::Instant::now();
for task in &tasks {
let execution = TaskExecution {
task: task.clone(),
assigned_session: "batch".to_string(),
started_at: Utc::now(),
estimated_completion: None,
status: TaskExecutionStatus::Executing,
};
let mut active_tasks = self.active_tasks.write().await;
active_tasks.insert(task.id.clone(), execution);
}
let results = {
let session_pool = self.session_pool.lock().await;
session_pool
.execute_task_batch(role.clone(), tasks.clone(), claude_config)
.await?
};
let estimated_savings = (tasks.len().saturating_sub(1)) * 3400;
{
let mut tracker = self.performance_tracker.lock().await;
tracker.total_tasks += results.len();
tracker.total_execution_time += start_time.elapsed();
tracker.total_tokens_saved += estimated_savings;
let error_count = results.iter().filter(|r| !r.success).count();
tracker.error_count += error_count;
}
self.send_efficiency_report(estimated_savings, results.len())
.await?;
tracing::info!(
"Batch execution completed - estimated {} tokens saved",
estimated_savings
);
Ok(results)
}
async fn try_batch_optimization(
&self,
_role: &AgentRole,
_task: &Task,
) -> Result<Option<TaskResult>> {
Ok(None)
}
async fn calculate_token_savings(&self, result: &TaskResult) -> usize {
if let Some(output) = result.output.as_object() {
if output.contains_key("session_id") {
return 3400; }
}
0 }
async fn send_coordination_update(&self, _role: &AgentRole, result: &TaskResult) -> Result<()> {
let message = SessionCoordinationMessage::SessionStatusUpdate {
session_id: "session_id".to_string(), agent_id: "agent_id".to_string(), load: 0.5, tasks_completed: 1,
efficiency_metrics: EfficiencyMetrics {
sessions_reused: if result.success { 1 } else { 0 },
tokens_saved: 3400,
context_continuity_score: 0.95,
average_response_time: result.duration.as_secs_f64(),
},
};
self.send_coordination_message(message).await?;
Ok(())
}
async fn send_efficiency_report(
&self,
_tokens_saved: usize,
_tasks_processed: usize,
) -> Result<()> {
let tracker = self.performance_tracker.lock().await;
let report = SessionCoordinationMessage::EfficiencyReport {
timestamp: Utc::now(),
token_savings: TokenSavingsReport {
total_tasks_processed: tracker.total_tasks,
sessions_created: tracker.total_sessions_created,
estimated_traditional_tokens: tracker.total_tasks * 3600, actual_tokens_used: (tracker.total_tasks * 3600)
.saturating_sub(tracker.total_tokens_saved),
savings_percentage: if tracker.total_tasks > 0 {
(tracker.total_tokens_saved as f64 / (tracker.total_tasks * 3600) as f64)
* 100.0
} else {
0.0
},
efficiency_score: if tracker.total_tasks > 0 {
tracker.total_tokens_saved as f64 / tracker.total_tasks as f64
} else {
0.0
},
},
performance_metrics: PerformanceReport {
total_execution_time: tracker.total_execution_time,
average_task_time: if tracker.total_tasks > 0 {
tracker.total_execution_time / tracker.total_tasks as u32
} else {
std::time::Duration::ZERO
},
throughput_tasks_per_minute: if tracker.total_execution_time.as_secs() > 0 {
(tracker.total_tasks as f64 * 60.0) / tracker.total_execution_time.as_secs_f64()
} else {
0.0
},
error_rate_percentage: if tracker.total_tasks > 0 {
(tracker.error_count as f64 / tracker.total_tasks as f64) * 100.0
} else {
0.0
},
session_utilization: 0.75, },
};
self.send_coordination_message(report).await?;
Ok(())
}
async fn send_coordination_message(&self, message: SessionCoordinationMessage) -> Result<()> {
let coordination_message = AgentMessage::Coordination {
from_agent: "session-coordinator".to_string(),
to_agent: "system".to_string(),
message_type: CoordinationType::StatusUpdate, payload: serde_json::to_value(message)?,
};
let message_file = self
.config
.coordination_dir
.join("messages")
.join(format!("session-{}.json", Uuid::new_v4()));
if let Some(parent) = message_file.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(
&message_file,
serde_json::to_string_pretty(&coordination_message)?,
)
.await
.context("Failed to write coordination message")?;
Ok(())
}
async fn start_efficiency_monitoring(&self) {
let performance_tracker = Arc::clone(&self.performance_tracker);
let config = self.config.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(config.reporting_interval);
loop {
interval.tick().await;
let tracker = performance_tracker.lock().await;
if tracker.total_tasks > 0 {
let efficiency_percentage = (tracker.total_tokens_saved as f64
/ (tracker.total_tasks * 3600) as f64)
* 100.0;
let efficiency_score = if tracker.total_tasks > 0 {
tracker.total_tokens_saved as f64 / tracker.total_tasks as f64
} else {
0.0
};
tracing::info!(
"Session Efficiency Report - Tasks: {}, Context Optimized: {}, Efficiency: {:.1}%, Score: {:.2}",
tracker.total_tasks,
tracker.total_tokens_saved,
efficiency_percentage,
efficiency_score
);
}
}
});
}
async fn start_coordination_processing(&self) {
let coordination_dir = self.config.coordination_dir.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
interval.tick().await;
if let Err(e) = Self::process_coordination_messages(&coordination_dir).await {
tracing::error!("Failed to process coordination messages: {}", e);
}
}
});
}
async fn process_coordination_messages(coordination_dir: &Path) -> Result<()> {
let messages_dir = coordination_dir.join("messages");
if !messages_dir.exists() {
return Ok(());
}
Ok(())
}
pub async fn get_coordination_statistics(&self) -> CoordinationStatistics {
let pool_stats = {
let session_pool = self.session_pool.lock().await;
session_pool.get_pool_statistics().await
};
let performance_tracker = self.performance_tracker.lock().await;
let active_tasks = self.active_tasks.read().await;
CoordinationStatistics {
pool_statistics: pool_stats,
efficiency_metrics: EfficiencyMetrics {
sessions_reused: performance_tracker
.total_tasks
.saturating_sub(performance_tracker.total_sessions_created),
tokens_saved: performance_tracker.total_tokens_saved,
context_continuity_score: 0.95, average_response_time: if performance_tracker.total_tasks > 0 {
performance_tracker.total_execution_time.as_secs_f64()
/ performance_tracker.total_tasks as f64
} else {
0.0
},
},
active_task_count: active_tasks.len(),
total_coordination_messages: 0, }
}
pub async fn shutdown(&mut self) -> Result<()> {
tracing::info!("Shutting down session coordinator");
{
let mut session_pool = self.session_pool.lock().await;
session_pool.shutdown().await?;
}
tracing::info!("Session coordinator shutdown complete");
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CoordinationStatistics {
pub pool_statistics: PoolStatistics,
pub efficiency_metrics: EfficiencyMetrics,
pub active_task_count: usize,
pub total_coordination_messages: usize,
}
impl CoordinationBus {
async fn new(coordination_dir: PathBuf) -> Result<Self> {
tokio::fs::create_dir_all(&coordination_dir).await?;
Ok(Self {
outbound_queue: Vec::new(),
coordination_dir,
})
}
}