Skip to main content

chasm/api/
recording.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: AGPL-3.0-only
3//! Real-time Session Recording API
4//!
5//! This module provides endpoints for recording chat sessions in real-time,
6//! preventing data loss from editor crashes. It accepts incremental updates
7//! via WebSocket or REST endpoints and persists them to the universal database.
8//!
9//! ## Architecture
10//!
11//! ```text
12//! VS Code Extension ─┬─> WebSocket ─┬─> RecordingService ─> Database
13//!                    │              │
14//!                    └─> REST API ──┘
15//! ```
16//!
17//! ## Recording Modes
18//!
19//! - **Live**: Real-time event streaming via WebSocket
20//! - **Batch**: Periodic sync via REST (fallback)
21//! - **Hybrid**: WebSocket with REST checkpoint backup
22
23use actix_web::{web, Error, HttpRequest, HttpResponse, Responder};
24use chrono::{DateTime, Utc};
25use futures_util::StreamExt;
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::{Arc, RwLock};
29use std::time::{Duration, Instant};
30use tokio::sync::broadcast;
31use uuid::Uuid;
32
33// =============================================================================
34// Recording Event Types
35// =============================================================================
36
37/// Events sent from client for real-time recording
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum RecordingEvent {
41    /// Start recording a new session
42    SessionStart {
43        session_id: String,
44        workspace_id: Option<String>,
45        workspace_path: Option<String>,
46        provider: String,
47        title: Option<String>,
48        model: Option<String>,
49        metadata: Option<serde_json::Value>,
50    },
51
52    /// End a recording session
53    SessionEnd {
54        session_id: String,
55        final_message_count: Option<i32>,
56    },
57
58    /// Add a new message to the session
59    MessageAdd {
60        session_id: String,
61        message_id: String,
62        role: String, // "user", "assistant", "system"
63        content: String,
64        model: Option<String>,
65        parent_id: Option<String>,
66        metadata: Option<serde_json::Value>,
67    },
68
69    /// Update message content (for streaming responses)
70    MessageUpdate {
71        session_id: String,
72        message_id: String,
73        content: String,
74        is_complete: bool,
75    },
76
77    /// Append to message content (for streaming tokens)
78    MessageAppend {
79        session_id: String,
80        message_id: String,
81        content_delta: String,
82    },
83
84    /// Update session metadata (title, tags, etc.)
85    SessionUpdate {
86        session_id: String,
87        title: Option<String>,
88        model: Option<String>,
89        metadata: Option<serde_json::Value>,
90    },
91
92    /// Heartbeat to keep connection alive
93    Heartbeat {
94        session_id: Option<String>,
95        timestamp: i64,
96    },
97
98    /// Full session snapshot for recovery
99    SessionSnapshot {
100        session_id: String,
101        provider: String,
102        workspace_path: Option<String>,
103        title: Option<String>,
104        messages: Vec<RecordedMessage>,
105        metadata: Option<serde_json::Value>,
106    },
107}
108
109/// A recorded message in a session
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct RecordedMessage {
112    pub message_id: String,
113    pub role: String,
114    pub content: String,
115    pub model: Option<String>,
116    pub created_at: i64,
117    pub parent_id: Option<String>,
118    pub metadata: Option<serde_json::Value>,
119}
120
121/// Response sent back to client
122#[derive(Debug, Clone, Serialize, Deserialize)]
123#[serde(tag = "type", rename_all = "snake_case")]
124pub enum RecordingResponse {
125    /// Acknowledgement of received event
126    Ack {
127        event_id: String,
128        session_id: String,
129        status: String,
130    },
131
132    /// Error response
133    Error {
134        event_id: Option<String>,
135        code: String,
136        message: String,
137    },
138
139    /// Session recovery data
140    Recovery {
141        session_id: String,
142        last_message_id: Option<String>,
143        message_count: i32,
144    },
145}
146
147// =============================================================================
148// Recording State
149// =============================================================================
150
151/// Active recording session state
152#[derive(Debug, Clone)]
153pub struct ActiveSession {
154    pub session_id: String,
155    pub workspace_id: Option<String>,
156    pub workspace_path: Option<String>,
157    pub provider: String,
158    pub title: Option<String>,
159    pub model: Option<String>,
160    pub messages: Vec<RecordedMessage>,
161    pub started_at: DateTime<Utc>,
162    pub last_activity: DateTime<Utc>,
163    pub last_persisted_at: DateTime<Utc>,
164    pub is_dirty: bool,
165}
166
167/// Recording service state
168pub struct RecordingState {
169    /// Active recording sessions (session_id -> session)
170    pub active_sessions: RwLock<HashMap<String, ActiveSession>>,
171    /// Event broadcast channel for WebSocket distribution
172    pub event_tx: broadcast::Sender<RecordingEvent>,
173    /// Configuration
174    pub config: RecordingConfig,
175}
176
177/// Recording configuration
178#[derive(Debug, Clone)]
179pub struct RecordingConfig {
180    /// How often to persist dirty sessions (seconds)
181    pub persist_interval_secs: u64,
182    /// Maximum messages to keep in memory before forced persist
183    pub max_memory_messages: usize,
184    /// Session timeout for inactivity (seconds)
185    pub session_timeout_secs: u64,
186    /// Enable debug logging
187    pub debug: bool,
188}
189
190impl Default for RecordingConfig {
191    fn default() -> Self {
192        Self {
193            persist_interval_secs: 5,
194            max_memory_messages: 100,
195            session_timeout_secs: 3600, // 1 hour
196            debug: false,
197        }
198    }
199}
200
201impl RecordingState {
202    pub fn new(config: RecordingConfig) -> Self {
203        let (event_tx, _) = broadcast::channel(1024);
204        Self {
205            active_sessions: RwLock::new(HashMap::new()),
206            event_tx,
207            config,
208        }
209    }
210
211    /// Process a recording event
212    pub fn process_event(&self, event: &RecordingEvent) -> RecordingResponse {
213        match event {
214            RecordingEvent::SessionStart {
215                session_id,
216                workspace_id,
217                workspace_path,
218                provider,
219                title,
220                model,
221                metadata: _,
222            } => {
223                let session = ActiveSession {
224                    session_id: session_id.clone(),
225                    workspace_id: workspace_id.clone(),
226                    workspace_path: workspace_path.clone(),
227                    provider: provider.clone(),
228                    title: title.clone(),
229                    model: model.clone(),
230                    messages: Vec::new(),
231                    started_at: Utc::now(),
232                    last_activity: Utc::now(),
233                    last_persisted_at: Utc::now(),
234                    is_dirty: false,
235                };
236
237                if let Ok(mut sessions) = self.active_sessions.write() {
238                    sessions.insert(session_id.clone(), session);
239                }
240
241                RecordingResponse::Ack {
242                    event_id: Uuid::new_v4().to_string(),
243                    session_id: session_id.clone(),
244                    status: "session_started".to_string(),
245                }
246            }
247
248            RecordingEvent::SessionEnd {
249                session_id,
250                final_message_count: _,
251            } => {
252                // Persist and remove from active sessions
253                if let Ok(mut sessions) = self.active_sessions.write() {
254                    sessions.remove(session_id);
255                }
256
257                RecordingResponse::Ack {
258                    event_id: Uuid::new_v4().to_string(),
259                    session_id: session_id.clone(),
260                    status: "session_ended".to_string(),
261                }
262            }
263
264            RecordingEvent::MessageAdd {
265                session_id,
266                message_id,
267                role,
268                content,
269                model,
270                parent_id,
271                metadata,
272            } => {
273                let message = RecordedMessage {
274                    message_id: message_id.clone(),
275                    role: role.clone(),
276                    content: content.clone(),
277                    model: model.clone(),
278                    created_at: Utc::now().timestamp_millis(),
279                    parent_id: parent_id.clone(),
280                    metadata: metadata.clone(),
281                };
282
283                if let Ok(mut sessions) = self.active_sessions.write() {
284                    if let Some(session) = sessions.get_mut(session_id) {
285                        session.messages.push(message);
286                        session.last_activity = Utc::now();
287                        session.is_dirty = true;
288                    }
289                }
290
291                RecordingResponse::Ack {
292                    event_id: Uuid::new_v4().to_string(),
293                    session_id: session_id.clone(),
294                    status: "message_added".to_string(),
295                }
296            }
297
298            RecordingEvent::MessageUpdate {
299                session_id,
300                message_id,
301                content,
302                is_complete: _,
303            } => {
304                if let Ok(mut sessions) = self.active_sessions.write() {
305                    if let Some(session) = sessions.get_mut(session_id) {
306                        if let Some(msg) = session
307                            .messages
308                            .iter_mut()
309                            .find(|m| m.message_id == *message_id)
310                        {
311                            msg.content = content.clone();
312                            session.last_activity = Utc::now();
313                            session.is_dirty = true;
314                        }
315                    }
316                }
317
318                RecordingResponse::Ack {
319                    event_id: Uuid::new_v4().to_string(),
320                    session_id: session_id.clone(),
321                    status: "message_updated".to_string(),
322                }
323            }
324
325            RecordingEvent::MessageAppend {
326                session_id,
327                message_id,
328                content_delta,
329            } => {
330                if let Ok(mut sessions) = self.active_sessions.write() {
331                    if let Some(session) = sessions.get_mut(session_id) {
332                        if let Some(msg) = session
333                            .messages
334                            .iter_mut()
335                            .find(|m| m.message_id == *message_id)
336                        {
337                            msg.content.push_str(content_delta);
338                            session.last_activity = Utc::now();
339                            session.is_dirty = true;
340                        }
341                    }
342                }
343
344                RecordingResponse::Ack {
345                    event_id: Uuid::new_v4().to_string(),
346                    session_id: session_id.clone(),
347                    status: "message_appended".to_string(),
348                }
349            }
350
351            RecordingEvent::SessionUpdate {
352                session_id,
353                title,
354                model,
355                metadata: _,
356            } => {
357                if let Ok(mut sessions) = self.active_sessions.write() {
358                    if let Some(session) = sessions.get_mut(session_id) {
359                        if let Some(t) = title {
360                            session.title = Some(t.clone());
361                        }
362                        if let Some(m) = model {
363                            session.model = Some(m.clone());
364                        }
365                        session.last_activity = Utc::now();
366                        session.is_dirty = true;
367                    }
368                }
369
370                RecordingResponse::Ack {
371                    event_id: Uuid::new_v4().to_string(),
372                    session_id: session_id.clone(),
373                    status: "session_updated".to_string(),
374                }
375            }
376
377            RecordingEvent::Heartbeat {
378                session_id,
379                timestamp: _,
380            } => {
381                if let Some(sid) = session_id {
382                    if let Ok(mut sessions) = self.active_sessions.write() {
383                        if let Some(session) = sessions.get_mut(sid) {
384                            session.last_activity = Utc::now();
385                        }
386                    }
387                }
388
389                RecordingResponse::Ack {
390                    event_id: Uuid::new_v4().to_string(),
391                    session_id: session_id.clone().unwrap_or_default(),
392                    status: "heartbeat".to_string(),
393                }
394            }
395
396            RecordingEvent::SessionSnapshot {
397                session_id,
398                provider,
399                workspace_path,
400                title,
401                messages,
402                metadata: _,
403            } => {
404                let session = ActiveSession {
405                    session_id: session_id.clone(),
406                    workspace_id: None,
407                    workspace_path: workspace_path.clone(),
408                    provider: provider.clone(),
409                    title: title.clone(),
410                    model: None,
411                    messages: messages.clone(),
412                    started_at: Utc::now(),
413                    last_activity: Utc::now(),
414                    last_persisted_at: Utc::now(),
415                    is_dirty: true,
416                };
417
418                if let Ok(mut sessions) = self.active_sessions.write() {
419                    sessions.insert(session_id.clone(), session);
420                }
421
422                RecordingResponse::Ack {
423                    event_id: Uuid::new_v4().to_string(),
424                    session_id: session_id.clone(),
425                    status: "snapshot_received".to_string(),
426                }
427            }
428        }
429    }
430
431    /// Get active session count
432    pub fn active_count(&self) -> usize {
433        self.active_sessions.read().map(|s| s.len()).unwrap_or(0)
434    }
435
436    /// Get session for recovery
437    pub fn get_session(&self, session_id: &str) -> Option<ActiveSession> {
438        self.active_sessions
439            .read()
440            .ok()
441            .and_then(|s| s.get(session_id).cloned())
442    }
443
444    /// Get all dirty sessions that need persisting
445    pub fn get_dirty_sessions(&self) -> Vec<ActiveSession> {
446        self.active_sessions
447            .read()
448            .map(|s| s.values().filter(|sess| sess.is_dirty).cloned().collect())
449            .unwrap_or_default()
450    }
451
452    /// Mark session as persisted
453    pub fn mark_persisted(&self, session_id: &str) {
454        if let Ok(mut sessions) = self.active_sessions.write() {
455            if let Some(session) = sessions.get_mut(session_id) {
456                session.is_dirty = false;
457                session.last_persisted_at = Utc::now();
458            }
459        }
460    }
461}
462
463// =============================================================================
464// REST API Handlers
465// =============================================================================
466
467/// Request body for recording events
468#[derive(Debug, Deserialize)]
469pub struct RecordEventRequest {
470    pub events: Vec<RecordingEvent>,
471}
472
473/// Response for recording events
474#[derive(Debug, Serialize)]
475pub struct RecordEventResponse {
476    pub processed: usize,
477    pub responses: Vec<RecordingResponse>,
478}
479
480/// POST /api/recording/events - Process recording events
481pub async fn record_events(
482    state: web::Data<Arc<RecordingState>>,
483    body: web::Json<RecordEventRequest>,
484) -> impl Responder {
485    let mut responses = Vec::new();
486
487    for event in &body.events {
488        let response = state.process_event(event);
489        responses.push(response);
490
491        // Broadcast to WebSocket subscribers
492        let _ = state.event_tx.send(event.clone());
493    }
494
495    HttpResponse::Ok().json(RecordEventResponse {
496        processed: responses.len(),
497        responses,
498    })
499}
500
501/// POST /api/recording/snapshot - Store full session snapshot
502pub async fn store_snapshot(
503    state: web::Data<Arc<RecordingState>>,
504    body: web::Json<RecordingEvent>,
505) -> impl Responder {
506    if let RecordingEvent::SessionSnapshot { .. } = &*body {
507        let response = state.process_event(&body);
508        HttpResponse::Ok().json(response)
509    } else {
510        HttpResponse::BadRequest().json(RecordingResponse::Error {
511            event_id: None,
512            code: "invalid_event".to_string(),
513            message: "Expected SessionSnapshot event".to_string(),
514        })
515    }
516}
517
518/// GET /api/recording/sessions - List active recording sessions
519pub async fn list_sessions(state: web::Data<Arc<RecordingState>>) -> impl Responder {
520    let sessions: Vec<_> = state
521        .active_sessions
522        .read()
523        .map(|s| {
524            s.values()
525                .map(|sess| {
526                    serde_json::json!({
527                        "session_id": sess.session_id,
528                        "provider": sess.provider,
529                        "title": sess.title,
530                        "workspace_path": sess.workspace_path,
531                        "message_count": sess.messages.len(),
532                        "started_at": sess.started_at.to_rfc3339(),
533                        "last_activity": sess.last_activity.to_rfc3339(),
534                        "is_dirty": sess.is_dirty,
535                    })
536                })
537                .collect()
538        })
539        .unwrap_or_default();
540
541    HttpResponse::Ok().json(serde_json::json!({
542        "active_sessions": sessions,
543        "total": sessions.len(),
544    }))
545}
546
547/// GET /api/recording/session/{id} - Get specific session data
548pub async fn get_session(
549    state: web::Data<Arc<RecordingState>>,
550    path: web::Path<String>,
551) -> impl Responder {
552    let session_id = path.into_inner();
553
554    if let Some(session) = state.get_session(&session_id) {
555        HttpResponse::Ok().json(serde_json::json!({
556            "session_id": session.session_id,
557            "provider": session.provider,
558            "title": session.title,
559            "workspace_path": session.workspace_path,
560            "messages": session.messages,
561            "message_count": session.messages.len(),
562            "started_at": session.started_at.to_rfc3339(),
563            "last_activity": session.last_activity.to_rfc3339(),
564        }))
565    } else {
566        HttpResponse::NotFound().json(RecordingResponse::Error {
567            event_id: None,
568            code: "session_not_found".to_string(),
569            message: format!("Session {} not found", session_id),
570        })
571    }
572}
573
574/// GET /api/recording/session/{id}/recovery - Get recovery data for session
575pub async fn get_recovery(
576    state: web::Data<Arc<RecordingState>>,
577    path: web::Path<String>,
578) -> impl Responder {
579    let session_id = path.into_inner();
580
581    if let Some(session) = state.get_session(&session_id) {
582        let last_message_id = session.messages.last().map(|m| m.message_id.clone());
583        HttpResponse::Ok().json(RecordingResponse::Recovery {
584            session_id: session.session_id,
585            last_message_id,
586            message_count: session.messages.len() as i32,
587        })
588    } else {
589        HttpResponse::NotFound().json(RecordingResponse::Error {
590            event_id: None,
591            code: "session_not_found".to_string(),
592            message: format!("Session {} not found", session_id),
593        })
594    }
595}
596
597/// GET /api/recording/status - Recording service status
598pub async fn recording_status(
599    state: web::Data<Arc<RecordingState>>,
600    _req: HttpRequest,
601) -> impl Responder {
602    let dirty_count = state.get_dirty_sessions().len();
603
604    HttpResponse::Ok().json(serde_json::json!({
605        "status": "running",
606        "active_sessions": state.active_count(),
607        "dirty_sessions": dirty_count,
608        "config": {
609            "persist_interval_secs": state.config.persist_interval_secs,
610            "max_memory_messages": state.config.max_memory_messages,
611            "session_timeout_secs": state.config.session_timeout_secs,
612        }
613    }))
614}
615
616// =============================================================================
617// WebSocket Recording Support
618// =============================================================================
619
620const WS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
621const WS_CLIENT_TIMEOUT: Duration = Duration::from_secs(30);
622
623/// WebSocket messages for recording (client -> server)
624#[derive(Debug, Clone, Serialize, Deserialize)]
625#[serde(tag = "type", rename_all = "snake_case")]
626pub enum RecordingWsMessage {
627    /// Client wants to send recording events
628    Events { events: Vec<RecordingEvent> },
629    /// Client wants to subscribe to a session's events
630    Subscribe { session_id: String },
631    /// Client wants to unsubscribe from a session
632    Unsubscribe { session_id: String },
633    /// Ping for keepalive
634    Ping { timestamp: i64 },
635}
636
637/// Server responses for WebSocket (server -> client)
638#[derive(Debug, Clone, Serialize, Deserialize)]
639#[serde(tag = "type", rename_all = "snake_case")]
640pub enum RecordingWsResponse {
641    /// Connected successfully
642    Connected { client_id: String },
643    /// Events processed
644    EventsProcessed {
645        count: usize,
646        responses: Vec<RecordingResponse>,
647    },
648    /// Subscribed to session
649    Subscribed { session_id: String },
650    /// Unsubscribed from session  
651    Unsubscribed { session_id: String },
652    /// Event broadcast (from another client or server)
653    EventBroadcast { event: RecordingEvent },
654    /// Pong response
655    Pong { timestamp: i64, server_time: i64 },
656    /// Error
657    Error { code: String, message: String },
658}
659
660/// Handle incoming WebSocket message
661fn handle_ws_message(
662    text: &str,
663    state: &Arc<RecordingState>,
664    subscribed_sessions: &mut Vec<String>,
665) -> Option<RecordingWsResponse> {
666    match serde_json::from_str::<RecordingWsMessage>(text) {
667        Ok(msg) => match msg {
668            RecordingWsMessage::Events { events } => {
669                let mut responses = Vec::new();
670                for event in events {
671                    let response = state.process_event(&event);
672                    responses.push(response);
673                    // Broadcast to subscribers
674                    let _ = state.event_tx.send(event);
675                }
676                Some(RecordingWsResponse::EventsProcessed {
677                    count: responses.len(),
678                    responses,
679                })
680            }
681            RecordingWsMessage::Subscribe { session_id } => {
682                if !subscribed_sessions.contains(&session_id) {
683                    subscribed_sessions.push(session_id.clone());
684                }
685                Some(RecordingWsResponse::Subscribed { session_id })
686            }
687            RecordingWsMessage::Unsubscribe { session_id } => {
688                subscribed_sessions.retain(|s| s != &session_id);
689                Some(RecordingWsResponse::Unsubscribed { session_id })
690            }
691            RecordingWsMessage::Ping { timestamp } => Some(RecordingWsResponse::Pong {
692                timestamp,
693                server_time: Utc::now().timestamp_millis(),
694            }),
695        },
696        Err(e) => Some(RecordingWsResponse::Error {
697            code: "parse_error".to_string(),
698            message: format!("Invalid message: {}", e),
699        }),
700    }
701}
702
703/// Check if event should be forwarded to this client
704fn should_forward_event(event: &RecordingEvent, subscribed_sessions: &[String]) -> bool {
705    // If no specific subscriptions, forward all events
706    if subscribed_sessions.is_empty() {
707        return true;
708    }
709
710    let session_id = match event {
711        RecordingEvent::SessionStart { session_id, .. } => Some(session_id),
712        RecordingEvent::SessionEnd { session_id, .. } => Some(session_id),
713        RecordingEvent::MessageAdd { session_id, .. } => Some(session_id),
714        RecordingEvent::MessageUpdate { session_id, .. } => Some(session_id),
715        RecordingEvent::MessageAppend { session_id, .. } => Some(session_id),
716        RecordingEvent::SessionUpdate { session_id, .. } => Some(session_id),
717        RecordingEvent::SessionSnapshot { session_id, .. } => Some(session_id),
718        RecordingEvent::Heartbeat { session_id, .. } => session_id.as_ref(),
719    };
720
721    session_id
722        .map(|sid| subscribed_sessions.contains(sid))
723        .unwrap_or(false)
724}
725
726/// WebSocket endpoint for recording using actix-ws
727pub async fn recording_ws_handler(
728    req: HttpRequest,
729    body: web::Payload,
730    state: web::Data<Arc<RecordingState>>,
731) -> Result<HttpResponse, Error> {
732    // Perform WebSocket handshake
733    let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
734
735    let client_id = Uuid::new_v4().to_string();
736    let state_clone = state.get_ref().clone();
737
738    // Send connected message
739    let connected_msg = RecordingWsResponse::Connected {
740        client_id: client_id.clone(),
741    };
742    if let Ok(json) = serde_json::to_string(&connected_msg) {
743        let _ = session.text(json).await;
744    }
745
746    eprintln!("[WS] Recording client {} connected", client_id);
747
748    // Subscribe to broadcast channel
749    let mut broadcast_rx = state.event_tx.subscribe();
750
751    // Spawn handler task
752    let client_id_clone = client_id.clone();
753    actix_web::rt::spawn(async move {
754        let mut heartbeat_interval = tokio::time::interval(WS_HEARTBEAT_INTERVAL);
755        let mut last_heartbeat = Instant::now();
756        let mut subscribed_sessions: Vec<String> = Vec::new();
757
758        loop {
759            tokio::select! {
760                // Handle incoming messages
761                Some(msg_result) = msg_stream.next() => {
762                    match msg_result {
763                        Ok(actix_ws::Message::Text(text)) => {
764                            last_heartbeat = Instant::now();
765                            if let Some(response) = handle_ws_message(
766                                &text,
767                                &state_clone,
768                                &mut subscribed_sessions,
769                            ) {
770                                if let Ok(json) = serde_json::to_string(&response) {
771                                    let _ = session.text(json).await;
772                                }
773                            }
774                        }
775                        Ok(actix_ws::Message::Ping(data)) => {
776                            last_heartbeat = Instant::now();
777                            let _ = session.pong(&data).await;
778                        }
779                        Ok(actix_ws::Message::Pong(_)) => {
780                            last_heartbeat = Instant::now();
781                        }
782                        Ok(actix_ws::Message::Close(_)) => {
783                            eprintln!("[WS] Recording client {} requested close", client_id_clone);
784                            break;
785                        }
786                        _ => {}
787                    }
788                }
789
790                // Handle broadcast messages from other clients
791                Ok(event) = broadcast_rx.recv() => {
792                    if should_forward_event(&event, &subscribed_sessions) {
793                        let msg = RecordingWsResponse::EventBroadcast { event };
794                        if let Ok(json) = serde_json::to_string(&msg) {
795                            let _ = session.text(json).await;
796                        }
797                    }
798                }
799
800                // Heartbeat check
801                _ = heartbeat_interval.tick() => {
802                    if Instant::now().duration_since(last_heartbeat) > WS_CLIENT_TIMEOUT {
803                        eprintln!("[WS] Recording client {} timed out", client_id_clone);
804                        break;
805                    }
806                    let _ = session.ping(b"").await;
807                }
808            }
809        }
810
811        let _ = session.close(None).await;
812        eprintln!("[WS] Recording client {} disconnected", client_id_clone);
813    });
814
815    Ok(response)
816}
817
818// =============================================================================
819// Route Configuration
820// =============================================================================
821
822pub fn create_recording_state() -> Arc<RecordingState> {
823    Arc::new(RecordingState::new(RecordingConfig::default()))
824}
825
826pub fn configure_recording_routes(cfg: &mut web::ServiceConfig) {
827    eprintln!("[DEBUG] Configuring recording routes...");
828    cfg.service(
829        web::scope("/recording")
830            .route("/events", web::post().to(record_events))
831            .route("/snapshot", web::post().to(store_snapshot))
832            .route("/sessions", web::get().to(list_sessions))
833            .route("/session/{id}", web::get().to(get_session))
834            .route("/session/{id}/recovery", web::get().to(get_recovery))
835            .route("/status", web::get().to(recording_status))
836            .route("/ws", web::get().to(recording_ws_handler)),
837    );
838    eprintln!("[DEBUG] Recording routes configured.");
839}
840
841// =============================================================================
842// Tests
843// =============================================================================
844
845#[cfg(test)]
846mod tests {
847    use super::*;
848
849    #[test]
850    fn test_session_start() {
851        let state = RecordingState::new(RecordingConfig::default());
852
853        let event = RecordingEvent::SessionStart {
854            session_id: "test-123".to_string(),
855            workspace_id: None,
856            workspace_path: Some("/test/path".to_string()),
857            provider: "vscode".to_string(),
858            title: Some("Test Session".to_string()),
859            model: Some("gpt-4".to_string()),
860            metadata: None,
861        };
862
863        let response = state.process_event(&event);
864        assert!(matches!(response, RecordingResponse::Ack { .. }));
865        assert_eq!(state.active_count(), 1);
866    }
867
868    #[test]
869    fn test_message_add() {
870        let state = RecordingState::new(RecordingConfig::default());
871
872        // Start session
873        state.process_event(&RecordingEvent::SessionStart {
874            session_id: "test-123".to_string(),
875            workspace_id: None,
876            workspace_path: None,
877            provider: "vscode".to_string(),
878            title: None,
879            model: None,
880            metadata: None,
881        });
882
883        // Add message
884        state.process_event(&RecordingEvent::MessageAdd {
885            session_id: "test-123".to_string(),
886            message_id: "msg-1".to_string(),
887            role: "user".to_string(),
888            content: "Hello".to_string(),
889            model: None,
890            parent_id: None,
891            metadata: None,
892        });
893
894        let session = state.get_session("test-123").unwrap();
895        assert_eq!(session.messages.len(), 1);
896        assert_eq!(session.messages[0].content, "Hello");
897    }
898
899    #[test]
900    fn test_message_append() {
901        let state = RecordingState::new(RecordingConfig::default());
902
903        // Start session
904        state.process_event(&RecordingEvent::SessionStart {
905            session_id: "test-123".to_string(),
906            workspace_id: None,
907            workspace_path: None,
908            provider: "vscode".to_string(),
909            title: None,
910            model: None,
911            metadata: None,
912        });
913
914        // Add message
915        state.process_event(&RecordingEvent::MessageAdd {
916            session_id: "test-123".to_string(),
917            message_id: "msg-1".to_string(),
918            role: "assistant".to_string(),
919            content: "Hello".to_string(),
920            model: None,
921            parent_id: None,
922            metadata: None,
923        });
924
925        // Append to message
926        state.process_event(&RecordingEvent::MessageAppend {
927            session_id: "test-123".to_string(),
928            message_id: "msg-1".to_string(),
929            content_delta: " World!".to_string(),
930        });
931
932        let session = state.get_session("test-123").unwrap();
933        assert_eq!(session.messages[0].content, "Hello World!");
934    }
935}