Skip to main content

st/web_dashboard/
state_sync.rs

1//! Real-time state synchronization between MCP and browser dashboard
2//!
3//! Provides WebSocket endpoint `/ws/state` for:
4//! - Pushing MCP activity updates to browser at 60fps
5//! - Receiving user hints/nudges from browser
6
7use axum::{
8    extract::{
9        ws::{Message, WebSocket, WebSocketUpgrade},
10        State,
11    },
12    response::Response,
13};
14use chrono::{DateTime, Utc};
15use futures_util::{SinkExt, StreamExt};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashSet, VecDeque};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use super::SharedState;
23
24// ============================================================================
25// MCP Activity Tracking Types
26// ============================================================================
27
28/// Currently executing MCP tool
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct ActiveTool {
31    /// Tool name (e.g., "search", "read_file")
32    pub name: String,
33    /// When execution started (serialized as epoch millis)
34    #[serde(with = "instant_serde")]
35    pub started_at: Instant,
36    /// Tool parameters (simplified for display)
37    pub parameters: serde_json::Value,
38    /// Estimated progress 0.0 to 1.0 (if available)
39    pub progress: Option<f32>,
40}
41
42/// File access event for Wave Compass visualization
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct FileAccessEvent {
45    /// File path accessed
46    pub path: PathBuf,
47    /// Type of access
48    pub access_type: AccessType,
49    /// When access occurred (serialized as epoch millis)
50    #[serde(with = "instant_serde")]
51    pub timestamp: Instant,
52    /// Which MCP tool accessed it
53    pub tool_name: String,
54}
55
56/// Type of file access
57#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
58#[serde(rename_all = "snake_case")]
59pub enum AccessType {
60    Read,
61    Write,
62    Analyze,
63    Search,
64}
65
66/// Completed tool execution record
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ToolExecution {
69    /// Tool name
70    pub name: String,
71    /// When execution completed
72    pub completed_at: DateTime<Utc>,
73    /// How long it took
74    pub duration_ms: u64,
75    /// Whether it succeeded
76    pub success: bool,
77    /// Brief result summary
78    pub summary: String,
79}
80
81/// Real-time MCP activity state for dashboard visualization
82#[derive(Debug, Default)]
83pub struct McpActivityState {
84    /// Currently executing tool (if any)
85    pub active_tool: Option<ActiveTool>,
86    /// Recent file access events (ring buffer, last 100)
87    pub file_access_log: VecDeque<FileAccessEvent>,
88    /// Human-readable current operation
89    pub current_operation: String,
90    /// Files touched this session
91    pub files_touched: HashSet<PathBuf>,
92    /// Directories explored this session
93    pub directories_explored: HashSet<PathBuf>,
94    /// Recent tool executions (last 20)
95    pub tool_history: VecDeque<ToolExecution>,
96    /// Last state update timestamp
97    pub last_update: Option<Instant>,
98}
99
100impl McpActivityState {
101    /// Maximum file access events to keep
102    const MAX_FILE_EVENTS: usize = 100;
103    /// Maximum tool history entries
104    const MAX_TOOL_HISTORY: usize = 20;
105
106    /// Record start of a tool execution
107    pub fn start_tool(&mut self, name: &str, parameters: serde_json::Value) {
108        self.active_tool = Some(ActiveTool {
109            name: name.to_string(),
110            started_at: Instant::now(),
111            parameters,
112            progress: None,
113        });
114        self.current_operation = format!("Executing {}...", name);
115        self.last_update = Some(Instant::now());
116    }
117
118    /// Update tool progress
119    pub fn update_progress(&mut self, progress: f32) {
120        if let Some(ref mut tool) = self.active_tool {
121            tool.progress = Some(progress.clamp(0.0, 1.0));
122            self.last_update = Some(Instant::now());
123        }
124    }
125
126    /// Record file access event
127    pub fn record_file_access(&mut self, path: PathBuf, access_type: AccessType, tool_name: &str) {
128        let event = FileAccessEvent {
129            path: path.clone(),
130            access_type,
131            timestamp: Instant::now(),
132            tool_name: tool_name.to_string(),
133        };
134
135        self.file_access_log.push_back(event);
136        if self.file_access_log.len() > Self::MAX_FILE_EVENTS {
137            self.file_access_log.pop_front();
138        }
139
140        self.files_touched.insert(path.clone());
141        if let Some(parent) = path.parent() {
142            self.directories_explored.insert(parent.to_path_buf());
143        }
144
145        self.last_update = Some(Instant::now());
146    }
147
148    /// Complete tool execution
149    pub fn complete_tool(&mut self, success: bool, summary: &str) {
150        if let Some(tool) = self.active_tool.take() {
151            let duration = tool.started_at.elapsed();
152            let execution = ToolExecution {
153                name: tool.name,
154                completed_at: Utc::now(),
155                duration_ms: duration.as_millis() as u64,
156                success,
157                summary: summary.to_string(),
158            };
159
160            self.tool_history.push_back(execution);
161            if self.tool_history.len() > Self::MAX_TOOL_HISTORY {
162                self.tool_history.pop_front();
163            }
164        }
165
166        self.current_operation = if success {
167            "Ready".to_string()
168        } else {
169            "Error occurred".to_string()
170        };
171        self.last_update = Some(Instant::now());
172    }
173
174    /// Get recent file events with age in milliseconds
175    pub fn recent_file_events(&self, max_age_ms: u64) -> Vec<FileEventDto> {
176        let now = Instant::now();
177        self.file_access_log
178            .iter()
179            .filter_map(|event| {
180                let age = now.duration_since(event.timestamp).as_millis() as u64;
181                if age <= max_age_ms {
182                    Some(FileEventDto {
183                        path: event.path.to_string_lossy().to_string(),
184                        access_type: event.access_type,
185                        age_ms: age,
186                        tool_name: event.tool_name.clone(),
187                    })
188                } else {
189                    None
190                }
191            })
192            .collect()
193    }
194}
195
196// ============================================================================
197// User Hints Types
198// ============================================================================
199
200/// User hint from browser to AI
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct UserHint {
203    /// Type of hint
204    pub hint_type: HintType,
205    /// Content/description
206    pub content: String,
207    /// When hint was sent
208    pub timestamp: DateTime<Utc>,
209    /// Whether hint has been consumed by MCP
210    #[serde(default)]
211    pub consumed: bool,
212}
213
214/// Type of user hint
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(tag = "type", rename_all = "snake_case")]
217pub enum HintType {
218    /// User clicked on Wave Compass region
219    Click { target: String },
220    /// User typed text hint
221    Text { message: String },
222    /// User voice input (transcribed)
223    Voice { transcript: String, salience: f32 },
224}
225
226/// Queue of user hints waiting to be consumed by MCP
227#[derive(Debug, Default)]
228pub struct UserHintsQueue {
229    pub hints: VecDeque<UserHint>,
230    pub max_size: usize,
231}
232
233impl UserHintsQueue {
234    const DEFAULT_MAX_SIZE: usize = 50;
235
236    pub fn new() -> Self {
237        Self {
238            hints: VecDeque::new(),
239            max_size: Self::DEFAULT_MAX_SIZE,
240        }
241    }
242
243    /// Add a new hint
244    pub fn push(&mut self, hint: UserHint) {
245        self.hints.push_back(hint);
246        while self.hints.len() > self.max_size {
247            self.hints.pop_front();
248        }
249    }
250
251    /// Get and consume the next unconsumed hint
252    pub fn consume_next(&mut self) -> Option<UserHint> {
253        for hint in &mut self.hints {
254            if !hint.consumed {
255                hint.consumed = true;
256                return Some(hint.clone());
257            }
258        }
259        None
260    }
261
262    /// Peek at unconsumed hints without consuming
263    pub fn peek_unconsumed(&self) -> Vec<&UserHint> {
264        self.hints.iter().filter(|h| !h.consumed).collect()
265    }
266
267    /// Count unconsumed hints
268    pub fn unconsumed_count(&self) -> usize {
269        self.hints.iter().filter(|h| !h.consumed).count()
270    }
271
272    /// Clear consumed hints older than threshold
273    pub fn gc(&mut self, max_age: Duration) {
274        let now = Utc::now();
275        self.hints.retain(|h| {
276            !h.consumed || (now - h.timestamp).num_milliseconds() < max_age.as_millis() as i64
277        });
278    }
279}
280
281// ============================================================================
282// WebSocket Protocol DTOs
283// ============================================================================
284
285/// State update sent to browser (60fps)
286#[derive(Debug, Serialize)]
287pub struct StateUpdateDto {
288    #[serde(rename = "type")]
289    pub msg_type: &'static str,
290    pub timestamp: i64,
291    pub mcp: McpStateDto,
292    pub file_log: Vec<FileEventDto>,
293    pub wave_compass: WaveCompassDto,
294    pub hints_pending: usize,
295}
296
297#[derive(Debug, Serialize)]
298pub struct McpStateDto {
299    pub active_tool: Option<String>,
300    pub current_operation: String,
301    pub progress: Option<f32>,
302    pub tools_executed: usize,
303}
304
305#[derive(Debug, Serialize)]
306pub struct FileEventDto {
307    pub path: String,
308    pub access_type: AccessType,
309    pub age_ms: u64,
310    pub tool_name: String,
311}
312
313#[derive(Debug, Serialize)]
314pub struct WaveCompassDto {
315    pub hot_regions: Vec<HotRegion>,
316    pub trail: Vec<[f32; 2]>,
317}
318
319#[derive(Debug, Serialize)]
320pub struct HotRegion {
321    pub x: f32,
322    pub y: f32,
323    pub intensity: f32,
324    pub label: String,
325}
326
327/// Hint message from browser
328#[derive(Debug, Deserialize)]
329pub struct HintMessageDto {
330    #[serde(rename = "type")]
331    pub msg_type: String,
332    pub hint_type: String,
333    #[serde(default)]
334    pub target: Option<String>,
335    #[serde(default)]
336    pub content: Option<String>,
337    #[serde(default)]
338    pub transcript: Option<String>,
339    #[serde(default)]
340    pub salience: Option<f32>,
341}
342
343// ============================================================================
344// WebSocket Handler
345// ============================================================================
346
347/// Handle WebSocket upgrade for state sync
348pub async fn state_handler(
349    ws: WebSocketUpgrade,
350    State(state): State<SharedState>,
351) -> Response {
352    ws.on_upgrade(|socket| handle_state_socket(socket, state))
353}
354
355/// Handle the WebSocket connection for real-time state sync
356async fn handle_state_socket(socket: WebSocket, state: SharedState) {
357    let (mut sender, mut receiver) = socket.split();
358
359    // Increment connection count
360    {
361        let mut dashboard = state.write().await;
362        dashboard.connections += 1;
363    }
364
365    // Clone handles for tasks
366    let mcp_activity = {
367        let dashboard = state.read().await;
368        Arc::clone(&dashboard.mcp_activity)
369    };
370    let user_hints = {
371        let dashboard = state.read().await;
372        Arc::clone(&dashboard.user_hints)
373    };
374
375    // Task: Send state updates at 60fps
376    let mcp_activity_send = Arc::clone(&mcp_activity);
377    let user_hints_send = Arc::clone(&user_hints);
378    let send_task = tokio::spawn(async move {
379        let mut interval = tokio::time::interval(Duration::from_millis(16)); // ~60fps
380        let mut last_update: Option<Instant> = None;
381
382        loop {
383            interval.tick().await;
384
385            let activity = mcp_activity_send.read().await;
386
387            // Skip if no updates since last send
388            if activity.last_update == last_update {
389                continue;
390            }
391            last_update = activity.last_update;
392
393            // Build state update
394            let update = build_state_update(&activity, &*user_hints_send.read().await);
395            drop(activity);
396
397            let json = match serde_json::to_string(&update) {
398                Ok(j) => j,
399                Err(_) => continue,
400            };
401
402            if sender.send(Message::Text(json)).await.is_err() {
403                break; // Client disconnected
404            }
405        }
406    });
407
408    // Task: Receive hints from browser
409    let recv_task = tokio::spawn(async move {
410        while let Some(Ok(msg)) = receiver.next().await {
411            if let Message::Text(text) = msg {
412                if let Ok(hint_msg) = serde_json::from_str::<HintMessageDto>(&text) {
413                    if hint_msg.msg_type == "hint" {
414                        let hint = parse_hint_message(hint_msg);
415                        user_hints.write().await.push(hint);
416                    }
417                }
418            }
419        }
420    });
421
422    // Wait for either task to complete
423    tokio::select! {
424        _ = send_task => {},
425        _ = recv_task => {},
426    }
427
428    // Decrement connection count
429    {
430        let mut dashboard = state.write().await;
431        dashboard.connections = dashboard.connections.saturating_sub(1);
432    }
433}
434
435/// Build state update DTO from current activity state
436fn build_state_update(activity: &McpActivityState, hints: &UserHintsQueue) -> StateUpdateDto {
437    let file_log = activity.recent_file_events(10_000); // Last 10 seconds
438
439    // Build wave compass data from file events
440    let wave_compass = build_wave_compass(&file_log, &activity.directories_explored);
441
442    StateUpdateDto {
443        msg_type: "state_update",
444        timestamp: Utc::now().timestamp_millis(),
445        mcp: McpStateDto {
446            active_tool: activity.active_tool.as_ref().map(|t| t.name.clone()),
447            current_operation: activity.current_operation.clone(),
448            progress: activity.active_tool.as_ref().and_then(|t| t.progress),
449            tools_executed: activity.tool_history.len(),
450        },
451        file_log,
452        wave_compass,
453        hints_pending: hints.unconsumed_count(),
454    }
455}
456
457/// Build Wave Compass visualization data
458fn build_wave_compass(file_log: &[FileEventDto], _directories: &HashSet<PathBuf>) -> WaveCompassDto {
459    use std::collections::HashMap;
460
461    // Aggregate intensity by directory
462    let mut dir_intensity: HashMap<String, f32> = HashMap::new();
463    for event in file_log {
464        let dir = PathBuf::from(&event.path)
465            .parent()
466            .map(|p| p.to_string_lossy().to_string())
467            .unwrap_or_default();
468
469        // Decay intensity by age (newer = brighter)
470        let intensity = 1.0 - (event.age_ms as f32 / 10_000.0).min(1.0);
471        *dir_intensity.entry(dir).or_default() += intensity * 0.3;
472    }
473
474    // Convert to hot regions with pseudo-random but stable coordinates
475    let hot_regions: Vec<HotRegion> = dir_intensity
476        .into_iter()
477        .map(|(dir, intensity)| {
478            let (x, y) = path_to_coords(&dir);
479            HotRegion {
480                x,
481                y,
482                intensity: intensity.min(1.0),
483                label: dir.split('/').next_back().unwrap_or(&dir).to_string(),
484            }
485        })
486        .filter(|r| r.intensity > 0.05)
487        .collect();
488
489    // Build trail from recent file accesses
490    let trail: Vec<[f32; 2]> = file_log
491        .iter()
492        .rev()
493        .take(20)
494        .map(|e| {
495            let (x, y) = path_to_coords(&e.path);
496            [x, y]
497        })
498        .collect();
499
500    WaveCompassDto { hot_regions, trail }
501}
502
503/// Convert file path to Wave Compass coordinates using directory clustering
504fn path_to_coords(path: &str) -> (f32, f32) {
505    // Simple but stable hash-based positioning with directory clustering
506    let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
507
508    if parts.is_empty() {
509        return (0.5, 0.5);
510    }
511
512    // First directory determines quadrant
513    let quadrant = match parts.first().copied() {
514        Some("src") => (0.0, 0.0),   // Upper-left
515        Some("tests") => (0.5, 0.0), // Upper-right
516        Some("docs") => (0.0, 0.5),  // Lower-left
517        Some("scripts") => (0.5, 0.5), // Lower-right
518        Some("examples") => (0.25, 0.25),
519        _ => (0.25, 0.75),
520    };
521
522    // Sub-path determines position within quadrant
523    let sub_hash = simple_hash(&parts[1..].join("/"));
524    let x = quadrant.0 + (sub_hash % 100) as f32 / 200.0;
525    let y = quadrant.1 + ((sub_hash / 100) % 100) as f32 / 200.0;
526
527    (x.clamp(0.02, 0.98), y.clamp(0.02, 0.98))
528}
529
/// Deterministic 32-bit string hash used for stable positioning.
///
/// Folds the UTF-8 bytes as `hash = hash * 31 + byte` with wrapping
/// arithmetic, starting from 0 (so the empty string hashes to 0).
fn simple_hash(s: &str) -> u32 {
    let mut hash = 0u32;
    for byte in s.bytes() {
        hash = hash.wrapping_mul(31).wrapping_add(u32::from(byte));
    }
    hash
}
534
535/// Parse hint message from browser
536fn parse_hint_message(msg: HintMessageDto) -> UserHint {
537    let content = msg.content.unwrap_or_default();
538
539    let hint_type = match msg.hint_type.as_str() {
540        "click" => HintType::Click {
541            target: msg.target.unwrap_or_default(),
542        },
543        "text" => HintType::Text {
544            message: content.clone(),
545        },
546        "voice" => HintType::Voice {
547            transcript: msg.transcript.unwrap_or_default(),
548            salience: msg.salience.unwrap_or(0.5),
549        },
550        _ => HintType::Text {
551            message: content.clone(),
552        },
553    };
554
555    UserHint {
556        hint_type,
557        content,
558        timestamp: Utc::now(),
559        consumed: false,
560    }
561}
562
563// ============================================================================
// Instant serde helper (serializes an `Instant` as its age in milliseconds)
565// ============================================================================
566
567mod instant_serde {
568    use serde::{Deserialize, Deserializer, Serialize, Serializer};
569    use std::time::Instant;
570
571    // We can't serialize Instant to absolute time, so we store "age" in ms
572    // When deserializing, we approximate by subtracting from now
573
574    pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
575    where
576        S: Serializer,
577    {
578        // Serialize as milliseconds ago
579        let age_ms = instant.elapsed().as_millis() as u64;
580        age_ms.serialize(serializer)
581    }
582
583    pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
584    where
585        D: Deserializer<'de>,
586    {
587        let age_ms = u64::deserialize(deserializer)?;
588        Ok(Instant::now() - std::time::Duration::from_millis(age_ms))
589    }
590}