pub mod eval_viewer;
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use eframe::egui;
use egui_cha_ds::{
CapacityGauge, CellState, HeatmapGrid, LogStream, LogStreamState, Sparkline, Status,
StatusIndicator, Theme,
};
use tokio::sync::broadcast;
use swarm_engine_core::telemetry::{ManagerState, SwarmEvent};
use swarm_engine_llm::{ModelInfo, ModelRegistry};
use eval_viewer::EvalViewer;
const HISTORY_LEN: usize = 60;
/// Display state for one Tier-1 "brain" agent listed in the left panel.
#[derive(Debug, Clone)]
pub struct BrainInfo {
/// Status rendered by the brain's status indicator.
pub status: Status,
/// Title of the brain's current task; an empty string or `"Idle"` is shown as "Idle".
pub task_title: String,
/// Preview of the prompt the brain is working on (filled by the simulation, cleared when idle).
pub prompt_preview: String,
/// Half-open `(start, end)` range of manager indices that belong to this brain.
pub manager_range: (usize, usize),
}
impl Default for BrainInfo {
    /// Returns an idle brain with no task text and an empty manager range.
    fn default() -> Self {
        let no_managers = (0, 0);
        Self {
            status: Status::Idle,
            task_title: String::new(),
            prompt_preview: String::new(),
            manager_range: no_managers,
        }
    }
}
/// Detail record for one Tier-2 manager, shown in the hover tooltip and the detail panel.
#[derive(Debug, Clone, Default)]
pub struct ManagerInfo {
/// Current state of the manager (mirrors the matching heatmap cell).
pub state: CellState,
/// Index into the app's brain list of the brain this manager is assigned to.
pub assigned_brain: usize,
/// Ids of the workers currently assigned to this manager.
pub worker_ids: Vec<usize>,
/// Description of the task in progress; empty when there is none.
pub current_task: String,
/// Most recent error message, if any.
pub last_error: Option<String>,
/// Number of tasks this manager has completed.
pub tasks_completed: u64,
}
/// Top-level tab selected in the header bar.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AppTab {
/// Live swarm dashboard (brains, manager grid, metrics, log). This is the default tab.
#[default]
Swarm,
/// Evaluation viewer; rendered by `EvalViewer` in the central panel.
Eval,
}
/// Dashboard application state rendered once per frame by `eframe::App::update`.
pub struct App {
/// Tab currently selected in the header bar.
current_tab: AppTab,
/// Status shown by the header status indicator.
system_status: Status,
/// Latest tick number shown in the header.
tick: u64,
/// Whether the dashboard considers the swarm running (toggled by Start/Stop and by events).
running: bool,
/// Models returned by the last successful discovery.
available_models: Vec<ModelInfo>,
/// Name of the model highlighted in the model list.
selected_model: Option<String>,
/// Set once model discovery has been attempted, so it is not retried every frame.
models_loaded: bool,
/// Number of brains, shown as "N agents" in the left panel.
brain_count: usize,
/// Per-brain display state.
brain_infos: Vec<BrainInfo>,
/// Brain that was hovered during the previous frame, if any.
hovered_brain: Option<usize>,
/// Cell states fed to the manager heatmap.
manager_states: Vec<CellState>,
/// Per-manager details, indexed as `row * 10 + col`.
manager_infos: Vec<ManagerInfo>,
/// `(row, col)` of the manager cell last clicked in the grid.
selected_manager: Option<(usize, usize)>,
/// Total worker count (denominator of the worker gauge).
worker_count: usize,
/// Currently active workers (numerator of the worker gauge).
active_workers: usize,
/// Recent CPU samples for the sparkline (bounded by `HISTORY_LEN` via `push_history`).
cpu_history: VecDeque<f32>,
/// Recent latency samples in milliseconds.
latency_history: VecDeque<f32>,
/// Recent throughput samples in tasks per second.
throughput_history: VecDeque<f32>,
/// Latest latency value shown above the latency sparkline.
current_latency_ms: f32,
/// Latest throughput value shown above the throughput sparkline.
tasks_per_sec: f32,
/// Memory in use, in MB (only set in `Default` within this file).
memory_used_mb: f32,
/// Total memory, in MB (only set in `Default` within this file).
memory_total_mb: f32,
/// Tokio runtime used to block on model discovery.
runtime: tokio::runtime::Runtime,
/// Shared snapshot polled by `sync_from_shared` when no event receiver is attached.
pub shared: Arc<RwLock<SharedState>>,
/// Telemetry event receiver; when present, events drive the dashboard.
event_rx: Option<broadcast::Receiver<SwarmEvent>>,
/// Backing state of the log panel.
log_state: LogStreamState,
/// State of the Eval tab.
eval_viewer: EvalViewer,
}
/// Snapshot shared with the UI through `Arc<RwLock<_>>`.
///
/// The UI only reads it (see `App::sync_from_shared`); the writer is not part of this file.
#[derive(Debug, Default, Clone)]
pub struct SharedState {
/// Current tick number.
pub tick: u64,
/// Manager cell states; an empty vector leaves the UI's grid unchanged.
pub manager_states: Vec<CellState>,
/// CPU usage sample pushed into the CPU history (the sparkline is bounded to 0..100).
pub cpu_percent: f32,
/// Latest latency in milliseconds.
pub latency_ms: f32,
/// Latest throughput in tasks per second.
pub tasks_per_sec: f32,
/// When `false`, the UI ignores this snapshot and runs its built-in simulation instead.
pub running: bool,
}
impl Default for App {
    /// Builds the standalone dashboard state: 5 idle brains owning 20 managers
    /// each, 100 idle managers, 1000 workers, empty metric histories and no
    /// event receiver.
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be created.
    fn default() -> Self {
        const BRAINS: usize = 5;
        const MANAGERS: usize = 100;
        const MANAGERS_PER_BRAIN: usize = 20;

        let mut brain_infos: Vec<BrainInfo> = Vec::with_capacity(BRAINS);
        for brain in 0..BRAINS {
            let first_manager = brain * MANAGERS_PER_BRAIN;
            brain_infos.push(BrainInfo {
                status: Status::Idle,
                task_title: format!("Brain #{} Task", brain),
                prompt_preview: String::new(),
                manager_range: (first_manager, first_manager + MANAGERS_PER_BRAIN),
            });
        }

        let mut manager_infos: Vec<ManagerInfo> = Vec::with_capacity(MANAGERS);
        for manager in 0..MANAGERS {
            manager_infos.push(ManagerInfo {
                state: CellState::Idle,
                assigned_brain: manager / MANAGERS_PER_BRAIN,
                worker_ids: Vec::new(),
                current_task: String::new(),
                last_error: None,
                tasks_completed: 0,
            });
        }

        Self {
            current_tab: AppTab::default(),
            system_status: Status::Idle,
            tick: 0,
            running: false,
            available_models: Vec::new(),
            selected_model: None,
            models_loaded: false,
            brain_count: BRAINS,
            brain_infos,
            hovered_brain: None,
            manager_states: vec![CellState::Idle; MANAGERS],
            manager_infos,
            selected_manager: None,
            worker_count: 1000,
            active_workers: 0,
            cpu_history: VecDeque::with_capacity(HISTORY_LEN),
            latency_history: VecDeque::with_capacity(HISTORY_LEN),
            throughput_history: VecDeque::with_capacity(HISTORY_LEN),
            current_latency_ms: 0.0,
            tasks_per_sec: 0.0,
            memory_used_mb: 8.0,
            memory_total_mb: 128.0 * 1024.0,
            runtime: tokio::runtime::Runtime::new().expect("Failed to create runtime"),
            shared: Arc::new(RwLock::new(SharedState::default())),
            event_rx: None,
            log_state: LogStreamState::new().with_max_entries(500),
            eval_viewer: EvalViewer::new(),
        }
    }
}
fn manager_state_to_cell_state(state: ManagerState) -> CellState {
match state {
ManagerState::Idle => CellState::Idle,
ManagerState::Processing => CellState::Processing,
ManagerState::Delegated => CellState::Delegated,
ManagerState::Escalated => CellState::Escalated,
ManagerState::Error => CellState::Error,
}
}
impl App {
pub fn with_shared(shared: Arc<RwLock<SharedState>>) -> Self {
Self {
shared,
..Default::default()
}
}
pub fn with_receiver(rx: broadcast::Receiver<SwarmEvent>) -> Self {
Self {
event_rx: Some(rx),
running: true, system_status: Status::Active,
..Default::default()
}
}
/// Drains every telemetry event that is currently queued and applies it.
///
/// Events are buffered first so the "dropped events" warning is logged before
/// the effects of the surviving events. If the sender side has gone away the
/// receiver is dropped instead of being stored again: otherwise every later
/// frame would hit `Closed` again, log "Swarm connection closed" once per
/// frame and keep the fast repaint loop alive forever.
fn process_events(&mut self) {
    let Some(mut rx) = self.event_rx.take() else {
        return;
    };

    let mut events = Vec::new();
    let mut closed = false;
    let mut lagged = 0u64;
    loop {
        match rx.try_recv() {
            Ok(event) => events.push(event),
            Err(broadcast::error::TryRecvError::Empty) => break,
            Err(broadcast::error::TryRecvError::Lagged(n)) => {
                // The receiver skipped `n` messages; keep draining what is left.
                lagged = lagged.saturating_add(n);
            }
            Err(broadcast::error::TryRecvError::Closed) => {
                closed = true;
                break;
            }
        }
    }

    // Only keep the receiver while the channel can still deliver events.
    if !closed {
        self.event_rx = Some(rx);
    }

    if lagged > 0 {
        self.log_state
            .push_warn("UI", format!("Dropped {} events (UI too slow)", lagged));
    }
    for event in events {
        self.apply_event(event);
    }
    if closed {
        self.running = false;
        self.system_status = Status::Idle;
        self.log_state
            .push_info("System", "Swarm connection closed");
    }
}
/// Applies a single telemetry event to the dashboard state.
///
/// * `TickStart` / `TickComplete` update the tick counter; `TickComplete` also
///   records latency, throughput (actions divided by tick duration) and the
///   active worker count.
/// * `ManagerStateChange` updates the heatmap cell and the manager detail
///   record; ids outside either collection are ignored.
/// * `WorkerAction` logs a warning for failed actions only.
/// * `AsyncTaskComplete`, `SystemStart` and `SystemStop` write log entries;
///   the latter two also flip the running flag and system status.
fn apply_event(&mut self, event: SwarmEvent) {
    match event {
        SwarmEvent::TickStart { tick } => {
            self.tick = tick;
        }
        SwarmEvent::TickComplete {
            tick,
            duration_ns,
            metrics,
        } => {
            self.tick = tick;
            let latency_ms = duration_ns as f32 / 1_000_000.0;
            self.current_latency_ms = latency_ms;
            push_history(&mut self.latency_history, latency_ms);
            let tick_duration_sec = duration_ns as f32 / 1_000_000_000.0;
            // A zero-length tick would divide by zero; keep the previous throughput.
            if tick_duration_sec > 0.0 {
                self.tasks_per_sec = metrics.total_actions as f32 / tick_duration_sec;
                push_history(&mut self.throughput_history, self.tasks_per_sec);
            }
            self.active_workers = metrics.active_workers;
        }
        SwarmEvent::ManagerStateChange {
            manager_id,
            new_state,
        } => {
            let cell = manager_state_to_cell_state(new_state);
            // The grid and the detail list are separate vectors whose lengths
            // may differ, so each one is bounds-checked on its own.
            if let Some(slot) = self.manager_states.get_mut(manager_id) {
                *slot = cell;
            }
            if let Some(info) = self.manager_infos.get_mut(manager_id) {
                info.state = cell;
            }
        }
        SwarmEvent::WorkerAction {
            worker_id,
            action,
            success,
        } => {
            if !success {
                self.log_state.push_warn(
                    &format!("Worker #{}", worker_id),
                    format!("Action failed: {}", action),
                );
            }
        }
        SwarmEvent::AsyncTaskComplete {
            task_id,
            duration_ms,
        } => {
            self.log_state.push_debug(
                "AsyncTask",
                format!("Task {:?} completed in {}ms", task_id, duration_ms),
            );
        }
        SwarmEvent::SystemStart { worker_count } => {
            self.worker_count = worker_count;
            self.running = true;
            self.system_status = Status::Active;
            self.log_state
                .push_info("System", format!("Started with {} workers", worker_count));
        }
        SwarmEvent::SystemStop {
            total_ticks,
            total_duration_ms,
        } => {
            self.running = false;
            self.system_status = Status::Idle;
            self.log_state.push_info(
                "System",
                format!(
                    "Stopped after {} ticks ({:.2}s)",
                    total_ticks,
                    total_duration_ms as f64 / 1000.0
                ),
            );
        }
    }
}
/// Discovers the locally available models and preselects the first one.
///
/// This blocks the calling thread on the app's Tokio runtime until discovery
/// finishes. `models_loaded` is set whether or not discovery succeeded, so
/// the attempt is made only once; a failure is reported in the log panel
/// rather than being silently ignored.
fn load_models(&mut self) {
    let registry = ModelRegistry::default_local();
    match self.runtime.block_on(registry.discover()) {
        Ok(models) => {
            self.available_models = models;
            if let Some(first) = self.available_models.first() {
                self.selected_model = Some(first.name.clone());
            }
        }
        Err(_) => {
            self.log_state.push_warn(
                "Models",
                "Model discovery failed; no models loaded".to_string(),
            );
        }
    }
    self.models_loaded = true;
}
/// Advances the built-in demo simulation by one tick.
///
/// All "random" values are derived from a hash of the tick counter. The
/// method refreshes the manager grid, the metric histories, the brain
/// statuses, the manager detail records and, every tenth tick, appends one
/// sample log entry.
fn simulate_tick(&mut self) {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    self.tick += 1;

    let mut hasher = DefaultHasher::new();
    self.tick.hash(&mut hasher);
    let hash = hasher.finish();

    // `sync_from_shared` may have replaced the grid with a vector of another
    // length; bring it back to one cell per manager record so nothing below
    // can index out of bounds.
    self.manager_states
        .resize(self.manager_infos.len(), CellState::Idle);
    for (i, cell) in self.manager_states.iter_mut().enumerate() {
        *cell = match (hash >> (i % 64)) & 0x3 {
            0 => CellState::Idle,
            1 => CellState::Processing,
            2 => CellState::Delegated,
            _ => CellState::Escalated,
        };
    }

    // Smooth synthetic metrics driven by the tick counter.
    let t = self.tick as f32 * 0.1;
    let cpu = 30.0 + 20.0 * t.sin() + 10.0 * (t * 2.3).cos();
    let latency = 15.0 + 5.0 * (t * 1.5).sin();
    let throughput = 10000.0 + 2000.0 * t.cos();
    push_history(&mut self.cpu_history, cpu);
    push_history(&mut self.latency_history, latency);
    push_history(&mut self.throughput_history, throughput);
    self.current_latency_ms = latency;
    self.tasks_per_sec = throughput;
    self.active_workers = (500.0 + 300.0 * t.sin()) as usize;

    let sample_tasks = [
        (
            "Analyzing sentiment",
            "Analyze the following customer feedback...",
        ),
        (
            "Code review",
            "Review this pull request for security issues...",
        ),
        (
            "Data extraction",
            "Extract key metrics from the quarterly report...",
        ),
        (
            "Summarization",
            "Summarize the main points of this article...",
        ),
        ("Translation", "Translate the following text to Japanese..."),
    ];
    for (i, brain) in self.brain_infos.iter_mut().enumerate() {
        // One hash byte per brain; the modulo keeps the shift below 64 bits
        // even if more than eight brains are ever configured.
        brain.status = if (hash >> ((i * 8) % 64)) & 1 == 1 {
            Status::Active
        } else {
            Status::Idle
        };
        if brain.status == Status::Active {
            let (title, preview) = sample_tasks[i % sample_tasks.len()];
            brain.task_title = title.to_string();
            brain.prompt_preview = preview.to_string();
        } else {
            brain.task_title = "Idle".to_string();
            brain.prompt_preview.clear();
        }
    }

    let cells = self.manager_states.iter().copied();
    for (i, (info, cell)) in self.manager_infos.iter_mut().zip(cells).enumerate() {
        info.state = cell;
        info.tasks_completed = (hash >> (i % 32)) & 0xFF;
        if info.state == CellState::Processing {
            info.current_task = format!("Processing batch #{}", (self.tick + i as u64) % 100);
            info.worker_ids = vec![i * 10, i * 10 + 1, i * 10 + 2];
        } else {
            info.current_task.clear();
            info.worker_ids.clear();
        }
        // Errors are sticky: once set they are not cleared by later ticks.
        if (hash >> (i % 16)) & 0xF == 0xF {
            info.last_error = Some("Timeout after 30s".to_string());
        }
    }

    if self.tick.is_multiple_of(10) {
        let log_type = (hash >> 4) % 5;
        match log_type {
            0 => self.log_state.push_info(
                &format!("Brain #{}", hash % 5),
                format!("Processing task batch #{}", self.tick / 10),
            ),
            1 => self.log_state.push_debug(
                &format!("Manager [{},{}]", (hash % 10), (hash >> 3) % 10),
                format!("Queue depth: {}", (hash >> 8) % 100),
            ),
            2 => self.log_state.push_warn(
                &format!("Worker #{}", hash % 1000),
                format!("High latency detected: {:.1}ms", latency),
            ),
            3 => self.log_state.push_error(
                &format!("Manager [{},{}]", (hash >> 2) % 10, (hash >> 5) % 10),
                "Connection timeout - retrying",
            ),
            _ => self.log_state.push_info(
                "System",
                format!(
                    "Tick {} completed, {} tasks/sec",
                    self.tick, throughput as u64
                ),
            ),
        }
    }
}
/// Copies the latest snapshot from `self.shared` into the UI state.
///
/// Nothing happens when the lock is poisoned or the snapshot is not marked
/// as running. A non-empty `manager_states` replaces the grid and is mirrored
/// into the manager detail records, so the tooltip and detail panel agree
/// with the heatmap. One sample is appended to each metric history.
fn sync_from_shared(&mut self) {
    let Ok(state) = self.shared.read() else {
        return;
    };
    if !state.running {
        return;
    }

    self.tick = state.tick;
    if !state.manager_states.is_empty() {
        // `clone_from` reuses the existing allocation where possible.
        self.manager_states.clone_from(&state.manager_states);
        let cells = self.manager_states.iter().copied();
        for (info, cell) in self.manager_infos.iter_mut().zip(cells) {
            info.state = cell;
        }
    }
    push_history(&mut self.cpu_history, state.cpu_percent);
    push_history(&mut self.latency_history, state.latency_ms);
    push_history(&mut self.throughput_history, state.tasks_per_sec);
    self.current_latency_ms = state.latency_ms;
    self.tasks_per_sec = state.tasks_per_sec;
}
}
impl eframe::App for App {
    /// Renders one frame of the dashboard.
    ///
    /// Order of work:
    /// 1. Attempt model discovery once (blocks the first frame).
    /// 2. Refresh state: drain telemetry events when a receiver is attached,
    ///    otherwise (while running) poll the shared snapshot or run the
    ///    built-in simulation if the snapshot is not marked running.
    /// 3. Draw the header bar, then either the Eval tab or the Swarm layout
    ///    (brains on the left, metrics or manager details on the right, log
    ///    at the bottom, manager heatmap in the centre).
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        if !self.models_loaded {
            self.load_models();
        }
        let theme = Theme::dark();
        theme.apply(ctx);

        if self.event_rx.is_some() {
            self.process_events();
            // Roughly 60 fps while events may arrive.
            ctx.request_repaint_after(std::time::Duration::from_millis(16));
        } else if self.running {
            self.sync_from_shared();
            let should_simulate = self
                .shared
                .read()
                .map(|state| !state.running)
                .unwrap_or(false);
            if should_simulate {
                self.simulate_tick();
            }
            ctx.request_repaint_after(std::time::Duration::from_millis(100));
        }

        egui::TopBottomPanel::top("top_panel").show(ctx, |ui| {
            ui.horizontal(|ui| {
                ui.heading("SwarmEngine");
                ui.add_space(16.0);
                ui.selectable_value(&mut self.current_tab, AppTab::Swarm, "Swarm");
                ui.selectable_value(&mut self.current_tab, AppTab::Eval, "Eval");
                ui.add_space(16.0);
                ui.separator();
                ui.add_space(8.0);
                if self.current_tab == AppTab::Swarm {
                    StatusIndicator::new(self.system_status)
                        .label(if self.running { "Running" } else { "Stopped" })
                        .show(ui);
                    ui.add_space(16.0);
                    ui.label(format!("Tick: {}", self.tick));
                    ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
                        if self.running {
                            if ui.button("Stop").clicked() {
                                self.running = false;
                                self.system_status = Status::Idle;
                            }
                        } else if ui.button("Start").clicked() {
                            self.running = true;
                            self.system_status = Status::Active;
                        }
                        if ui.button("Reset").clicked() {
                            self.tick = 0;
                            self.manager_states = vec![CellState::Idle; 100];
                            // Keep the per-manager details in step with the
                            // cleared grid; otherwise the tooltip and detail
                            // panel would keep showing pre-reset data.
                            for info in &mut self.manager_infos {
                                info.state = CellState::Idle;
                                info.current_task.clear();
                                info.worker_ids.clear();
                                info.last_error = None;
                                info.tasks_completed = 0;
                            }
                            self.cpu_history.clear();
                            self.latency_history.clear();
                            self.throughput_history.clear();
                            self.running = false;
                            self.system_status = Status::Idle;
                        }
                    });
                }
            });
        });

        if self.current_tab == AppTab::Eval {
            egui::CentralPanel::default().show(ctx, |ui| {
                self.eval_viewer.show(ui);
            });
            return;
        }

        // Hover is detected while drawing the left panel but only stored at
        // the end of the frame, so the grid highlights with one frame of lag.
        let mut new_hovered_brain: Option<usize> = None;
        egui::SidePanel::left("left_panel")
            .min_width(220.0)
            .show(ctx, |ui| {
                ui.heading("Tier 1: Brains");
                ui.label(format!("{} agents", self.brain_count));
                ui.add_space(8.0);
                for (i, brain) in self.brain_infos.iter().enumerate() {
                    let response = ui.horizontal(|ui| {
                        StatusIndicator::new(brain.status).size(10.0).show(ui);
                        ui.vertical(|ui| {
                            ui.label(format!("Brain #{}", i));
                            let task_text =
                                if brain.task_title.is_empty() || brain.task_title == "Idle" {
                                    "Idle"
                                } else {
                                    &brain.task_title
                                };
                            ui.label(
                                egui::RichText::new(task_text)
                                    .small()
                                    .color(theme.text_muted),
                            );
                        });
                    });
                    if response.response.hovered() {
                        new_hovered_brain = Some(i);
                    }
                }
                ui.add_space(16.0);
                ui.separator();
                ui.add_space(8.0);
                ui.heading("LLM Model");
                if self.available_models.is_empty() {
                    ui.label("No models available");
                    ui.label("Run: ollama serve");
                } else {
                    for m in &self.available_models {
                        let selected = self.selected_model.as_ref() == Some(&m.name);
                        if ui.selectable_label(selected, &m.name).clicked() {
                            self.selected_model = Some(m.name.clone());
                        }
                    }
                }
            });

        egui::SidePanel::right("right_panel")
            .min_width(280.0)
            .default_width(320.0)
            .show(ctx, |ui| {
                if let Some((row, col)) = self.selected_manager {
                    // Manager details replace the metrics while a cell is selected.
                    let idx = row * 10 + col;
                    let info = &self.manager_infos[idx];
                    let brain = &self.brain_infos[info.assigned_brain];
                    ui.horizontal(|ui| {
                        ui.heading(format!("Manager [{},{}]", row, col));
                        if ui.small_button("✕ Close").clicked() {
                            self.selected_manager = None;
                        }
                    });
                    ui.add_space(8.0);
                    ui.horizontal(|ui| {
                        ui.label("State:");
                        let state_text = format!("{:?}", info.state);
                        let color = match info.state {
                            CellState::Idle => theme.text_muted,
                            CellState::Processing => theme.state_info,
                            CellState::Delegated => theme.state_success,
                            CellState::Escalated => theme.state_warning,
                            CellState::Error => theme.state_danger,
                            CellState::Custom(_) => theme.text_primary,
                        };
                        ui.colored_label(color, state_text);
                    });
                    ui.add_space(8.0);
                    ui.separator();
                    ui.add_space(8.0);
                    ui.label(egui::RichText::new("Assigned Brain").strong());
                    ui.horizontal(|ui| {
                        StatusIndicator::new(brain.status).size(10.0).show(ui);
                        ui.label(format!("Brain #{}", info.assigned_brain));
                    });
                    if !brain.task_title.is_empty() {
                        ui.label(format!("Task: {}", brain.task_title));
                    }
                    ui.add_space(8.0);
                    ui.separator();
                    ui.add_space(8.0);
                    ui.label(egui::RichText::new("Current Task").strong());
                    if info.current_task.is_empty() {
                        ui.label(egui::RichText::new("None").color(theme.text_muted));
                    } else {
                        ui.label(&info.current_task);
                    }
                    ui.add_space(8.0);
                    ui.label(egui::RichText::new("Assigned Workers").strong());
                    if info.worker_ids.is_empty() {
                        ui.label(egui::RichText::new("None").color(theme.text_muted));
                    } else {
                        ui.label(format!("{:?}", info.worker_ids));
                    }
                    ui.add_space(8.0);
                    ui.label(format!("Tasks completed: {}", info.tasks_completed));
                    ui.add_space(8.0);
                    ui.separator();
                    ui.add_space(8.0);
                    ui.label(egui::RichText::new("Last Error").strong());
                    if let Some(err) = &info.last_error {
                        ui.colored_label(theme.state_danger, err);
                    } else {
                        ui.label(egui::RichText::new("None").color(theme.text_muted));
                    }
                } else {
                    ui.heading("Metrics");
                    ui.add_space(8.0);
                    ui.label("CPU Usage");
                    // The sparkline takes a slice, so each deque is copied into a Vec.
                    let cpu_data: Vec<f32> = self.cpu_history.iter().copied().collect();
                    if !cpu_data.is_empty() {
                        Sparkline::new(&cpu_data)
                            .height(48.0)
                            .fill(true)
                            .bounds(0.0, 100.0)
                            .show(ui);
                    }
                    ui.add_space(12.0);
                    ui.label(format!("Latency: {:.1}ms", self.current_latency_ms));
                    let latency_data: Vec<f32> = self.latency_history.iter().copied().collect();
                    if !latency_data.is_empty() {
                        Sparkline::new(&latency_data)
                            .height(48.0)
                            .color(theme.state_warning)
                            .show(ui);
                    }
                    ui.add_space(12.0);
                    ui.label(format!("Tasks/sec: {:.0}", self.tasks_per_sec));
                    let throughput_data: Vec<f32> =
                        self.throughput_history.iter().copied().collect();
                    if !throughput_data.is_empty() {
                        Sparkline::new(&throughput_data)
                            .height(48.0)
                            .color(theme.state_success)
                            .show(ui);
                    }
                    ui.add_space(16.0);
                    ui.separator();
                    ui.add_space(8.0);
                    ui.heading("Memory");
                    let mem_percent = self.memory_used_mb / self.memory_total_mb * 100.0;
                    ui.label(format!(
                        "{:.1} GB / {:.0} GB ({:.2}%)",
                        self.memory_used_mb / 1024.0,
                        self.memory_total_mb / 1024.0,
                        mem_percent
                    ));
                    ui.add_space(8.0);
                    ui.heading("Workers (Tier 3)");
                    CapacityGauge::from_fraction(
                        self.active_workers as u64,
                        self.worker_count as u64,
                    )
                    .thresholds(0.7, 0.9)
                    .height(16.0)
                    .show(ui);
                }
            });

        egui::TopBottomPanel::bottom("log_panel")
            .min_height(120.0)
            .default_height(150.0)
            .resizable(true)
            .show(ctx, |ui| {
                LogStream::new(&mut self.log_state)
                    .show_toolbar(true)
                    .show_timestamp(true)
                    .show_source(true)
                    .show(ui);
            });

        egui::CentralPanel::default().show(ctx, |ui| {
            ui.heading("Tier 2: Managers (100)");
            let message = if let Some(brain_idx) = self.hovered_brain {
                let brain = &self.brain_infos[brain_idx];
                (
                    format!(
                        "Highlighting Brain #{} managers (rows {}-{})",
                        brain_idx,
                        brain.manager_range.0 / 10,
                        // `saturating_sub` avoids an underflow panic for an
                        // empty `(0, 0)` range.
                        brain.manager_range.1.saturating_sub(1) / 10
                    ),
                    theme.state_info,
                )
            } else if self.selected_manager.is_some() {
                (
                    "Click manager in grid for details".to_string(),
                    theme.text_muted,
                )
            } else {
                (
                    "Hover over Brain to highlight managers".to_string(),
                    theme.text_muted,
                )
            };
            ui.label(egui::RichText::new(message.0).small().color(message.1));
            ui.add_space(8.0);
            ui.horizontal(|ui| {
                ui.label("Legend:");
                ui.colored_label(theme.text_muted, "Idle");
                ui.colored_label(theme.state_info, "Processing");
                ui.colored_label(theme.state_success, "Delegated");
                ui.colored_label(theme.state_warning, "Escalated");
            });
            ui.add_space(8.0);

            // While a brain is hovered, every manager outside its range is
            // drawn as idle so the brain's own managers stand out.
            let display_states: Vec<CellState> = if let Some(brain_idx) = self.hovered_brain {
                let brain = &self.brain_infos[brain_idx];
                self.manager_states
                    .iter()
                    .enumerate()
                    .map(|(i, state)| {
                        if i >= brain.manager_range.0 && i < brain.manager_range.1 {
                            *state
                        } else {
                            CellState::Idle
                        }
                    })
                    .collect()
            } else {
                self.manager_states.clone()
            };

            // Size the cells to the available width, within 20..=60 points.
            let spacing = 3.0;
            let cols = 10.0;
            let available_width = ui.available_width();
            let cell_size = ((available_width - spacing * (cols + 1.0)) / cols).clamp(20.0, 60.0);
            if let Some((row, col)) = HeatmapGrid::new(10, 10)
                .data(&display_states)
                .cell_size(cell_size)
                .spacing(spacing)
                .on_hover(|row, col| {
                    let idx = row * 10 + col;
                    let info = &self.manager_infos[idx];
                    format!(
                        "Manager [{}, {}]\nState: {:?}\nBrain: #{}\nTasks: {}",
                        row, col, info.state, info.assigned_brain, info.tasks_completed
                    )
                })
                .show(ui)
            {
                self.selected_manager = Some((row, col));
            }
        });

        self.hovered_brain = new_hovered_brain;
    }
}
/// Appends `value` to `history`, evicting the oldest sample first when the
/// deque already holds `HISTORY_LEN` (or more) entries.
fn push_history(history: &mut VecDeque<f32>, value: f32) {
    let at_capacity = history.len() >= HISTORY_LEN;
    if at_capacity {
        let _evicted = history.pop_front();
    }
    history.push_back(value);
}
pub fn run() -> eframe::Result<()> {
let options = eframe::NativeOptions {
viewport: egui::ViewportBuilder::default()
.with_inner_size([1200.0, 800.0])
.with_title("SwarmEngine"),
..Default::default()
};
eframe::run_native(
"SwarmEngine",
options,
Box::new(|_cc| Ok(Box::new(App::default()))),
)
}
pub fn run_with_shared(shared: Arc<RwLock<SharedState>>) -> eframe::Result<()> {
let options = eframe::NativeOptions {
viewport: egui::ViewportBuilder::default()
.with_inner_size([1200.0, 800.0])
.with_title("SwarmEngine"),
..Default::default()
};
eframe::run_native(
"SwarmEngine",
options,
Box::new(move |_cc| Ok(Box::new(App::with_shared(shared.clone())))),
)
}
/// Opens the dashboard window with an [`App`] driven by telemetry events
/// from `rx`, after installing the design-system fonts.
///
/// # Errors
///
/// Returns the error reported by `eframe::run_native`.
pub fn run_with_receiver(rx: broadcast::Receiver<SwarmEvent>) -> eframe::Result<()> {
    let viewport = egui::ViewportBuilder::default()
        .with_inner_size([1200.0, 800.0])
        .with_title("SwarmEngine");
    let native_options = eframe::NativeOptions {
        viewport,
        ..Default::default()
    };
    eframe::run_native(
        "SwarmEngine",
        native_options,
        Box::new(move |creation| {
            egui_cha_ds::setup_fonts(&creation.egui_ctx);
            let app = App::with_receiver(rx);
            Ok(Box::new(app))
        }),
    )
}