1use axum::{
8 extract::{
9 ws::{Message, WebSocket, WebSocketUpgrade},
10 State,
11 },
12 response::Response,
13};
14use chrono::{DateTime, Utc};
15use futures_util::{SinkExt, StreamExt};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashSet, VecDeque};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use super::SharedState;
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct ActiveTool {
31 pub name: String,
33 #[serde(with = "instant_serde")]
35 pub started_at: Instant,
36 pub parameters: serde_json::Value,
38 pub progress: Option<f32>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct FileAccessEvent {
45 pub path: PathBuf,
47 pub access_type: AccessType,
49 #[serde(with = "instant_serde")]
51 pub timestamp: Instant,
52 pub tool_name: String,
54}
55
56#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
58#[serde(rename_all = "snake_case")]
59pub enum AccessType {
60 Read,
61 Write,
62 Analyze,
63 Search,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ToolExecution {
69 pub name: String,
71 pub completed_at: DateTime<Utc>,
73 pub duration_ms: u64,
75 pub success: bool,
77 pub summary: String,
79}
80
81#[derive(Debug, Default)]
83pub struct McpActivityState {
84 pub active_tool: Option<ActiveTool>,
86 pub file_access_log: VecDeque<FileAccessEvent>,
88 pub current_operation: String,
90 pub files_touched: HashSet<PathBuf>,
92 pub directories_explored: HashSet<PathBuf>,
94 pub tool_history: VecDeque<ToolExecution>,
96 pub last_update: Option<Instant>,
98}
99
100impl McpActivityState {
101 const MAX_FILE_EVENTS: usize = 100;
103 const MAX_TOOL_HISTORY: usize = 20;
105
106 pub fn start_tool(&mut self, name: &str, parameters: serde_json::Value) {
108 self.active_tool = Some(ActiveTool {
109 name: name.to_string(),
110 started_at: Instant::now(),
111 parameters,
112 progress: None,
113 });
114 self.current_operation = format!("Executing {}...", name);
115 self.last_update = Some(Instant::now());
116 }
117
118 pub fn update_progress(&mut self, progress: f32) {
120 if let Some(ref mut tool) = self.active_tool {
121 tool.progress = Some(progress.clamp(0.0, 1.0));
122 self.last_update = Some(Instant::now());
123 }
124 }
125
126 pub fn record_file_access(&mut self, path: PathBuf, access_type: AccessType, tool_name: &str) {
128 let event = FileAccessEvent {
129 path: path.clone(),
130 access_type,
131 timestamp: Instant::now(),
132 tool_name: tool_name.to_string(),
133 };
134
135 self.file_access_log.push_back(event);
136 if self.file_access_log.len() > Self::MAX_FILE_EVENTS {
137 self.file_access_log.pop_front();
138 }
139
140 self.files_touched.insert(path.clone());
141 if let Some(parent) = path.parent() {
142 self.directories_explored.insert(parent.to_path_buf());
143 }
144
145 self.last_update = Some(Instant::now());
146 }
147
148 pub fn complete_tool(&mut self, success: bool, summary: &str) {
150 if let Some(tool) = self.active_tool.take() {
151 let duration = tool.started_at.elapsed();
152 let execution = ToolExecution {
153 name: tool.name,
154 completed_at: Utc::now(),
155 duration_ms: duration.as_millis() as u64,
156 success,
157 summary: summary.to_string(),
158 };
159
160 self.tool_history.push_back(execution);
161 if self.tool_history.len() > Self::MAX_TOOL_HISTORY {
162 self.tool_history.pop_front();
163 }
164 }
165
166 self.current_operation = if success {
167 "Ready".to_string()
168 } else {
169 "Error occurred".to_string()
170 };
171 self.last_update = Some(Instant::now());
172 }
173
174 pub fn recent_file_events(&self, max_age_ms: u64) -> Vec<FileEventDto> {
176 let now = Instant::now();
177 self.file_access_log
178 .iter()
179 .filter_map(|event| {
180 let age = now.duration_since(event.timestamp).as_millis() as u64;
181 if age <= max_age_ms {
182 Some(FileEventDto {
183 path: event.path.to_string_lossy().to_string(),
184 access_type: event.access_type,
185 age_ms: age,
186 tool_name: event.tool_name.clone(),
187 })
188 } else {
189 None
190 }
191 })
192 .collect()
193 }
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct UserHint {
203 pub hint_type: HintType,
205 pub content: String,
207 pub timestamp: DateTime<Utc>,
209 #[serde(default)]
211 pub consumed: bool,
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(tag = "type", rename_all = "snake_case")]
217pub enum HintType {
218 Click { target: String },
220 Text { message: String },
222 Voice { transcript: String, salience: f32 },
224}
225
226#[derive(Debug, Default)]
228pub struct UserHintsQueue {
229 pub hints: VecDeque<UserHint>,
230 pub max_size: usize,
231}
232
233impl UserHintsQueue {
234 const DEFAULT_MAX_SIZE: usize = 50;
235
236 pub fn new() -> Self {
237 Self {
238 hints: VecDeque::new(),
239 max_size: Self::DEFAULT_MAX_SIZE,
240 }
241 }
242
243 pub fn push(&mut self, hint: UserHint) {
245 self.hints.push_back(hint);
246 while self.hints.len() > self.max_size {
247 self.hints.pop_front();
248 }
249 }
250
251 pub fn consume_next(&mut self) -> Option<UserHint> {
253 for hint in &mut self.hints {
254 if !hint.consumed {
255 hint.consumed = true;
256 return Some(hint.clone());
257 }
258 }
259 None
260 }
261
262 pub fn peek_unconsumed(&self) -> Vec<&UserHint> {
264 self.hints.iter().filter(|h| !h.consumed).collect()
265 }
266
267 pub fn unconsumed_count(&self) -> usize {
269 self.hints.iter().filter(|h| !h.consumed).count()
270 }
271
272 pub fn gc(&mut self, max_age: Duration) {
274 let now = Utc::now();
275 self.hints.retain(|h| {
276 !h.consumed || (now - h.timestamp).num_milliseconds() < max_age.as_millis() as i64
277 });
278 }
279}
280
281#[derive(Debug, Serialize)]
287pub struct StateUpdateDto {
288 #[serde(rename = "type")]
289 pub msg_type: &'static str,
290 pub timestamp: i64,
291 pub mcp: McpStateDto,
292 pub file_log: Vec<FileEventDto>,
293 pub wave_compass: WaveCompassDto,
294 pub hints_pending: usize,
295}
296
297#[derive(Debug, Serialize)]
298pub struct McpStateDto {
299 pub active_tool: Option<String>,
300 pub current_operation: String,
301 pub progress: Option<f32>,
302 pub tools_executed: usize,
303}
304
305#[derive(Debug, Serialize)]
306pub struct FileEventDto {
307 pub path: String,
308 pub access_type: AccessType,
309 pub age_ms: u64,
310 pub tool_name: String,
311}
312
313#[derive(Debug, Serialize)]
314pub struct WaveCompassDto {
315 pub hot_regions: Vec<HotRegion>,
316 pub trail: Vec<[f32; 2]>,
317}
318
319#[derive(Debug, Serialize)]
320pub struct HotRegion {
321 pub x: f32,
322 pub y: f32,
323 pub intensity: f32,
324 pub label: String,
325}
326
327#[derive(Debug, Deserialize)]
329pub struct HintMessageDto {
330 #[serde(rename = "type")]
331 pub msg_type: String,
332 pub hint_type: String,
333 #[serde(default)]
334 pub target: Option<String>,
335 #[serde(default)]
336 pub content: Option<String>,
337 #[serde(default)]
338 pub transcript: Option<String>,
339 #[serde(default)]
340 pub salience: Option<f32>,
341}
342
343pub async fn state_handler(
349 ws: WebSocketUpgrade,
350 State(state): State<SharedState>,
351) -> Response {
352 ws.on_upgrade(|socket| handle_state_socket(socket, state))
353}
354
355async fn handle_state_socket(socket: WebSocket, state: SharedState) {
357 let (mut sender, mut receiver) = socket.split();
358
359 {
361 let mut dashboard = state.write().await;
362 dashboard.connections += 1;
363 }
364
365 let mcp_activity = {
367 let dashboard = state.read().await;
368 Arc::clone(&dashboard.mcp_activity)
369 };
370 let user_hints = {
371 let dashboard = state.read().await;
372 Arc::clone(&dashboard.user_hints)
373 };
374
375 let mcp_activity_send = Arc::clone(&mcp_activity);
377 let user_hints_send = Arc::clone(&user_hints);
378 let send_task = tokio::spawn(async move {
379 let mut interval = tokio::time::interval(Duration::from_millis(16)); let mut last_update: Option<Instant> = None;
381
382 loop {
383 interval.tick().await;
384
385 let activity = mcp_activity_send.read().await;
386
387 if activity.last_update == last_update {
389 continue;
390 }
391 last_update = activity.last_update;
392
393 let update = build_state_update(&activity, &*user_hints_send.read().await);
395 drop(activity);
396
397 let json = match serde_json::to_string(&update) {
398 Ok(j) => j,
399 Err(_) => continue,
400 };
401
402 if sender.send(Message::Text(json)).await.is_err() {
403 break; }
405 }
406 });
407
408 let recv_task = tokio::spawn(async move {
410 while let Some(Ok(msg)) = receiver.next().await {
411 if let Message::Text(text) = msg {
412 if let Ok(hint_msg) = serde_json::from_str::<HintMessageDto>(&text) {
413 if hint_msg.msg_type == "hint" {
414 let hint = parse_hint_message(hint_msg);
415 user_hints.write().await.push(hint);
416 }
417 }
418 }
419 }
420 });
421
422 tokio::select! {
424 _ = send_task => {},
425 _ = recv_task => {},
426 }
427
428 {
430 let mut dashboard = state.write().await;
431 dashboard.connections = dashboard.connections.saturating_sub(1);
432 }
433}
434
435fn build_state_update(activity: &McpActivityState, hints: &UserHintsQueue) -> StateUpdateDto {
437 let file_log = activity.recent_file_events(10_000); let wave_compass = build_wave_compass(&file_log, &activity.directories_explored);
441
442 StateUpdateDto {
443 msg_type: "state_update",
444 timestamp: Utc::now().timestamp_millis(),
445 mcp: McpStateDto {
446 active_tool: activity.active_tool.as_ref().map(|t| t.name.clone()),
447 current_operation: activity.current_operation.clone(),
448 progress: activity.active_tool.as_ref().and_then(|t| t.progress),
449 tools_executed: activity.tool_history.len(),
450 },
451 file_log,
452 wave_compass,
453 hints_pending: hints.unconsumed_count(),
454 }
455}
456
457fn build_wave_compass(file_log: &[FileEventDto], _directories: &HashSet<PathBuf>) -> WaveCompassDto {
459 use std::collections::HashMap;
460
461 let mut dir_intensity: HashMap<String, f32> = HashMap::new();
463 for event in file_log {
464 let dir = PathBuf::from(&event.path)
465 .parent()
466 .map(|p| p.to_string_lossy().to_string())
467 .unwrap_or_default();
468
469 let intensity = 1.0 - (event.age_ms as f32 / 10_000.0).min(1.0);
471 *dir_intensity.entry(dir).or_default() += intensity * 0.3;
472 }
473
474 let hot_regions: Vec<HotRegion> = dir_intensity
476 .into_iter()
477 .map(|(dir, intensity)| {
478 let (x, y) = path_to_coords(&dir);
479 HotRegion {
480 x,
481 y,
482 intensity: intensity.min(1.0),
483 label: dir.split('/').next_back().unwrap_or(&dir).to_string(),
484 }
485 })
486 .filter(|r| r.intensity > 0.05)
487 .collect();
488
489 let trail: Vec<[f32; 2]> = file_log
491 .iter()
492 .rev()
493 .take(20)
494 .map(|e| {
495 let (x, y) = path_to_coords(&e.path);
496 [x, y]
497 })
498 .collect();
499
500 WaveCompassDto { hot_regions, trail }
501}
502
503fn path_to_coords(path: &str) -> (f32, f32) {
505 let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
507
508 if parts.is_empty() {
509 return (0.5, 0.5);
510 }
511
512 let quadrant = match parts.first().copied() {
514 Some("src") => (0.0, 0.0), Some("tests") => (0.5, 0.0), Some("docs") => (0.0, 0.5), Some("scripts") => (0.5, 0.5), Some("examples") => (0.25, 0.25),
519 _ => (0.25, 0.75),
520 };
521
522 let sub_hash = simple_hash(&parts[1..].join("/"));
524 let x = quadrant.0 + (sub_hash % 100) as f32 / 200.0;
525 let y = quadrant.1 + ((sub_hash / 100) % 100) as f32 / 200.0;
526
527 (x.clamp(0.02, 0.98), y.clamp(0.02, 0.98))
528}
529
530fn simple_hash(s: &str) -> u32 {
532 s.bytes().fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u32))
533}
534
535fn parse_hint_message(msg: HintMessageDto) -> UserHint {
537 let content = msg.content.unwrap_or_default();
538
539 let hint_type = match msg.hint_type.as_str() {
540 "click" => HintType::Click {
541 target: msg.target.unwrap_or_default(),
542 },
543 "text" => HintType::Text {
544 message: content.clone(),
545 },
546 "voice" => HintType::Voice {
547 transcript: msg.transcript.unwrap_or_default(),
548 salience: msg.salience.unwrap_or(0.5),
549 },
550 _ => HintType::Text {
551 message: content.clone(),
552 },
553 };
554
555 UserHint {
556 hint_type,
557 content,
558 timestamp: Utc::now(),
559 consumed: false,
560 }
561}
562
563mod instant_serde {
568 use serde::{Deserialize, Deserializer, Serialize, Serializer};
569 use std::time::Instant;
570
571 pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
575 where
576 S: Serializer,
577 {
578 let age_ms = instant.elapsed().as_millis() as u64;
580 age_ms.serialize(serializer)
581 }
582
583 pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
584 where
585 D: Deserializer<'de>,
586 {
587 let age_ms = u64::deserialize(deserializer)?;
588 Ok(Instant::now() - std::time::Duration::from_millis(age_ms))
589 }
590}