Skip to main content

localgpt_server/
http.rs

1//! HTTP server for LocalGPT
2//!
3//! Supports multiple sessions with session ID-based routing.
4//! Sessions are created on demand and cached for reuse.
5
use anyhow::Result;
use axum::{
    Router,
    extract::{
        Path, Query, Request, State,
        ws::{Message as WsMessage, WebSocket, WebSocketUpgrade},
    },
    http::{StatusCode, header},
    middleware::{self, Next},
    response::{
        IntoResponse, Json, Response,
        sse::{Event, Sse},
    },
    routing::{delete, get, post},
};
use futures::{SinkExt, StreamExt};
use rust_embed::RustEmbed;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::limit::RequestBodyLimitLayer;
use tracing::{debug, info, warn};

use localgpt_core::agent::{Agent, AgentConfig, StreamEvent, extract_tool_detail};
use localgpt_core::concurrency::{TurnGate, WorkspaceLock};
use localgpt_core::config::Config;
use localgpt_core::heartbeat::{HeartbeatStatus, get_last_heartbeat_event};
use localgpt_core::memory::MemoryManager;
40
41/// Embedded UI assets
42#[derive(RustEmbed)]
43#[folder = "ui/"]
44struct UiAssets;
45
/// Idle time after which the cleanup task expires a session (30 minutes).
const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 30);

/// Upper bound on sessions held in memory; the least recently used one is
/// evicted when a new session would exceed it.
const MAX_SESSIONS: usize = 100;

/// Agent identifier under which HTTP sessions are listed and saved.
const HTTP_AGENT_ID: &str = "http";
54
55pub struct Server {
56    config: Config,
57    turn_gate: TurnGate,
58    bridge_manager: crate::security::BridgeManager,
59}
60
61pub(crate) struct SessionEntry {
62    agent: Agent,
63    last_accessed: Instant,
64    /// Whether session has unsaved changes
65    dirty: bool,
66}
67
68pub(crate) struct AppState {
69    pub(crate) config: Config,
70    pub(crate) sessions: Mutex<HashMap<String, SessionEntry>>,
71    /// Shared MemoryManager to avoid reinitializing embedding provider
72    pub(crate) memory: MemoryManager,
73    /// In-process turn gate shared with heartbeat runner
74    turn_gate: TurnGate,
75    /// Cross-process workspace lock
76    workspace_lock: WorkspaceLock,
77    /// Per-IP rate limiter
78    rate_limiter: Arc<crate::rate_limiter::RateLimiter>,
79    /// Bridge manager for tracking active connections
80    pub(crate) bridge_manager: crate::security::BridgeManager,
81}
82
83impl Server {
84    pub fn new(config: &Config) -> Result<Self> {
85        Ok(Self {
86            config: config.clone(),
87            turn_gate: TurnGate::new(),
88            bridge_manager: crate::security::BridgeManager::new(),
89        })
90    }
91
92    /// Create a server with a shared TurnGate (for daemon mode where
93    /// heartbeat and HTTP share concurrency control).
94    pub fn new_with_gate(config: &Config, turn_gate: TurnGate) -> Result<Self> {
95        Ok(Self {
96            config: config.clone(),
97            turn_gate,
98            bridge_manager: crate::security::BridgeManager::new(),
99        })
100    }
101
102    /// Create a server with both a shared TurnGate and a shared BridgeManager.
103    pub fn new_daemon(
104        config: &Config,
105        turn_gate: TurnGate,
106        bridge_manager: crate::security::BridgeManager,
107    ) -> Result<Self> {
108        Ok(Self {
109            config: config.clone(),
110            turn_gate,
111            bridge_manager,
112        })
113    }
114
115    pub async fn run(&self) -> Result<()> {
116        // Create shared MemoryManager once to avoid reinitializing embedding provider
117        let memory =
118            MemoryManager::new_with_full_config(&self.config.memory, Some(&self.config), "main")?;
119
120        let workspace_lock = WorkspaceLock::new()?;
121        let rate_limiter = crate::rate_limiter::create_rate_limiter(&self.config.server.rate_limit);
122
123        let state = Arc::new(AppState {
124            config: self.config.clone(),
125            sessions: Mutex::new(HashMap::new()),
126            memory,
127            turn_gate: self.turn_gate.clone(),
128            workspace_lock,
129            rate_limiter,
130            bridge_manager: self.bridge_manager.clone(),
131        });
132
133        // Load persisted sessions on startup
134        if let Err(e) = load_persisted_sessions(&state).await {
135            info!("Could not load persisted sessions: {}", e);
136        }
137
138        // Spawn session cleanup task
139        let cleanup_state = state.clone();
140        tokio::spawn(async move {
141            let mut interval = tokio::time::interval(Duration::from_secs(60));
142            loop {
143                interval.tick().await;
144                cleanup_expired_sessions(&cleanup_state).await;
145            }
146        });
147
148        // Spawn session save task (save every 5 minutes)
149        let save_state = state.clone();
150        tokio::spawn(async move {
151            let mut interval = tokio::time::interval(Duration::from_secs(300));
152            loop {
153                interval.tick().await;
154                save_dirty_sessions(&save_state).await;
155            }
156        });
157
158        let cors = if self.config.server.cors_origins.is_empty() {
159            // Default: allow only localhost origins (any port, http/https, IPv4/IPv6)
160            CorsLayer::new()
161                .allow_origin(AllowOrigin::predicate(|origin, _| {
162                    let origin = origin.as_bytes();
163                    is_localhost_origin(origin)
164                }))
165                .allow_methods(Any)
166                .allow_headers(Any)
167        } else {
168            let origins: Vec<axum::http::HeaderValue> = self
169                .config
170                .server
171                .cors_origins
172                .iter()
173                .filter_map(|o| o.parse().ok())
174                .collect();
175            CorsLayer::new()
176                .allow_origin(origins)
177                .allow_methods(Any)
178                .allow_headers(Any)
179        };
180
181        // Public routes (no auth required)
182        let public_routes = Router::new()
183            .route("/", get(serve_ui_index))
184            .route("/ui/{*path}", get(serve_ui_file))
185            .route("/health", get(health_check))
186            .route("/api/auth/status", get(auth_status));
187
188        // OpenAI-compatible API routes (auth required if token configured)
189        let openai_routes = Router::new()
190            .route(
191                "/v1/chat/completions",
192                post(crate::openai_compat::chat_completions),
193            )
194            .route("/v1/models", get(crate::openai_compat::list_models))
195            .layer(middleware::from_fn_with_state(
196                state.clone(),
197                rate_limit_middleware,
198            ))
199            .layer(middleware::from_fn_with_state(
200                state.clone(),
201                auth_middleware,
202            ));
203
204        // Protected API routes (auth required if token configured)
205        let api_routes = Router::new()
206            .route("/api/sessions", post(create_session))
207            .route("/api/sessions", get(list_sessions))
208            .route("/api/sessions/{session_id}", delete(delete_session))
209            .route("/api/sessions/{session_id}", get(get_session_status))
210            .route(
211                "/api/sessions/{session_id}/messages",
212                get(get_session_messages),
213            )
214            .route("/api/sessions/{session_id}/compact", post(compact_session))
215            .route("/api/sessions/{session_id}/clear", post(clear_session))
216            .route("/api/sessions/{session_id}/model", post(set_session_model))
217            .route("/api/chat", post(chat))
218            .route("/api/chat/stream", post(chat_stream))
219            .route("/api/ws", get(websocket_handler))
220            .route("/api/memory/search", get(memory_search))
221            .route("/api/memory/stats", get(memory_stats))
222            .route("/api/memory/reindex", post(memory_reindex))
223            .route("/api/status", get(status))
224            .route("/api/config", get(get_config).post(set_config))
225            .route("/api/heartbeat/status", get(heartbeat_status))
226            .route("/api/bridges", get(list_bridges))
227            .route("/api/channels/status", get(channels_status))
228            .route("/api/saved-sessions", get(list_saved_sessions))
229            .route("/api/saved-sessions/{session_id}", get(get_saved_session))
230            .route("/api/logs/daemon", get(get_daemon_logs))
231            .route(
232                "/api/sessions/{session_id}/approve",
233                post(approve_tool_execution),
234            )
235            .layer(middleware::from_fn_with_state(
236                state.clone(),
237                rate_limit_middleware,
238            ))
239            .layer(middleware::from_fn_with_state(
240                state.clone(),
241                auth_middleware,
242            ));
243
244        let app = public_routes
245            .merge(api_routes)
246            .merge(openai_routes)
247            .layer(RequestBodyLimitLayer::new(
248                self.config.server.max_request_body,
249            ))
250            .layer(cors)
251            .with_state(state);
252
253        let addr: SocketAddr =
254            format!("{}:{}", self.config.server.bind, self.config.server.port).parse()?;
255
256        // TLS mode: generate/load certs and serve HTTPS
257        #[cfg(feature = "tls")]
258        if self.config.server.tls_enabled {
259            let cert_dir = std::path::Path::new(&self.config.server.tls_cert_dir);
260            let cert_paths = crate::tls::certs::ensure_certs(
261                cert_dir,
262                self.config.server.tls_renew_threshold_days,
263            )?;
264
265            info!("Starting HTTPS server on https://{}", addr);
266            info!(
267                "CA certificate: {} (share with clients for trust)",
268                cert_paths.ca_cert.display()
269            );
270
271            let tls_config = axum_server::tls_rustls::RustlsConfig::from_pem_file(
272                &cert_paths.server_cert,
273                &cert_paths.server_key,
274            )
275            .await?;
276
277            // Spawn companion HTTP server for CA cert download on port+1
278            let companion_port = self.config.server.port.saturating_add(1);
279            let companion_addr: SocketAddr =
280                format!("{}:{}", self.config.server.bind, companion_port).parse()?;
281            let ca_cert_path = cert_paths.ca_cert.clone();
282            tokio::spawn(async move {
283                let ca_app = axum::Router::new().route(
284                    "/ca.pem",
285                    get(move || async move {
286                        match tokio::fs::read(&ca_cert_path).await {
287                            Ok(data) => (
288                                StatusCode::OK,
289                                [(header::CONTENT_TYPE, "application/x-pem-file")],
290                                data,
291                            )
292                                .into_response(),
293                            Err(_) => StatusCode::NOT_FOUND.into_response(),
294                        }
295                    }),
296                );
297                info!(
298                    "CA cert download available at http://{}/ca.pem",
299                    companion_addr
300                );
301                if let Ok(listener) = tokio::net::TcpListener::bind(companion_addr).await {
302                    let _ = axum::serve(listener, ca_app).await;
303                }
304            });
305
306            axum_server::bind_rustls(addr, tls_config)
307                .serve(app.into_make_service())
308                .await?;
309
310            return Ok(());
311        }
312
313        // Plain HTTP mode (default)
314        info!("Starting HTTP server on http://{}", addr);
315
316        let listener = tokio::net::TcpListener::bind(addr).await?;
317        axum::serve(listener, app).await?;
318
319        Ok(())
320    }
321}
322
323// Error response type
324struct AppError(StatusCode, String);
325
326impl IntoResponse for AppError {
327    fn into_response(self) -> Response {
328        (self.0, self.1).into_response()
329    }
330}
331
/// Compare two byte strings without an early exit on the first mismatch.
///
/// Inputs of different length return `false` immediately; the length itself is
/// therefore not hidden, only the position of the first differing byte.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Accumulate every XOR difference so the loop always visits every byte.
    let mut difference = 0u8;
    for (left, right) in a.iter().zip(b) {
        difference |= left ^ right;
    }
    difference == 0
}
342
343// Auth middleware for API routes
344async fn auth_middleware(
345    State(state): State<Arc<AppState>>,
346    request: Request,
347    next: Next,
348) -> Result<Response, StatusCode> {
349    // If no token configured, pass through (backward compat)
350    let Some(expected) = &state.config.server.auth_token else {
351        return Ok(next.run(request).await);
352    };
353
354    let auth_header = request
355        .headers()
356        .get("authorization")
357        .and_then(|v| v.to_str().ok());
358
359    match auth_header {
360        Some(h) if h.starts_with("Bearer ") => {
361            let token = &h[7..];
362            // Use constant-time comparison to prevent timing attacks
363            if constant_time_eq(token.as_bytes(), expected.as_bytes()) {
364                Ok(next.run(request).await)
365            } else {
366                debug!("Auth failed: invalid token");
367                Err(StatusCode::UNAUTHORIZED)
368            }
369        }
370        _ => {
371            debug!("Auth failed: missing or invalid Authorization header");
372            Err(StatusCode::UNAUTHORIZED)
373        }
374    }
375}
376
377/// Verify a webhook signature (HMAC-SHA256).
378///
379/// Checks `X-Signature-256` header against HMAC-SHA256(secret, body).
380/// Returns Ok(()) if valid, Err(StatusCode) if invalid.
381/// If no webhook_secret is configured, always passes.
382#[allow(dead_code)]
383pub fn verify_webhook_signature(
384    secret: Option<&str>,
385    signature_header: Option<&str>,
386    body: &[u8],
387) -> Result<(), StatusCode> {
388    let Some(secret) = secret else {
389        return Ok(()); // No secret = no verification (backward compat)
390    };
391
392    let sig_header = signature_header.ok_or_else(|| {
393        debug!("Webhook rejected: missing X-Signature-256 header");
394        StatusCode::UNAUTHORIZED
395    })?;
396
397    // Expected format: sha256=<hex>
398    let expected_hex = sig_header.strip_prefix("sha256=").ok_or_else(|| {
399        debug!("Webhook rejected: X-Signature-256 must start with 'sha256='");
400        StatusCode::UNAUTHORIZED
401    })?;
402
403    use hmac::{Hmac, Mac};
404    use sha2::Sha256;
405    type HmacSha256 = Hmac<Sha256>;
406
407    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
408        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
409    mac.update(body);
410    let result = mac.finalize();
411
412    let computed_hex: String = result
413        .into_bytes()
414        .iter()
415        .map(|b| format!("{:02x}", b))
416        .collect();
417
418    if constant_time_eq(computed_hex.as_bytes(), expected_hex.as_bytes()) {
419        Ok(())
420    } else {
421        debug!("Webhook rejected: signature mismatch");
422        Err(StatusCode::UNAUTHORIZED)
423    }
424}
425
426// Rate limit middleware for API routes
427async fn rate_limit_middleware(
428    State(state): State<Arc<AppState>>,
429    request: Request,
430    next: Next,
431) -> Result<Response, Response> {
432    let ip = request
433        .extensions()
434        .get::<axum::extract::ConnectInfo<SocketAddr>>()
435        .map(|ci| ci.0.ip())
436        .unwrap_or_else(|| std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
437
438    if !state.rate_limiter.check(ip).await {
439        return Err((
440            StatusCode::TOO_MANY_REQUESTS,
441            [(header::RETRY_AFTER, "60")],
442            "Rate limit exceeded",
443        )
444            .into_response());
445    }
446
447    Ok(next.run(request).await)
448}
449
450// Auth status endpoint (public, tells client if auth is required)
451async fn auth_status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
452    Json(json!({
453        "auth_required": state.config.server.auth_token.is_some()
454    }))
455}
456
457// Session cleanup task
458async fn cleanup_expired_sessions(state: &Arc<AppState>) {
459    let mut sessions = state.sessions.lock().await;
460    let before_count = sessions.len();
461
462    sessions.retain(|id, entry| {
463        let expired = entry.last_accessed.elapsed() > SESSION_TIMEOUT;
464        if expired {
465            debug!("Expiring session: {}", id);
466        }
467        !expired
468    });
469
470    let removed = before_count - sessions.len();
471    if removed > 0 {
472        info!("Cleaned up {} expired sessions", removed);
473    }
474}
475
476// Load persisted sessions from disk
477async fn load_persisted_sessions(state: &Arc<AppState>) -> Result<(), anyhow::Error> {
478    use localgpt_core::agent::list_sessions_for_agent;
479    use std::sync::Arc as StdArc;
480
481    let sessions_list = list_sessions_for_agent(HTTP_AGENT_ID)?;
482    let mut loaded = 0;
483
484    for session_info in sessions_list.into_iter().take(MAX_SESSIONS) {
485        let agent_config = AgentConfig {
486            model: state.config.agent.default_model.clone(),
487            context_window: state.config.agent.context_window,
488            reserve_tokens: state.config.agent.reserve_tokens,
489        };
490
491        let memory = StdArc::new(state.memory.clone());
492        let mut agent = Agent::new(agent_config, &state.config, memory).await?;
493
494        // Try to resume the session
495        if agent.resume_session(&session_info.id).await.is_ok() {
496            let mut sessions = state.sessions.lock().await;
497            sessions.insert(
498                session_info.id.clone(),
499                SessionEntry {
500                    agent,
501                    last_accessed: Instant::now(),
502                    dirty: false,
503                },
504            );
505            loaded += 1;
506        }
507    }
508
509    if loaded > 0 {
510        info!("Loaded {} persisted HTTP sessions", loaded);
511    }
512
513    Ok(())
514}
515
516// Save dirty sessions to disk
517async fn save_dirty_sessions(state: &Arc<AppState>) {
518    let mut sessions = state.sessions.lock().await;
519    let mut saved = 0;
520
521    for (id, entry) in sessions.iter_mut() {
522        if entry.dirty {
523            if let Err(e) = entry.agent.save_session_for_agent(HTTP_AGENT_ID).await {
524                debug!("Failed to save session {}: {}", id, e);
525            } else {
526                entry.dirty = false;
527                saved += 1;
528            }
529        }
530    }
531
532    if saved > 0 {
533        info!("Saved {} HTTP sessions to disk", saved);
534    }
535}
536
537// Get or create a session
538async fn get_or_create_session(
539    state: &Arc<AppState>,
540    session_id: Option<String>,
541) -> Result<String, AppError> {
542    let mut sessions = state.sessions.lock().await;
543
544    // If session_id provided, try to use existing session
545    if let Some(ref id) = session_id
546        && sessions.contains_key(id)
547    {
548        // Update last accessed time
549        if let Some(entry) = sessions.get_mut(id) {
550            entry.last_accessed = Instant::now();
551        }
552        return Ok(id.clone());
553    }
554
555    // Check session limit
556    if sessions.len() >= MAX_SESSIONS {
557        // Try to remove oldest session
558        if let Some(oldest_id) = sessions
559            .iter()
560            .min_by_key(|(_, e)| e.last_accessed)
561            .map(|(id, _)| id.clone())
562        {
563            sessions.remove(&oldest_id);
564            info!("Removed oldest session {} to make room", oldest_id);
565        }
566    }
567
568    // Create new session
569    let new_id = session_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
570
571    let agent_config = AgentConfig {
572        model: state.config.agent.default_model.clone(),
573        context_window: state.config.agent.context_window,
574        reserve_tokens: state.config.agent.reserve_tokens,
575    };
576
577    let memory = std::sync::Arc::new(state.memory.clone());
578    let mut agent = Agent::new(agent_config, &state.config, memory)
579        .await
580        .map_err(|e| AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
581
582    agent
583        .new_session()
584        .await
585        .map_err(|e| AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
586
587    sessions.insert(
588        new_id.clone(),
589        SessionEntry {
590            agent,
591            last_accessed: Instant::now(),
592            dirty: true, // New sessions should be saved
593        },
594    );
595
596    info!("Created new session: {}", new_id);
597    Ok(new_id)
598}
599
/// `GET /health` — liveness probe that always answers with the body `OK`.
async fn health_check() -> &'static str {
    const HEALTHY: &str = "OK";
    HEALTHY
}
604
605// Serve UI index.html at root
606async fn serve_ui_index() -> Response {
607    serve_ui_asset("index.html")
608}
609
610// Serve UI static files
611async fn serve_ui_file(Path(path): Path<String>) -> Response {
612    serve_ui_asset(&path)
613}
614
615// Helper to serve embedded UI assets
616fn serve_ui_asset(path: &str) -> Response {
617    match UiAssets::get(path) {
618        Some(content) => {
619            let mime = match path.rsplit('.').next() {
620                Some("js") => "application/javascript".to_string(),
621                Some("wasm") => "application/wasm".to_string(),
622                _ => mime_guess::from_path(path)
623                    .first_or_octet_stream()
624                    .to_string(),
625            };
626            ([(header::CONTENT_TYPE, mime)], content.data.to_vec()).into_response()
627        }
628        None => (StatusCode::NOT_FOUND, "Not found").into_response(),
629    }
630}
631
632// Status endpoint
633#[derive(Serialize)]
634struct StatusResponse {
635    version: String,
636    model: String,
637    memory_chunks: usize,
638    active_sessions: usize,
639    is_brand_new: bool,
640}
641
642async fn status(State(state): State<Arc<AppState>>) -> Json<StatusResponse> {
643    let sessions = state.sessions.lock().await;
644
645    Json(StatusResponse {
646        version: env!("CARGO_PKG_VERSION").to_string(),
647        model: state.config.agent.default_model.clone(),
648        memory_chunks: state.memory.chunk_count().unwrap_or(0),
649        active_sessions: sessions.len(),
650        is_brand_new: state.memory.is_brand_new(),
651    })
652}
653
654async fn list_bridges(
655    State(state): State<Arc<AppState>>,
656) -> Json<Vec<crate::security::bridge::BridgeStatus>> {
657    Json(state.bridge_manager.get_active_bridges().await)
658}
659
660#[derive(serde::Serialize)]
661struct ChannelStatus {
662    name: String,
663    state: String,
664    connected_since: Option<String>,
665    last_active: Option<String>,
666    health: String,
667}
668
669#[derive(serde::Serialize)]
670struct ChannelsSummary {
671    total: usize,
672    connected: usize,
673    disconnected: usize,
674    degraded: usize,
675}
676
677#[derive(serde::Serialize)]
678struct ChannelsStatusResponse {
679    channels: Vec<ChannelStatus>,
680    summary: ChannelsSummary,
681}
682
683async fn channels_status(State(state): State<Arc<AppState>>) -> Json<ChannelsStatusResponse> {
684    let bridges = state.bridge_manager.get_active_bridges().await;
685
686    let channels: Vec<ChannelStatus> = bridges
687        .iter()
688        .map(|b| {
689            let name = b
690                .bridge_id
691                .as_deref()
692                .unwrap_or(&b.connection_id)
693                .to_string();
694            let health_str = format!("{:?}", b.health).to_lowercase();
695            let channel_state = match b.health {
696                crate::security::bridge::HealthStatus::Healthy => "connected",
697                crate::security::bridge::HealthStatus::Degraded => "degraded",
698                crate::security::bridge::HealthStatus::Unhealthy => "disconnected",
699            };
700
701            ChannelStatus {
702                name,
703                state: channel_state.to_string(),
704                connected_since: Some(b.connected_at.to_rfc3339()),
705                last_active: Some(b.last_active.to_rfc3339()),
706                health: health_str,
707            }
708        })
709        .collect();
710
711    let connected = channels.iter().filter(|c| c.state == "connected").count();
712    let degraded = channels.iter().filter(|c| c.state == "degraded").count();
713    let disconnected = channels
714        .iter()
715        .filter(|c| c.state == "disconnected")
716        .count();
717
718    Json(ChannelsStatusResponse {
719        summary: ChannelsSummary {
720            total: channels.len(),
721            connected,
722            disconnected,
723            degraded,
724        },
725        channels,
726    })
727}
728
729// Session management endpoints
730#[derive(Deserialize)]
731struct CreateSessionRequest {
732    session_id: Option<String>,
733}
734
735#[derive(Serialize)]
736struct SessionResponse {
737    session_id: String,
738    model: String,
739}
740
741async fn create_session(
742    State(state): State<Arc<AppState>>,
743    Json(request): Json<CreateSessionRequest>,
744) -> Response {
745    match get_or_create_session(&state, request.session_id).await {
746        Ok(session_id) => Json(SessionResponse {
747            session_id,
748            model: state.config.agent.default_model.clone(),
749        })
750        .into_response(),
751        Err(e) => e.into_response(),
752    }
753}
754
755#[derive(Serialize)]
756struct SessionInfo {
757    session_id: String,
758    idle_seconds: u64,
759}
760
761#[derive(Serialize)]
762struct ListSessionsResponse {
763    sessions: Vec<SessionInfo>,
764}
765
766async fn list_sessions(State(state): State<Arc<AppState>>) -> Json<ListSessionsResponse> {
767    let sessions = state.sessions.lock().await;
768
769    let session_list: Vec<SessionInfo> = sessions
770        .iter()
771        .map(|(id, entry)| SessionInfo {
772            session_id: id.clone(),
773            idle_seconds: entry.last_accessed.elapsed().as_secs(),
774        })
775        .collect();
776
777    Json(ListSessionsResponse {
778        sessions: session_list,
779    })
780}
781
782// Delete a session
783async fn delete_session(
784    State(state): State<Arc<AppState>>,
785    Path(session_id): Path<String>,
786) -> Response {
787    let mut sessions = state.sessions.lock().await;
788
789    if sessions.remove(&session_id).is_some() {
790        info!("Deleted session: {}", session_id);
791        Json(json!({"deleted": true, "session_id": session_id})).into_response()
792    } else {
793        AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response()
794    }
795}
796
797// Get session status
798#[derive(Serialize)]
799struct SessionStatusResponse {
800    session_id: String,
801    model: String,
802    message_count: usize,
803    token_count: usize,
804    idle_seconds: u64,
805    api_input_tokens: u64,
806    api_output_tokens: u64,
807    api_cost_usd: f64,
808    search_queries: u64,
809    search_cached_hits: u64,
810    search_cost_usd: f64,
811}
812
813async fn get_session_status(
814    State(state): State<Arc<AppState>>,
815    Path(session_id): Path<String>,
816) -> Response {
817    let sessions = state.sessions.lock().await;
818
819    match sessions.get(&session_id) {
820        Some(entry) => {
821            let status = entry.agent.session_status();
822            Json(SessionStatusResponse {
823                session_id,
824                model: entry.agent.model().to_string(),
825                message_count: status.message_count,
826                token_count: status.token_count,
827                idle_seconds: entry.last_accessed.elapsed().as_secs(),
828                api_input_tokens: status.api_input_tokens,
829                api_output_tokens: status.api_output_tokens,
830                api_cost_usd: status.api_cost_usd,
831                search_queries: status.search_queries,
832                search_cached_hits: status.search_cached_hits,
833                search_cost_usd: status.search_cost_usd,
834            })
835            .into_response()
836        }
837        None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
838    }
839}
840
841// Get session messages - returns message history for an active session
842#[derive(Serialize)]
843struct ActiveSessionMessage {
844    role: String,
845    content: Option<String>,
846    tool_calls: Option<Vec<serde_json::Value>>,
847    tool_call_id: Option<String>,
848    timestamp: u64,
849}
850
851#[derive(Serialize)]
852struct SessionMessagesResponse {
853    session_id: String,
854    messages: Vec<ActiveSessionMessage>,
855}
856
857async fn get_session_messages(
858    State(state): State<Arc<AppState>>,
859    Path(session_id): Path<String>,
860) -> Response {
861    let mut sessions = state.sessions.lock().await;
862
863    match sessions.get_mut(&session_id) {
864        Some(entry) => {
865            entry.last_accessed = Instant::now();
866
867            let messages: Vec<ActiveSessionMessage> = entry
868                .agent
869                .raw_session_messages()
870                .iter()
871                .map(|sm| {
872                    let role = match sm.message.role {
873                        localgpt_core::agent::Role::User => "user",
874                        localgpt_core::agent::Role::Assistant => "assistant",
875                        localgpt_core::agent::Role::System => "system",
876                        localgpt_core::agent::Role::Tool => "toolResult",
877                    };
878
879                    // Convert tool calls to JSON
880                    let tool_calls = sm.message.tool_calls.as_ref().map(|tcs| {
881                        tcs.iter()
882                            .map(|tc| {
883                                json!({
884                                    "id": tc.id,
885                                    "name": tc.name,
886                                    "arguments": tc.arguments
887                                })
888                            })
889                            .collect()
890                    });
891
892                    ActiveSessionMessage {
893                        role: role.to_string(),
894                        content: if sm.message.content.is_empty() {
895                            None
896                        } else {
897                            Some(sm.message.content.clone())
898                        },
899                        tool_calls,
900                        tool_call_id: sm.message.tool_call_id.clone(),
901                        timestamp: sm.timestamp,
902                    }
903                })
904                .collect();
905
906            Json(SessionMessagesResponse {
907                session_id,
908                messages,
909            })
910            .into_response()
911        }
912        None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
913    }
914}
915
916// Compact session history
917async fn compact_session(
918    State(state): State<Arc<AppState>>,
919    Path(session_id): Path<String>,
920) -> Response {
921    let mut sessions = state.sessions.lock().await;
922
923    match sessions.get_mut(&session_id) {
924        Some(entry) => {
925            entry.last_accessed = Instant::now();
926
927            match entry.agent.compact_session().await {
928                Ok((before, after)) => Json(json!({
929                    "session_id": session_id,
930                    "token_count_before": before,
931                    "token_count_after": after,
932                }))
933                .into_response(),
934                Err(e) => {
935                    AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
936                }
937            }
938        }
939        None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
940    }
941}
942
943// Clear session history
944async fn clear_session(
945    State(state): State<Arc<AppState>>,
946    Path(session_id): Path<String>,
947) -> Response {
948    let mut sessions = state.sessions.lock().await;
949
950    match sessions.get_mut(&session_id) {
951        Some(entry) => {
952            entry.last_accessed = Instant::now();
953            entry.agent.clear_session();
954            Json(json!({"session_id": session_id, "cleared": true})).into_response()
955        }
956        None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
957    }
958}
959
// Set session model
/// JSON request body for `set_session_model`.
#[derive(Deserialize)]
struct SetModelRequest {
    /// Model identifier handed to `Agent::set_model`; an `Err` from that call
    /// is reported to the client as `400 Bad Request`.
    model: String,
}
965
966async fn set_session_model(
967    State(state): State<Arc<AppState>>,
968    Path(session_id): Path<String>,
969    Json(request): Json<SetModelRequest>,
970) -> Response {
971    let mut sessions = state.sessions.lock().await;
972
973    match sessions.get_mut(&session_id) {
974        Some(entry) => {
975            entry.last_accessed = Instant::now();
976
977            match entry.agent.set_model(&request.model) {
978                Ok(()) => Json(json!({
979                    "session_id": session_id,
980                    "model": request.model,
981                }))
982                .into_response(),
983                Err(e) => AppError(StatusCode::BAD_REQUEST, e.to_string()).into_response(),
984            }
985        }
986        None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
987    }
988}
989
// Chat endpoint
/// JSON request body shared by the blocking `chat` handler and the SSE
/// `chat_stream` handler.
#[derive(Deserialize)]
struct ChatRequest {
    /// User message for this turn.
    message: String,
    /// Session to continue; forwarded to `get_or_create_session`, which is
    /// presumably expected to create a new session when this is `None` (confirm).
    session_id: Option<String>,
    /// Optional model to use for this request (switches session model)
    model: Option<String>,
}

/// JSON response body of the blocking `chat` handler.
#[derive(Serialize)]
struct ChatResponse {
    /// Assistant reply text returned by `Agent::chat`.
    response: String,
    /// Session the turn ran in (possibly created for this request).
    session_id: String,
    /// Model active on the session after the turn completed.
    model: String,
}
1005
1006async fn chat(State(state): State<Arc<AppState>>, Json(request): Json<ChatRequest>) -> Response {
1007    // Get or create session
1008    let session_id = match get_or_create_session(&state, request.session_id).await {
1009        Ok(id) => id,
1010        Err(e) => return e.into_response(),
1011    };
1012
1013    // Acquire in-process turn gate (waits for other turns to finish)
1014    let _gate_permit = state.turn_gate.acquire().await;
1015
1016    // Acquire cross-process workspace lock (blocking, so use spawn_blocking)
1017    let ws_lock_path = state.workspace_lock.clone();
1018    let ws_guard = match tokio::task::spawn_blocking(move || ws_lock_path.acquire()).await {
1019        Ok(Ok(guard)) => guard,
1020        Ok(Err(e)) => {
1021            return AppError(
1022                StatusCode::INTERNAL_SERVER_ERROR,
1023                format!("Failed to acquire workspace lock: {}", e),
1024            )
1025            .into_response();
1026        }
1027        Err(e) => {
1028            return AppError(
1029                StatusCode::INTERNAL_SERVER_ERROR,
1030                format!("Lock task error: {}", e),
1031            )
1032            .into_response();
1033        }
1034    };
1035
1036    // Get agent from session
1037    let mut sessions = state.sessions.lock().await;
1038    let entry = match sessions.get_mut(&session_id) {
1039        Some(e) => e,
1040        None => {
1041            return AppError(StatusCode::NOT_FOUND, "Session not found".to_string())
1042                .into_response();
1043        }
1044    };
1045
1046    entry.last_accessed = Instant::now();
1047
1048    // Switch model if requested
1049    if let Some(ref model) = request.model
1050        && let Err(e) = entry.agent.set_model(model)
1051    {
1052        return AppError(StatusCode::BAD_REQUEST, format!("Invalid model: {}", e)).into_response();
1053    }
1054
1055    let result = entry.agent.chat(&request.message).await;
1056
1057    // Release workspace lock explicitly before returning
1058    drop(ws_guard);
1059
1060    match result {
1061        Ok(response) => {
1062            entry.dirty = true;
1063            Json(ChatResponse {
1064                response,
1065                session_id,
1066                model: entry.agent.model().to_string(),
1067            })
1068            .into_response()
1069        }
1070        Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1071    }
1072}
1073
1074// Streaming chat endpoint (SSE) with tool support
1075async fn chat_stream(
1076    State(state): State<Arc<AppState>>,
1077    Json(request): Json<ChatRequest>,
1078) -> Response {
1079    // Get or create session first (outside the stream)
1080    let session_id = match get_or_create_session(&state, request.session_id).await {
1081        Ok(id) => id,
1082        Err(e) => return e.into_response(),
1083    };
1084
1085    let state_clone = state.clone();
1086    let message = request.message.clone();
1087
1088    let stream = async_stream::stream! {
1089        // Send session_id first
1090        yield Ok::<Event, Infallible>(Event::default().data(json!({"type": "session", "session_id": session_id}).to_string()));
1091
1092        // Acquire in-process turn gate
1093        let _gate_permit = state_clone.turn_gate.acquire().await;
1094
1095        // Acquire cross-process workspace lock
1096        let ws_lock = state_clone.workspace_lock.clone();
1097        let _ws_guard = match tokio::task::spawn_blocking(move || ws_lock.acquire()).await {
1098            Ok(Ok(guard)) => Some(guard),
1099            Ok(Err(e)) => {
1100                yield Ok(Event::default().data(json!({"error": format!("Workspace lock error: {}", e)}).to_string()));
1101                return;
1102            }
1103            Err(e) => {
1104                yield Ok(Event::default().data(json!({"error": format!("Lock task error: {}", e)}).to_string()));
1105                return;
1106            }
1107        };
1108
1109        let mut sessions = state_clone.sessions.lock().await;
1110        let entry = match sessions.get_mut(&session_id) {
1111            Some(e) => e,
1112            None => {
1113                yield Ok(Event::default().data(json!({"error": "Session not found"}).to_string()));
1114                return;
1115            }
1116        };
1117
1118        entry.last_accessed = Instant::now();
1119        entry.dirty = true;
1120
1121        // Use streaming with tools
1122        match entry.agent.chat_stream_with_tools(&message, Vec::new()).await {
1123            Ok(event_stream) => {
1124                use futures::StreamExt;
1125
1126                // Pin the stream to iterate over it
1127                let mut pinned_stream = std::pin::pin!(event_stream);
1128
1129                while let Some(event) = pinned_stream.next().await {
1130                    match event {
1131                        Ok(StreamEvent::Content(content)) => {
1132                            let data = json!({"type": "content", "delta": content});
1133                            yield Ok(Event::default().data(data.to_string()));
1134                        }
1135                        Ok(StreamEvent::ToolCallStart { name, id, arguments }) => {
1136                            let detail = extract_tool_detail(&name, &arguments);
1137                            let data = json!({"type": "tool_start", "name": name, "id": id, "detail": detail});
1138                            yield Ok(Event::default().data(data.to_string()));
1139                        }
1140                        Ok(StreamEvent::ToolCallEnd { name, id, output, warnings }) => {
1141                            let data = json!({
1142                                "type": "tool_end",
1143                                "name": name,
1144                                "id": id,
1145                                "output": output.chars().take(500).collect::<String>(),
1146                                "warnings": warnings
1147                            });
1148                            yield Ok(Event::default().data(data.to_string()));
1149                        }
1150                        Ok(StreamEvent::Done) => {
1151                            let data = json!({"type": "done"});
1152                            yield Ok(Event::default().data(data.to_string()));
1153                        }
1154                        Ok(StreamEvent::ApprovalRequired { request_id, tool_name, arguments, level }) => {
1155                            let data = json!({
1156                                "type": "approval_required",
1157                                "request_id": request_id,
1158                                "tool_name": tool_name,
1159                                "arguments": arguments,
1160                                "level": level,
1161                            });
1162                            yield Ok(Event::default().data(data.to_string()));
1163                        }
1164                        Err(e) => {
1165                            yield Ok(Event::default().data(json!({"error": e.to_string()}).to_string()));
1166                            break;
1167                        }
1168                    }
1169                }
1170            }
1171            Err(e) => {
1172                yield Ok(Event::default().data(json!({"error": e.to_string()}).to_string()));
1173            }
1174        }
1175
1176        yield Ok(Event::default().data("[DONE]"));
1177    };
1178
1179    Sse::new(stream).into_response()
1180}
1181
// Memory search endpoint
/// Query-string parameters for `memory_search`.
#[derive(Deserialize)]
struct SearchQuery {
    /// Search text; `memory_search` rejects values longer than 1000 bytes.
    q: String,
    /// Maximum number of hits; `memory_search_inner` defaults it to 10 and
    /// caps it at 100.
    limit: Option<usize>,
}

/// One search hit, copied field-for-field from the memory index result.
#[derive(Serialize)]
struct SearchResult {
    /// File the matched chunk came from, as reported by the index.
    file: String,
    /// Start line of the matched chunk (base not established here).
    line_start: i32,
    /// End line of the matched chunk (inclusive/exclusive not established here).
    line_end: i32,
    /// Text of the matched chunk.
    content: String,
    /// Relevance score reported by the index; scale not established here.
    score: f64,
}

/// JSON response body of `memory_search`.
#[derive(Serialize)]
struct SearchResponse {
    /// Hits in the order returned by `MemoryManager::search`.
    results: Vec<SearchResult>,
    /// The query text echoed back.
    query: String,
}
1203
1204async fn memory_search(
1205    State(state): State<Arc<AppState>>,
1206    Query(query): Query<SearchQuery>,
1207) -> Response {
1208    // Reject excessively long queries to prevent DoS
1209    if query.q.len() > 1000 {
1210        return AppError(StatusCode::BAD_REQUEST, "Query too long".to_string()).into_response();
1211    }
1212    match memory_search_inner(&state.memory, &query.q, query.limit) {
1213        Ok(response) => Json(response).into_response(),
1214        Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1215    }
1216}
1217
1218fn memory_search_inner(
1219    memory: &MemoryManager,
1220    query: &str,
1221    limit: Option<usize>,
1222) -> Result<SearchResponse, anyhow::Error> {
1223    let limit = limit.unwrap_or(10).min(100);
1224    let results = memory.search(query, limit)?;
1225
1226    let results: Vec<SearchResult> = results
1227        .into_iter()
1228        .map(|r| SearchResult {
1229            file: r.file,
1230            line_start: r.line_start,
1231            line_end: r.line_end,
1232            content: r.content,
1233            score: r.score,
1234        })
1235        .collect();
1236
1237    Ok(SearchResponse {
1238        results,
1239        query: query.to_string(),
1240    })
1241}
1242
// Memory stats endpoint
/// JSON response body of `memory_stats`; a field-for-field copy of the value
/// returned by `MemoryManager::stats()`.
#[derive(Serialize)]
struct StatsResponse {
    /// Workspace reported by the memory index.
    workspace: String,
    /// File count reported by the index.
    total_files: usize,
    /// Chunk count reported by the index.
    total_chunks: usize,
    /// Index size; kilobytes per the field name (unit not verified here).
    index_size_kb: u64,
}
1251
1252async fn memory_stats(State(state): State<Arc<AppState>>) -> Response {
1253    match memory_stats_inner(&state.memory) {
1254        Ok(response) => Json(response).into_response(),
1255        Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1256    }
1257}
1258
1259fn memory_stats_inner(memory: &MemoryManager) -> Result<StatsResponse, anyhow::Error> {
1260    let stats = memory.stats()?;
1261
1262    Ok(StatsResponse {
1263        workspace: stats.workspace,
1264        total_files: stats.total_files,
1265        total_chunks: stats.total_chunks,
1266        index_size_kb: stats.index_size_kb,
1267    })
1268}
1269
// Memory reindex endpoint
/// JSON request body of `memory_reindex`.
#[derive(Deserialize)]
struct ReindexRequest {
    /// Passed to `MemoryManager::reindex`; defaults to `false` when omitted.
    #[serde(default)]
    force: bool,
}

/// JSON response body of `memory_reindex`, copied from the reindex stats.
#[derive(Serialize)]
struct ReindexResponse {
    /// Files processed, as reported by the reindex.
    files_processed: usize,
    /// Files updated, as reported by the reindex.
    files_updated: usize,
    /// Chunks indexed, as reported by the reindex.
    chunks_indexed: usize,
    /// Reindex duration in whole milliseconds (`Duration::as_millis`).
    duration_ms: u128,
}
1284
1285async fn memory_reindex(
1286    State(state): State<Arc<AppState>>,
1287    Json(request): Json<ReindexRequest>,
1288) -> Response {
1289    // Run reindex in blocking task since it uses sqlite
1290    let memory = state.memory.clone();
1291    let force = request.force;
1292
1293    match tokio::task::spawn_blocking(move || memory_reindex_inner(&memory, force)).await {
1294        Ok(Ok(response)) => Json(response).into_response(),
1295        Ok(Err(e)) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1296        Err(e) => AppError(
1297            StatusCode::INTERNAL_SERVER_ERROR,
1298            format!("Task error: {}", e),
1299        )
1300        .into_response(),
1301    }
1302}
1303
1304fn memory_reindex_inner(
1305    memory: &MemoryManager,
1306    force: bool,
1307) -> Result<ReindexResponse, anyhow::Error> {
1308    let stats = memory.reindex(force)?;
1309
1310    Ok(ReindexResponse {
1311        files_processed: stats.files_processed,
1312        files_updated: stats.files_updated,
1313        chunks_indexed: stats.chunks_indexed,
1314        duration_ms: stats.duration.as_millis(),
1315    })
1316}
1317
// Config endpoint - show current configuration (safe subset)
/// JSON response body of `get_config`: a hand-picked subset of `Config`.
#[derive(Serialize)]
struct ConfigResponse {
    agent: AgentConfigInfo,
    server: ServerConfigInfo,
    memory: MemoryConfigInfo,
    heartbeat: HeartbeatConfigInfo,
}

/// Selected fields of `config.agent`.
#[derive(Serialize)]
struct AgentConfigInfo {
    default_model: String,
    context_window: usize,
    reserve_tokens: usize,
}

/// Selected fields of `config.server`.
#[derive(Serialize)]
struct ServerConfigInfo {
    port: u16,
    bind: String,
}

/// Selected fields of `config.memory`.
#[derive(Serialize)]
struct MemoryConfigInfo {
    workspace: String,
    embedding_model: String,
    chunk_size: usize,
    chunk_overlap: usize,
}

/// Selected fields of `config.heartbeat`.
#[derive(Serialize)]
struct HeartbeatConfigInfo {
    enabled: bool,
    /// Interval string exactly as configured (not parsed here).
    interval: String,
}
1353
1354async fn get_config(State(state): State<Arc<AppState>>) -> Json<ConfigResponse> {
1355    Json(ConfigResponse {
1356        agent: AgentConfigInfo {
1357            default_model: state.config.agent.default_model.clone(),
1358            context_window: state.config.agent.context_window,
1359            reserve_tokens: state.config.agent.reserve_tokens,
1360        },
1361        server: ServerConfigInfo {
1362            port: state.config.server.port,
1363            bind: state.config.server.bind.clone(),
1364        },
1365        memory: MemoryConfigInfo {
1366            workspace: state.config.memory.workspace.clone(),
1367            embedding_model: state.config.memory.embedding_model.clone(),
1368            chunk_size: state.config.memory.chunk_size,
1369            chunk_overlap: state.config.memory.chunk_overlap,
1370        },
1371        heartbeat: HeartbeatConfigInfo {
1372            enabled: state.config.heartbeat.enabled,
1373            interval: state.config.heartbeat.interval.clone(),
1374        },
1375    })
1376}
1377
/// JSON request body of `set_config`.
#[derive(Deserialize)]
struct SetConfigRequest {
    /// Dotted config key; `apply_config_value` only edits `section.field` keys.
    key: String,
    /// New value as text; how it is written to TOML is decided by `apply_config_value`.
    value: String,
}

/// JSON response body of `set_config`; echoes the request.
#[derive(Serialize)]
struct SetConfigResponse {
    /// Always `true` when the handler returns `Ok`.
    success: bool,
    key: String,
    value: String,
}
1390
1391async fn set_config(
1392    State(state): State<Arc<AppState>>,
1393    Json(req): Json<SetConfigRequest>,
1394) -> std::result::Result<Json<SetConfigResponse>, (axum::http::StatusCode, String)> {
1395    // Validate key exists by trying to get current value
1396    let current = state.config.get_value(&req.key);
1397    if current.is_err() {
1398        return Err((
1399            axum::http::StatusCode::BAD_REQUEST,
1400            format!("Unknown config key: {}", req.key),
1401        ));
1402    }
1403
1404    // Apply the change to config file on disk
1405    let config_path = state.config.paths.config_file();
1406    if config_path.exists() {
1407        let content = std::fs::read_to_string(&config_path).map_err(|e| {
1408            (
1409                axum::http::StatusCode::INTERNAL_SERVER_ERROR,
1410                format!("Failed to read config: {}", e),
1411            )
1412        })?;
1413
1414        let new_content = apply_config_value(&content, &req.key, &req.value);
1415
1416        std::fs::write(&config_path, new_content).map_err(|e| {
1417            (
1418                axum::http::StatusCode::INTERNAL_SERVER_ERROR,
1419                format!("Failed to write config: {}", e),
1420            )
1421        })?;
1422    }
1423
1424    Ok(Json(SetConfigResponse {
1425        success: true,
1426        key: req.key,
1427        value: req.value,
1428    }))
1429}
1430
/// Apply a config key=value change to raw TOML content.
/// Finds the matching key in the correct section and updates it.
///
/// `key` must have exactly two dot-separated parts (`section.field`); any other
/// shape returns `content` unchanged. Within the `[section]` table, the first
/// line whose key is exactly `field` is replaced by `field = <value>`, keeping
/// that line's leading indentation. Anything after the old value on that line,
/// such as a trailing comment, is dropped. `[[...]]` array-of-tables headers
/// do not change which section is tracked. If no matching line exists, every
/// line is copied through. Every output line ends with `\n`.
///
/// `value` may come straight from an HTTP request body (see `set_config`), so
/// it is written bare only when it is a TOML-valid boolean, decimal integer or
/// float. Anything else becomes an escaped TOML basic string, so quotes,
/// backslashes and newlines in `value` cannot terminate the string early and
/// inject extra keys or tables into the config file.
fn apply_config_value(content: &str, key: &str, value: &str) -> String {
    let parts: Vec<&str> = key.split('.').collect();
    if parts.len() != 2 {
        return content.to_string();
    }

    let (section, field) = (parts[0], parts[1]);
    let section_header = format!("[{}]", section);
    let rendered_value = format_toml_value(value);

    let mut result = String::with_capacity(content.len());
    let mut in_target_section = false;
    let mut value_set = false;

    for line in content.lines() {
        let trimmed = line.trim();

        // Track which section we're in
        if trimmed.starts_with('[') && !trimmed.starts_with("[[") {
            in_target_section = trimmed == section_header;
        }

        // Replace only the first assignment to `field` inside the target section.
        let is_target_assignment = in_target_section
            && !value_set
            && trimmed
                .split_once('=')
                .is_some_and(|(key_part, _)| key_part.trim() == field);

        if is_target_assignment {
            // Measure the indent with `trim_start` only: `trim` also strips trailing
            // whitespace, which would make the indent swallow part of the key.
            let indent = &line[..line.len() - line.trim_start().len()];
            result.push_str(indent);
            result.push_str(field);
            result.push_str(" = ");
            result.push_str(&rendered_value);
            value_set = true;
        } else {
            result.push_str(line);
        }
        result.push('\n');
    }

    result
}

/// Render `value` as TOML: bare for booleans and TOML-valid numbers, otherwise
/// as a quoted, escaped basic string.
fn format_toml_value(value: &str) -> String {
    if is_toml_bare_scalar(value) {
        value.to_string()
    } else {
        toml_basic_string(value)
    }
}

/// Whether `value` is a TOML boolean, decimal integer, or float literal that can
/// be written without quotes.
///
/// This is deliberately narrower than `str::parse::<f64>`, which also accepts
/// `.5`, `5.`, `007`, `Infinity` and `NaN`; none of those are valid TOML.
/// Underscore separators and hex/octal/binary integers are not recognised, so
/// such values are written as strings.
fn is_toml_bare_scalar(value: &str) -> bool {
    fn is_digits(s: &str) -> bool {
        !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit())
    }
    fn strip_sign(s: &str) -> &str {
        s.strip_prefix(|c: char| c == '+' || c == '-').unwrap_or(s)
    }

    if value == "true" || value == "false" {
        return true;
    }
    let unsigned = strip_sign(value);
    // TOML's special float values are lowercase only.
    if unsigned == "inf" || unsigned == "nan" {
        return true;
    }

    let (mantissa, exponent) = match unsigned.split_once(|c: char| c == 'e' || c == 'E') {
        Some((m, e)) => (m, Some(e)),
        None => (unsigned, None),
    };
    let (int_part, frac_part) = match mantissa.split_once('.') {
        Some((i, f)) => (i, Some(f)),
        None => (mantissa, None),
    };

    // TOML forbids leading zeros in the integer part ("0" itself is fine) and
    // requires digits on both sides of a decimal point.
    let int_ok = is_digits(int_part) && (int_part == "0" || !int_part.starts_with('0'));
    let frac_ok = frac_part.is_none_or(is_digits);
    let exp_ok = exponent.is_none_or(|e| is_digits(strip_sign(e)));
    int_ok && frac_ok && exp_ok
}

/// Quote `value` as a TOML basic string, escaping `"`, `\` and control characters.
fn toml_basic_string(value: &str) -> String {
    let mut out = String::with_capacity(value.len() + 2);
    out.push('"');
    for c in value.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            '\u{8}' => out.push_str("\\b"),
            '\u{c}' => out.push_str("\\f"),
            c if c.is_control() => out.push_str(&format!("\\u{:04X}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}
1480
// Heartbeat status endpoint
/// JSON response body of `heartbeat_status`.
#[derive(Serialize)]
struct HeartbeatStatusResponse {
    /// Copied from `config.heartbeat.enabled`.
    enabled: bool,
    /// Copied from `config.heartbeat.interval`, as configured.
    interval: String,
    /// Most recent heartbeat event, or `None` when there is none.
    last_event: Option<HeartbeatEventInfo>,
}

/// Serializable view of the last heartbeat event.
#[derive(Serialize)]
struct HeartbeatEventInfo {
    /// Event timestamp; `heartbeat_status` treats it as milliseconds since the Unix epoch.
    ts: u64,
    /// One of `sent`, `ok`, `skipped`, `failed`, `timed_out`.
    status: String,
    /// Duration carried by the event (milliseconds per the field name).
    duration_ms: u64,
    /// Optional preview text carried by the event.
    preview: Option<String>,
    /// Optional reason carried by the event.
    reason: Option<String>,
    /// Whole seconds between `ts` and now, saturating at zero.
    age_seconds: u64,
}
1498
1499async fn heartbeat_status(State(state): State<Arc<AppState>>) -> Json<HeartbeatStatusResponse> {
1500    let last_event = get_last_heartbeat_event().map(|event| {
1501        let now_ms = std::time::SystemTime::now()
1502            .duration_since(std::time::UNIX_EPOCH)
1503            .map(|d| d.as_millis() as u64)
1504            .unwrap_or(0);
1505        let age_seconds = (now_ms.saturating_sub(event.ts)) / 1000;
1506
1507        let status = match event.status {
1508            HeartbeatStatus::Sent => "sent",
1509            HeartbeatStatus::Ok => "ok",
1510            HeartbeatStatus::Skipped => "skipped",
1511            HeartbeatStatus::SkippedMayTry => "skipped",
1512            HeartbeatStatus::Failed => "failed",
1513            HeartbeatStatus::TimedOut => "timed_out",
1514        };
1515
1516        HeartbeatEventInfo {
1517            ts: event.ts,
1518            status: status.to_string(),
1519            duration_ms: event.duration_ms,
1520            preview: event.preview,
1521            reason: event.reason,
1522            age_seconds,
1523        }
1524    });
1525
1526    Json(HeartbeatStatusResponse {
1527        enabled: state.config.heartbeat.enabled,
1528        interval: state.config.heartbeat.interval.clone(),
1529        last_event,
1530    })
1531}
1532
// Saved sessions endpoint - list sessions from file store
/// One persisted session in the `list_saved_sessions` response.
#[derive(Serialize)]
struct SavedSessionInfo {
    /// Session identifier.
    id: String,
    /// Message count reported by the session store.
    message_count: usize,
    /// Creation time rendered as `%Y-%m-%dT%H:%M:%S` (no timezone suffix).
    created_at: String,
}

/// JSON response body of `list_saved_sessions`.
#[derive(Serialize)]
struct SavedSessionsResponse {
    sessions: Vec<SavedSessionInfo>,
}
1545
1546async fn list_saved_sessions(State(_state): State<Arc<AppState>>) -> Response {
1547    use localgpt_core::agent::list_sessions_for_agent;
1548
1549    match list_sessions_for_agent(HTTP_AGENT_ID) {
1550        Ok(sessions) => {
1551            let session_list: Vec<SavedSessionInfo> = sessions
1552                .into_iter()
1553                .map(|s| SavedSessionInfo {
1554                    id: s.id,
1555                    message_count: s.message_count,
1556                    created_at: s.created_at.format("%Y-%m-%dT%H:%M:%S").to_string(),
1557                })
1558                .collect();
1559
1560            Json(SavedSessionsResponse {
1561                sessions: session_list,
1562            })
1563            .into_response()
1564        }
1565        Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1566    }
1567}
1568
// Get saved session detail - read and parse JSONL session file
/// One message entry parsed from a saved session's JSONL file.
#[derive(Serialize)]
struct SavedSessionMessage {
    /// `message.role`, or `"unknown"` when missing.
    role: String,
    /// Text content (text parts joined with `\n`); `None` when empty.
    content: Option<String>,
    /// Raw `message.toolCalls` array, passed through unchanged.
    tool_calls: Option<Vec<serde_json::Value>>,
    /// `message.toolCallId`, when present.
    tool_call_id: Option<String>,
    /// `message.timestamp` when it is an unsigned integer (units not established here).
    timestamp: Option<u64>,
}

/// JSON response body of `get_saved_session`.
#[derive(Serialize)]
struct SavedSessionDetail {
    session_id: String,
    /// `timestamp` from the session header line, or empty when absent.
    created_at: String,
    messages: Vec<SavedSessionMessage>,
}
1585
1586async fn get_saved_session(Path(session_id): Path<String>) -> Response {
1587    use localgpt_core::agent::get_sessions_dir_for_agent;
1588    use std::fs::File;
1589    use std::io::{BufRead, BufReader};
1590
1591    // Validate session_id uses only safe characters (alphanumeric, hyphens, underscores)
1592    if !session_id
1593        .chars()
1594        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1595    {
1596        return AppError(StatusCode::BAD_REQUEST, "Invalid session ID".to_string()).into_response();
1597    }
1598
1599    let sessions_dir = match get_sessions_dir_for_agent(HTTP_AGENT_ID) {
1600        Ok(dir) => dir,
1601        Err(e) => {
1602            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1603        }
1604    };
1605
1606    let session_path = sessions_dir.join(format!("{}.jsonl", session_id));
1607
1608    if !session_path.exists() {
1609        return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
1610    }
1611
1612    let file = match File::open(&session_path) {
1613        Ok(f) => f,
1614        Err(e) => {
1615            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1616        }
1617    };
1618
1619    let reader = BufReader::new(file);
1620    let mut messages = Vec::new();
1621    let mut created_at = String::new();
1622
1623    for (i, line) in reader.lines().enumerate() {
1624        let line = match line {
1625            Ok(l) => l,
1626            Err(_) => continue,
1627        };
1628
1629        let parsed: serde_json::Value = match serde_json::from_str(&line) {
1630            Ok(v) => v,
1631            Err(_) => continue,
1632        };
1633
1634        // First line is session header
1635        if i == 0 && parsed["type"].as_str() == Some("session") {
1636            created_at = parsed["timestamp"].as_str().unwrap_or("").to_string();
1637            continue;
1638        }
1639
1640        // Parse message entries
1641        if parsed["type"].as_str() == Some("message")
1642            && let Some(msg) = parsed.get("message")
1643        {
1644            let role = msg["role"].as_str().unwrap_or("unknown").to_string();
1645
1646            // Extract text content
1647            let content = if let Some(content_arr) = msg["content"].as_array() {
1648                content_arr
1649                    .iter()
1650                    .filter_map(|c| {
1651                        if c["type"].as_str() == Some("text") {
1652                            c["text"].as_str().map(String::from)
1653                        } else {
1654                            None
1655                        }
1656                    })
1657                    .collect::<Vec<_>>()
1658                    .join("\n")
1659            } else if let Some(text) = msg["content"].as_str() {
1660                text.to_string()
1661            } else {
1662                String::new()
1663            };
1664
1665            // Extract tool calls
1666            let tool_calls = msg["toolCalls"].as_array().cloned();
1667
1668            // Extract tool result ID
1669            let tool_call_id = msg["toolCallId"].as_str().map(String::from);
1670
1671            let timestamp = msg["timestamp"].as_u64();
1672
1673            messages.push(SavedSessionMessage {
1674                role,
1675                content: if content.is_empty() {
1676                    None
1677                } else {
1678                    Some(content)
1679                },
1680                tool_calls,
1681                tool_call_id,
1682                timestamp,
1683            });
1684        }
1685    }
1686
1687    Json(SavedSessionDetail {
1688        session_id,
1689        created_at,
1690        messages,
1691    })
1692    .into_response()
1693}
1694
// Daemon logs endpoint - read log file
/// Query-string parameters for `get_daemon_logs`.
#[derive(Deserialize)]
struct LogsQuery {
    /// Number of trailing lines to return; defaults to 200 and is capped at 1000.
    lines: Option<usize>,
}

/// JSON response body of `get_daemon_logs`.
#[derive(Serialize)]
struct DaemonLogsResponse {
    /// The last requested lines of today's log file, oldest first.
    lines: Vec<String>,
    /// Number of lines read from the file.
    total_lines: usize,
    /// Size of the log file in bytes, from its metadata.
    file_size_bytes: u64,
}
1707
1708async fn get_daemon_logs(Query(query): Query<LogsQuery>) -> Response {
1709    use localgpt_core::agent::get_state_dir;
1710    use std::fs::File;
1711    use std::io::{BufRead, BufReader};
1712
1713    let lines_requested = query.lines.unwrap_or(200).min(1000);
1714
1715    let state_dir = match get_state_dir() {
1716        Ok(dir) => dir,
1717        Err(e) => {
1718            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1719        }
1720    };
1721
1722    // Use date-based log file (matches daemon.rs)
1723    let date = chrono::Local::now().format("%Y-%m-%d");
1724    let log_path = state_dir
1725        .join("logs")
1726        .join(format!("localgpt-{}.log", date));
1727
1728    if !log_path.exists() {
1729        return Json(DaemonLogsResponse {
1730            lines: vec![],
1731            total_lines: 0,
1732            file_size_bytes: 0,
1733        })
1734        .into_response();
1735    }
1736
1737    let metadata = match std::fs::metadata(&log_path) {
1738        Ok(m) => m,
1739        Err(e) => {
1740            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1741        }
1742    };
1743
1744    let file = match File::open(&log_path) {
1745        Ok(f) => f,
1746        Err(e) => {
1747            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1748        }
1749    };
1750
1751    let reader = BufReader::new(file);
1752    let all_lines: Vec<String> = reader.lines().map_while(Result::ok).collect();
1753    let total_lines = all_lines.len();
1754
1755    // Get last N lines
1756    let lines: Vec<String> = if total_lines > lines_requested {
1757        all_lines[(total_lines - lines_requested)..].to_vec()
1758    } else {
1759        all_lines
1760    };
1761
1762    Json(DaemonLogsResponse {
1763        lines,
1764        total_lines,
1765        file_size_bytes: metadata.len(),
1766    })
1767    .into_response()
1768}
1769
1770// WebSocket handler
1771async fn websocket_handler(
1772    ws: WebSocketUpgrade,
1773    State(state): State<Arc<AppState>>,
1774) -> impl IntoResponse {
1775    // Limit WebSocket message size to 1MB to prevent DoS
1776    ws.max_message_size(1024 * 1024)
1777        .on_upgrade(|socket| handle_websocket(socket, state))
1778}
1779
/// WebSocket message types
///
/// Client-to-server frames. Each is a JSON object whose `type` field selects
/// the variant (`session`, `chat` or `ping`).
#[derive(Deserialize)]
#[serde(tag = "type")]
enum WsIncoming {
    /// Start or resume a session
    ///
    /// `session_id` is passed to `get_or_create_session`; `None` presumably
    /// requests a new session (confirm against that function).
    #[serde(rename = "session")]
    Session { session_id: Option<String> },
    /// Chat message (uses tool loop, returns complete response)
    /// For streaming, use the SSE endpoint at /api/chat/stream
    #[serde(rename = "chat")]
    Chat { message: String },
    /// Ping for keepalive
    #[serde(rename = "ping")]
    Ping,
}
1795
/// Server-to-client WebSocket frames, serialized as JSON objects whose `type`
/// field names the variant.
#[derive(Serialize)]
#[serde(tag = "type")]
#[allow(dead_code)] // ToolStart/ToolEnd reserved for streaming with tools
enum WsOutgoing {
    /// Connection established
    ///
    /// Sent after a session has been created or resumed; carries its id.
    #[serde(rename = "connected")]
    Connected { session_id: String },
    /// Text content chunk
    #[serde(rename = "content")]
    Content { delta: String },
    /// Tool call started
    #[serde(rename = "tool_start")]
    ToolStart { name: String, id: String },
    /// Tool call completed
    #[serde(rename = "tool_end")]
    ToolEnd {
        name: String,
        id: String,
        output: String,
    },
    /// Message complete
    #[serde(rename = "done")]
    Done,
    /// Pong response
    #[serde(rename = "pong")]
    Pong,
    /// Error
    ///
    /// `message` is human-readable text describing the failure.
    #[serde(rename = "error")]
    Error { message: String },
}
1826
1827async fn handle_websocket(socket: WebSocket, state: Arc<AppState>) {
1828    let (mut sender, mut receiver) = socket.split();
1829
1830    debug!("WebSocket client connected");
1831
1832    // Track current session for this connection
1833    let mut current_session_id: Option<String> = None;
1834
1835    // Process incoming messages
1836    while let Some(msg) = receiver.next().await {
1837        match msg {
1838            Ok(WsMessage::Text(text)) => {
1839                // Parse incoming message
1840                match serde_json::from_str::<WsIncoming>(&text) {
1841                    Ok(WsIncoming::Session { session_id }) => {
1842                        // Create or resume session
1843                        match get_or_create_session(&state, session_id).await {
1844                            Ok(id) => {
1845                                current_session_id = Some(id.clone());
1846                                let connected = WsOutgoing::Connected { session_id: id };
1847                                if let Ok(json) = serde_json::to_string(&connected) {
1848                                    let _ = sender.send(WsMessage::Text(json.into())).await;
1849                                }
1850                            }
1851                            Err(e) => {
1852                                let error = WsOutgoing::Error {
1853                                    message: format!("Failed to create session: {}", e.1),
1854                                };
1855                                if let Ok(json) = serde_json::to_string(&error) {
1856                                    let _ = sender.send(WsMessage::Text(json.into())).await;
1857                                }
1858                            }
1859                        }
1860                    }
1861                    Ok(WsIncoming::Chat { message }) => {
1862                        // Ensure we have a session
1863                        let session_id = match &current_session_id {
1864                            Some(id) => id.clone(),
1865                            None => {
1866                                // Auto-create session if none exists
1867                                match get_or_create_session(&state, None).await {
1868                                    Ok(id) => {
1869                                        current_session_id = Some(id.clone());
1870                                        // Notify client of new session
1871                                        let connected = WsOutgoing::Connected {
1872                                            session_id: id.clone(),
1873                                        };
1874                                        if let Ok(json) = serde_json::to_string(&connected) {
1875                                            let _ = sender.send(WsMessage::Text(json.into())).await;
1876                                        }
1877                                        id
1878                                    }
1879                                    Err(e) => {
1880                                        let error = WsOutgoing::Error {
1881                                            message: format!("Failed to create session: {}", e.1),
1882                                        };
1883                                        if let Ok(json) = serde_json::to_string(&error) {
1884                                            let _ = sender.send(WsMessage::Text(json.into())).await;
1885                                        }
1886                                        continue;
1887                                    }
1888                                }
1889                            }
1890                        };
1891
1892                        debug!("WebSocket chat [{}]: {}", session_id, message);
1893
1894                        // Acquire in-process turn gate
1895                        let _gate_permit = state.turn_gate.acquire().await;
1896
1897                        // Acquire cross-process workspace lock
1898                        let ws_lock = state.workspace_lock.clone();
1899                        let _ws_guard =
1900                            match tokio::task::spawn_blocking(move || ws_lock.acquire()).await {
1901                                Ok(Ok(guard)) => guard,
1902                                Ok(Err(e)) => {
1903                                    let error = WsOutgoing::Error {
1904                                        message: format!("Workspace lock error: {}", e),
1905                                    };
1906                                    if let Ok(json) = serde_json::to_string(&error) {
1907                                        let _ = sender.send(WsMessage::Text(json.into())).await;
1908                                    }
1909                                    continue;
1910                                }
1911                                Err(e) => {
1912                                    let error = WsOutgoing::Error {
1913                                        message: format!("Lock task error: {}", e),
1914                                    };
1915                                    if let Ok(json) = serde_json::to_string(&error) {
1916                                        let _ = sender.send(WsMessage::Text(json.into())).await;
1917                                    }
1918                                    continue;
1919                                }
1920                            };
1921
1922                        // Process chat
1923                        let mut sessions = state.sessions.lock().await;
1924                        let entry = match sessions.get_mut(&session_id) {
1925                            Some(e) => e,
1926                            None => {
1927                                let error = WsOutgoing::Error {
1928                                    message: "Session not found".to_string(),
1929                                };
1930                                if let Ok(json) = serde_json::to_string(&error) {
1931                                    let _ = sender.send(WsMessage::Text(json.into())).await;
1932                                }
1933                                current_session_id = None;
1934                                continue;
1935                            }
1936                        };
1937
1938                        entry.last_accessed = Instant::now();
1939
1940                        match entry.agent.chat(&message).await {
1941                            Ok(response) => {
1942                                // Send response as content
1943                                let content = WsOutgoing::Content { delta: response };
1944                                if let Ok(json) = serde_json::to_string(&content) {
1945                                    let _ = sender.send(WsMessage::Text(json.into())).await;
1946                                }
1947
1948                                // Send done
1949                                let done = WsOutgoing::Done;
1950                                if let Ok(json) = serde_json::to_string(&done) {
1951                                    let _ = sender.send(WsMessage::Text(json.into())).await;
1952                                }
1953                            }
1954                            Err(e) => {
1955                                let error = WsOutgoing::Error {
1956                                    message: e.to_string(),
1957                                };
1958                                if let Ok(json) = serde_json::to_string(&error) {
1959                                    let _ = sender.send(WsMessage::Text(json.into())).await;
1960                                }
1961                            }
1962                        }
1963                    }
1964                    Ok(WsIncoming::Ping) => {
1965                        let pong = WsOutgoing::Pong;
1966                        if let Ok(json) = serde_json::to_string(&pong) {
1967                            let _ = sender.send(WsMessage::Text(json.into())).await;
1968                        }
1969                    }
1970                    Err(e) => {
1971                        let error = WsOutgoing::Error {
1972                            message: format!("Invalid message format: {}", e),
1973                        };
1974                        if let Ok(json) = serde_json::to_string(&error) {
1975                            let _ = sender.send(WsMessage::Text(json.into())).await;
1976                        }
1977                    }
1978                }
1979            }
1980            Ok(WsMessage::Ping(data)) => {
1981                let _ = sender.send(WsMessage::Pong(data)).await;
1982            }
1983            Ok(WsMessage::Close(_)) => {
1984                debug!("WebSocket client disconnected");
1985                break;
1986            }
1987            Err(e) => {
1988                debug!("WebSocket error: {}", e);
1989                break;
1990            }
1991            _ => {}
1992        }
1993    }
1994
1995    debug!("WebSocket connection closed");
1996}
1997
/// Return `true` when an `Origin` header value names a loopback host.
///
/// Accepted forms use scheme `http` or `https`. The port is optional and, when
/// present, consists of ASCII digits only:
/// `http(s)://localhost[:<port>]`, `http(s)://127.0.0.1[:<port>]` and
/// `http(s)://[::1][:<port>]`.
///
/// Input that is not valid UTF-8, uses another scheme, or names any other host
/// yields `false`.
fn is_localhost_origin(origin: &[u8]) -> bool {
    let Ok(text) = std::str::from_utf8(origin) else {
        return false;
    };

    let Some(authority) = text
        .strip_prefix("http://")
        .or_else(|| text.strip_prefix("https://"))
    else {
        return false;
    };

    let digits_only = |s: &str| s.chars().all(|c| c.is_ascii_digit());

    let host = match authority.rfind(':') {
        // No colon at all: the whole authority is the host.
        None => authority,
        // Bracketed IPv6 literal: `[addr]`, optionally followed by `:<port>`.
        Some(_) if authority.starts_with('[') => {
            let Some(close) = authority.find(']') else {
                return false;
            };
            let (bracketed, tail) = authority.split_at(close + 1);
            if !tail.is_empty() && !tail.strip_prefix(':').is_some_and(digits_only) {
                return false;
            }
            bracketed
        }
        // `host:<port>`. A non-numeric suffix keeps the colon in the host, so
        // it cannot match any accepted name below.
        Some(colon) => {
            let (before, port) = (&authority[..colon], &authority[colon + 1..]);
            if digits_only(port) { before } else { authority }
        }
    };

    matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
2050
2051/// POST /api/sessions/{session_id}/approve — approve or deny an elevated tool execution.
2052///
2053/// Body: `{ "request_id": "...", "decision": "approved" | "approved_for_session" | "denied", "reason": "..." }`
2054///
2055/// This endpoint is a placeholder for future interactive approval flows.
2056/// Currently returns the decision acknowledgment. Full interactive approval
2057/// (blocking agent execution until client responds) requires WebSocket-based
2058/// request/response pairing, which can be built on top of this endpoint.
2059async fn approve_tool_execution(
2060    Path(session_id): Path<String>,
2061    State(_state): State<Arc<AppState>>,
2062    Json(body): Json<serde_json::Value>,
2063) -> impl IntoResponse {
2064    let decision = body["decision"].as_str().unwrap_or("denied");
2065    let reason = body["reason"].as_str().unwrap_or("");
2066
2067    let response = serde_json::json!({
2068        "session_id": session_id,
2069        "decision": decision,
2070        "reason": reason,
2071        "status": "acknowledged"
2072    });
2073
2074    (StatusCode::OK, Json(response))
2075}