1use anyhow::Result;
7use axum::{
8 Router,
9 extract::{
10 Path, Query, Request, State,
11 ws::{Message as WsMessage, WebSocket, WebSocketUpgrade},
12 },
13 http::{StatusCode, header},
14 middleware::{self, Next},
15 response::{
16 IntoResponse, Json, Response,
17 sse::{Event, Sse},
18 },
19 routing::{delete, get, post},
20};
21use futures::{SinkExt, StreamExt};
22use rust_embed::RustEmbed;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::net::SocketAddr;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use tokio::sync::Mutex;
31use tower_http::cors::{AllowOrigin, Any, CorsLayer};
32use tower_http::limit::RequestBodyLimitLayer;
33use tracing::{debug, info};
34
35use localgpt_core::agent::{Agent, AgentConfig, StreamEvent, extract_tool_detail};
36use localgpt_core::concurrency::{TurnGate, WorkspaceLock};
37use localgpt_core::config::Config;
38use localgpt_core::heartbeat::{HeartbeatStatus, get_last_heartbeat_event};
39use localgpt_core::memory::MemoryManager;
40
41#[derive(RustEmbed)]
43#[folder = "ui/"]
44struct UiAssets;
45
/// Idle time (30 minutes) after which the cleanup task drops an in-memory session.
const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 30);

/// Maximum number of sessions held in memory (and restored at start-up).
const MAX_SESSIONS: usize = 100;

/// Agent identifier under which HTTP sessions are listed and saved.
const HTTP_AGENT_ID: &str = "http";
54
55pub struct Server {
56 config: Config,
57 turn_gate: TurnGate,
58 bridge_manager: crate::security::BridgeManager,
59}
60
61pub(crate) struct SessionEntry {
62 agent: Agent,
63 last_accessed: Instant,
64 dirty: bool,
66}
67
68pub(crate) struct AppState {
69 pub(crate) config: Config,
70 pub(crate) sessions: Mutex<HashMap<String, SessionEntry>>,
71 pub(crate) memory: MemoryManager,
73 turn_gate: TurnGate,
75 workspace_lock: WorkspaceLock,
77 rate_limiter: Arc<crate::rate_limiter::RateLimiter>,
79 pub(crate) bridge_manager: crate::security::BridgeManager,
81}
82
83impl Server {
84 pub fn new(config: &Config) -> Result<Self> {
85 Ok(Self {
86 config: config.clone(),
87 turn_gate: TurnGate::new(),
88 bridge_manager: crate::security::BridgeManager::new(),
89 })
90 }
91
92 pub fn new_with_gate(config: &Config, turn_gate: TurnGate) -> Result<Self> {
95 Ok(Self {
96 config: config.clone(),
97 turn_gate,
98 bridge_manager: crate::security::BridgeManager::new(),
99 })
100 }
101
102 pub fn new_daemon(
104 config: &Config,
105 turn_gate: TurnGate,
106 bridge_manager: crate::security::BridgeManager,
107 ) -> Result<Self> {
108 Ok(Self {
109 config: config.clone(),
110 turn_gate,
111 bridge_manager,
112 })
113 }
114
115 pub async fn run(&self) -> Result<()> {
116 let memory =
118 MemoryManager::new_with_full_config(&self.config.memory, Some(&self.config), "main")?;
119
120 let workspace_lock = WorkspaceLock::new()?;
121 let rate_limiter = crate::rate_limiter::create_rate_limiter(&self.config.server.rate_limit);
122
123 let state = Arc::new(AppState {
124 config: self.config.clone(),
125 sessions: Mutex::new(HashMap::new()),
126 memory,
127 turn_gate: self.turn_gate.clone(),
128 workspace_lock,
129 rate_limiter,
130 bridge_manager: self.bridge_manager.clone(),
131 });
132
133 if let Err(e) = load_persisted_sessions(&state).await {
135 info!("Could not load persisted sessions: {}", e);
136 }
137
138 let cleanup_state = state.clone();
140 tokio::spawn(async move {
141 let mut interval = tokio::time::interval(Duration::from_secs(60));
142 loop {
143 interval.tick().await;
144 cleanup_expired_sessions(&cleanup_state).await;
145 }
146 });
147
148 let save_state = state.clone();
150 tokio::spawn(async move {
151 let mut interval = tokio::time::interval(Duration::from_secs(300));
152 loop {
153 interval.tick().await;
154 save_dirty_sessions(&save_state).await;
155 }
156 });
157
158 let cors = if self.config.server.cors_origins.is_empty() {
159 CorsLayer::new()
161 .allow_origin(AllowOrigin::predicate(|origin, _| {
162 let origin = origin.as_bytes();
163 is_localhost_origin(origin)
164 }))
165 .allow_methods(Any)
166 .allow_headers(Any)
167 } else {
168 let origins: Vec<axum::http::HeaderValue> = self
169 .config
170 .server
171 .cors_origins
172 .iter()
173 .filter_map(|o| o.parse().ok())
174 .collect();
175 CorsLayer::new()
176 .allow_origin(origins)
177 .allow_methods(Any)
178 .allow_headers(Any)
179 };
180
181 let public_routes = Router::new()
183 .route("/", get(serve_ui_index))
184 .route("/ui/{*path}", get(serve_ui_file))
185 .route("/health", get(health_check))
186 .route("/api/auth/status", get(auth_status));
187
188 let openai_routes = Router::new()
190 .route(
191 "/v1/chat/completions",
192 post(crate::openai_compat::chat_completions),
193 )
194 .route("/v1/models", get(crate::openai_compat::list_models))
195 .layer(middleware::from_fn_with_state(
196 state.clone(),
197 rate_limit_middleware,
198 ))
199 .layer(middleware::from_fn_with_state(
200 state.clone(),
201 auth_middleware,
202 ));
203
204 let api_routes = Router::new()
206 .route("/api/sessions", post(create_session))
207 .route("/api/sessions", get(list_sessions))
208 .route("/api/sessions/{session_id}", delete(delete_session))
209 .route("/api/sessions/{session_id}", get(get_session_status))
210 .route(
211 "/api/sessions/{session_id}/messages",
212 get(get_session_messages),
213 )
214 .route("/api/sessions/{session_id}/compact", post(compact_session))
215 .route("/api/sessions/{session_id}/clear", post(clear_session))
216 .route("/api/sessions/{session_id}/model", post(set_session_model))
217 .route("/api/chat", post(chat))
218 .route("/api/chat/stream", post(chat_stream))
219 .route("/api/ws", get(websocket_handler))
220 .route("/api/memory/search", get(memory_search))
221 .route("/api/memory/stats", get(memory_stats))
222 .route("/api/memory/reindex", post(memory_reindex))
223 .route("/api/status", get(status))
224 .route("/api/config", get(get_config).post(set_config))
225 .route("/api/heartbeat/status", get(heartbeat_status))
226 .route("/api/bridges", get(list_bridges))
227 .route("/api/channels/status", get(channels_status))
228 .route("/api/saved-sessions", get(list_saved_sessions))
229 .route("/api/saved-sessions/{session_id}", get(get_saved_session))
230 .route("/api/logs/daemon", get(get_daemon_logs))
231 .route(
232 "/api/sessions/{session_id}/approve",
233 post(approve_tool_execution),
234 )
235 .layer(middleware::from_fn_with_state(
236 state.clone(),
237 rate_limit_middleware,
238 ))
239 .layer(middleware::from_fn_with_state(
240 state.clone(),
241 auth_middleware,
242 ));
243
244 let app = public_routes
245 .merge(api_routes)
246 .merge(openai_routes)
247 .layer(RequestBodyLimitLayer::new(
248 self.config.server.max_request_body,
249 ))
250 .layer(cors)
251 .with_state(state);
252
253 let addr: SocketAddr =
254 format!("{}:{}", self.config.server.bind, self.config.server.port).parse()?;
255
256 #[cfg(feature = "tls")]
258 if self.config.server.tls_enabled {
259 let cert_dir = std::path::Path::new(&self.config.server.tls_cert_dir);
260 let cert_paths = crate::tls::certs::ensure_certs(
261 cert_dir,
262 self.config.server.tls_renew_threshold_days,
263 )?;
264
265 info!("Starting HTTPS server on https://{}", addr);
266 info!(
267 "CA certificate: {} (share with clients for trust)",
268 cert_paths.ca_cert.display()
269 );
270
271 let tls_config = axum_server::tls_rustls::RustlsConfig::from_pem_file(
272 &cert_paths.server_cert,
273 &cert_paths.server_key,
274 )
275 .await?;
276
277 let companion_port = self.config.server.port.saturating_add(1);
279 let companion_addr: SocketAddr =
280 format!("{}:{}", self.config.server.bind, companion_port).parse()?;
281 let ca_cert_path = cert_paths.ca_cert.clone();
282 tokio::spawn(async move {
283 let ca_app = axum::Router::new().route(
284 "/ca.pem",
285 get(move || async move {
286 match tokio::fs::read(&ca_cert_path).await {
287 Ok(data) => (
288 StatusCode::OK,
289 [(header::CONTENT_TYPE, "application/x-pem-file")],
290 data,
291 )
292 .into_response(),
293 Err(_) => StatusCode::NOT_FOUND.into_response(),
294 }
295 }),
296 );
297 info!(
298 "CA cert download available at http://{}/ca.pem",
299 companion_addr
300 );
301 if let Ok(listener) = tokio::net::TcpListener::bind(companion_addr).await {
302 let _ = axum::serve(listener, ca_app).await;
303 }
304 });
305
306 axum_server::bind_rustls(addr, tls_config)
307 .serve(app.into_make_service())
308 .await?;
309
310 return Ok(());
311 }
312
313 info!("Starting HTTP server on http://{}", addr);
315
316 let listener = tokio::net::TcpListener::bind(addr).await?;
317 axum::serve(listener, app).await?;
318
319 Ok(())
320 }
321}
322
323struct AppError(StatusCode, String);
325
326impl IntoResponse for AppError {
327 fn into_response(self) -> Response {
328 (self.0, self.1).into_response()
329 }
330}
331
/// Compares two byte slices without stopping at the first differing byte.
///
/// Slices of different length are rejected immediately. For equal lengths
/// every byte pair is XOR-ed into an accumulator, so the amount of work does
/// not depend on where the inputs differ.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }

    let mut difference = 0u8;
    for (lhs, rhs) in a.iter().zip(b) {
        difference |= lhs ^ rhs;
    }
    difference == 0
}
342
343async fn auth_middleware(
345 State(state): State<Arc<AppState>>,
346 request: Request,
347 next: Next,
348) -> Result<Response, StatusCode> {
349 let Some(expected) = &state.config.server.auth_token else {
351 return Ok(next.run(request).await);
352 };
353
354 let auth_header = request
355 .headers()
356 .get("authorization")
357 .and_then(|v| v.to_str().ok());
358
359 match auth_header {
360 Some(h) if h.starts_with("Bearer ") => {
361 let token = &h[7..];
362 if constant_time_eq(token.as_bytes(), expected.as_bytes()) {
364 Ok(next.run(request).await)
365 } else {
366 debug!("Auth failed: invalid token");
367 Err(StatusCode::UNAUTHORIZED)
368 }
369 }
370 _ => {
371 debug!("Auth failed: missing or invalid Authorization header");
372 Err(StatusCode::UNAUTHORIZED)
373 }
374 }
375}
376
377#[allow(dead_code)]
383pub fn verify_webhook_signature(
384 secret: Option<&str>,
385 signature_header: Option<&str>,
386 body: &[u8],
387) -> Result<(), StatusCode> {
388 let Some(secret) = secret else {
389 return Ok(()); };
391
392 let sig_header = signature_header.ok_or_else(|| {
393 debug!("Webhook rejected: missing X-Signature-256 header");
394 StatusCode::UNAUTHORIZED
395 })?;
396
397 let expected_hex = sig_header.strip_prefix("sha256=").ok_or_else(|| {
399 debug!("Webhook rejected: X-Signature-256 must start with 'sha256='");
400 StatusCode::UNAUTHORIZED
401 })?;
402
403 use hmac::{Hmac, Mac};
404 use sha2::Sha256;
405 type HmacSha256 = Hmac<Sha256>;
406
407 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
408 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
409 mac.update(body);
410 let result = mac.finalize();
411
412 let computed_hex: String = result
413 .into_bytes()
414 .iter()
415 .map(|b| format!("{:02x}", b))
416 .collect();
417
418 if constant_time_eq(computed_hex.as_bytes(), expected_hex.as_bytes()) {
419 Ok(())
420 } else {
421 debug!("Webhook rejected: signature mismatch");
422 Err(StatusCode::UNAUTHORIZED)
423 }
424}
425
426async fn rate_limit_middleware(
428 State(state): State<Arc<AppState>>,
429 request: Request,
430 next: Next,
431) -> Result<Response, Response> {
432 let ip = request
433 .extensions()
434 .get::<axum::extract::ConnectInfo<SocketAddr>>()
435 .map(|ci| ci.0.ip())
436 .unwrap_or_else(|| std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
437
438 if !state.rate_limiter.check(ip).await {
439 return Err((
440 StatusCode::TOO_MANY_REQUESTS,
441 [(header::RETRY_AFTER, "60")],
442 "Rate limit exceeded",
443 )
444 .into_response());
445 }
446
447 Ok(next.run(request).await)
448}
449
450async fn auth_status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
452 Json(json!({
453 "auth_required": state.config.server.auth_token.is_some()
454 }))
455}
456
457async fn cleanup_expired_sessions(state: &Arc<AppState>) {
459 let mut sessions = state.sessions.lock().await;
460 let before_count = sessions.len();
461
462 sessions.retain(|id, entry| {
463 let expired = entry.last_accessed.elapsed() > SESSION_TIMEOUT;
464 if expired {
465 debug!("Expiring session: {}", id);
466 }
467 !expired
468 });
469
470 let removed = before_count - sessions.len();
471 if removed > 0 {
472 info!("Cleaned up {} expired sessions", removed);
473 }
474}
475
476async fn load_persisted_sessions(state: &Arc<AppState>) -> Result<(), anyhow::Error> {
478 use localgpt_core::agent::list_sessions_for_agent;
479 use std::sync::Arc as StdArc;
480
481 let sessions_list = list_sessions_for_agent(HTTP_AGENT_ID)?;
482 let mut loaded = 0;
483
484 for session_info in sessions_list.into_iter().take(MAX_SESSIONS) {
485 let agent_config = AgentConfig {
486 model: state.config.agent.default_model.clone(),
487 context_window: state.config.agent.context_window,
488 reserve_tokens: state.config.agent.reserve_tokens,
489 };
490
491 let memory = StdArc::new(state.memory.clone());
492 let mut agent = Agent::new(agent_config, &state.config, memory).await?;
493
494 if agent.resume_session(&session_info.id).await.is_ok() {
496 let mut sessions = state.sessions.lock().await;
497 sessions.insert(
498 session_info.id.clone(),
499 SessionEntry {
500 agent,
501 last_accessed: Instant::now(),
502 dirty: false,
503 },
504 );
505 loaded += 1;
506 }
507 }
508
509 if loaded > 0 {
510 info!("Loaded {} persisted HTTP sessions", loaded);
511 }
512
513 Ok(())
514}
515
516async fn save_dirty_sessions(state: &Arc<AppState>) {
518 let mut sessions = state.sessions.lock().await;
519 let mut saved = 0;
520
521 for (id, entry) in sessions.iter_mut() {
522 if entry.dirty {
523 if let Err(e) = entry.agent.save_session_for_agent(HTTP_AGENT_ID).await {
524 debug!("Failed to save session {}: {}", id, e);
525 } else {
526 entry.dirty = false;
527 saved += 1;
528 }
529 }
530 }
531
532 if saved > 0 {
533 info!("Saved {} HTTP sessions to disk", saved);
534 }
535}
536
537async fn get_or_create_session(
539 state: &Arc<AppState>,
540 session_id: Option<String>,
541) -> Result<String, AppError> {
542 let mut sessions = state.sessions.lock().await;
543
544 if let Some(ref id) = session_id
546 && sessions.contains_key(id)
547 {
548 if let Some(entry) = sessions.get_mut(id) {
550 entry.last_accessed = Instant::now();
551 }
552 return Ok(id.clone());
553 }
554
555 if sessions.len() >= MAX_SESSIONS {
557 if let Some(oldest_id) = sessions
559 .iter()
560 .min_by_key(|(_, e)| e.last_accessed)
561 .map(|(id, _)| id.clone())
562 {
563 sessions.remove(&oldest_id);
564 info!("Removed oldest session {} to make room", oldest_id);
565 }
566 }
567
568 let new_id = session_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
570
571 let agent_config = AgentConfig {
572 model: state.config.agent.default_model.clone(),
573 context_window: state.config.agent.context_window,
574 reserve_tokens: state.config.agent.reserve_tokens,
575 };
576
577 let memory = std::sync::Arc::new(state.memory.clone());
578 let mut agent = Agent::new(agent_config, &state.config, memory)
579 .await
580 .map_err(|e| AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
581
582 agent
583 .new_session()
584 .await
585 .map_err(|e| AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
586
587 sessions.insert(
588 new_id.clone(),
589 SessionEntry {
590 agent,
591 last_accessed: Instant::now(),
592 dirty: true, },
594 );
595
596 info!("Created new session: {}", new_id);
597 Ok(new_id)
598}
599
/// `GET /health`: liveness probe that always answers with the body `OK`.
async fn health_check() -> &'static str {
    "OK"
}
604
605async fn serve_ui_index() -> Response {
607 serve_ui_asset("index.html")
608}
609
610async fn serve_ui_file(Path(path): Path<String>) -> Response {
612 serve_ui_asset(&path)
613}
614
615fn serve_ui_asset(path: &str) -> Response {
617 match UiAssets::get(path) {
618 Some(content) => {
619 let mime = match path.rsplit('.').next() {
620 Some("js") => "application/javascript".to_string(),
621 Some("wasm") => "application/wasm".to_string(),
622 _ => mime_guess::from_path(path)
623 .first_or_octet_stream()
624 .to_string(),
625 };
626 ([(header::CONTENT_TYPE, mime)], content.data.to_vec()).into_response()
627 }
628 None => (StatusCode::NOT_FOUND, "Not found").into_response(),
629 }
630}
631
632#[derive(Serialize)]
634struct StatusResponse {
635 version: String,
636 model: String,
637 memory_chunks: usize,
638 active_sessions: usize,
639 is_brand_new: bool,
640}
641
642async fn status(State(state): State<Arc<AppState>>) -> Json<StatusResponse> {
643 let sessions = state.sessions.lock().await;
644
645 Json(StatusResponse {
646 version: env!("CARGO_PKG_VERSION").to_string(),
647 model: state.config.agent.default_model.clone(),
648 memory_chunks: state.memory.chunk_count().unwrap_or(0),
649 active_sessions: sessions.len(),
650 is_brand_new: state.memory.is_brand_new(),
651 })
652}
653
654async fn list_bridges(
655 State(state): State<Arc<AppState>>,
656) -> Json<Vec<crate::security::bridge::BridgeStatus>> {
657 Json(state.bridge_manager.get_active_bridges().await)
658}
659
660#[derive(serde::Serialize)]
661struct ChannelStatus {
662 name: String,
663 state: String,
664 connected_since: Option<String>,
665 last_active: Option<String>,
666 health: String,
667}
668
669#[derive(serde::Serialize)]
670struct ChannelsSummary {
671 total: usize,
672 connected: usize,
673 disconnected: usize,
674 degraded: usize,
675}
676
677#[derive(serde::Serialize)]
678struct ChannelsStatusResponse {
679 channels: Vec<ChannelStatus>,
680 summary: ChannelsSummary,
681}
682
683async fn channels_status(State(state): State<Arc<AppState>>) -> Json<ChannelsStatusResponse> {
684 let bridges = state.bridge_manager.get_active_bridges().await;
685
686 let channels: Vec<ChannelStatus> = bridges
687 .iter()
688 .map(|b| {
689 let name = b
690 .bridge_id
691 .as_deref()
692 .unwrap_or(&b.connection_id)
693 .to_string();
694 let health_str = format!("{:?}", b.health).to_lowercase();
695 let channel_state = match b.health {
696 crate::security::bridge::HealthStatus::Healthy => "connected",
697 crate::security::bridge::HealthStatus::Degraded => "degraded",
698 crate::security::bridge::HealthStatus::Unhealthy => "disconnected",
699 };
700
701 ChannelStatus {
702 name,
703 state: channel_state.to_string(),
704 connected_since: Some(b.connected_at.to_rfc3339()),
705 last_active: Some(b.last_active.to_rfc3339()),
706 health: health_str,
707 }
708 })
709 .collect();
710
711 let connected = channels.iter().filter(|c| c.state == "connected").count();
712 let degraded = channels.iter().filter(|c| c.state == "degraded").count();
713 let disconnected = channels
714 .iter()
715 .filter(|c| c.state == "disconnected")
716 .count();
717
718 Json(ChannelsStatusResponse {
719 summary: ChannelsSummary {
720 total: channels.len(),
721 connected,
722 disconnected,
723 degraded,
724 },
725 channels,
726 })
727}
728
729#[derive(Deserialize)]
731struct CreateSessionRequest {
732 session_id: Option<String>,
733}
734
735#[derive(Serialize)]
736struct SessionResponse {
737 session_id: String,
738 model: String,
739}
740
741async fn create_session(
742 State(state): State<Arc<AppState>>,
743 Json(request): Json<CreateSessionRequest>,
744) -> Response {
745 match get_or_create_session(&state, request.session_id).await {
746 Ok(session_id) => Json(SessionResponse {
747 session_id,
748 model: state.config.agent.default_model.clone(),
749 })
750 .into_response(),
751 Err(e) => e.into_response(),
752 }
753}
754
755#[derive(Serialize)]
756struct SessionInfo {
757 session_id: String,
758 idle_seconds: u64,
759}
760
761#[derive(Serialize)]
762struct ListSessionsResponse {
763 sessions: Vec<SessionInfo>,
764}
765
766async fn list_sessions(State(state): State<Arc<AppState>>) -> Json<ListSessionsResponse> {
767 let sessions = state.sessions.lock().await;
768
769 let session_list: Vec<SessionInfo> = sessions
770 .iter()
771 .map(|(id, entry)| SessionInfo {
772 session_id: id.clone(),
773 idle_seconds: entry.last_accessed.elapsed().as_secs(),
774 })
775 .collect();
776
777 Json(ListSessionsResponse {
778 sessions: session_list,
779 })
780}
781
782async fn delete_session(
784 State(state): State<Arc<AppState>>,
785 Path(session_id): Path<String>,
786) -> Response {
787 let mut sessions = state.sessions.lock().await;
788
789 if sessions.remove(&session_id).is_some() {
790 info!("Deleted session: {}", session_id);
791 Json(json!({"deleted": true, "session_id": session_id})).into_response()
792 } else {
793 AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response()
794 }
795}
796
797#[derive(Serialize)]
799struct SessionStatusResponse {
800 session_id: String,
801 model: String,
802 message_count: usize,
803 token_count: usize,
804 idle_seconds: u64,
805 api_input_tokens: u64,
806 api_output_tokens: u64,
807 api_cost_usd: f64,
808 search_queries: u64,
809 search_cached_hits: u64,
810 search_cost_usd: f64,
811}
812
813async fn get_session_status(
814 State(state): State<Arc<AppState>>,
815 Path(session_id): Path<String>,
816) -> Response {
817 let sessions = state.sessions.lock().await;
818
819 match sessions.get(&session_id) {
820 Some(entry) => {
821 let status = entry.agent.session_status();
822 Json(SessionStatusResponse {
823 session_id,
824 model: entry.agent.model().to_string(),
825 message_count: status.message_count,
826 token_count: status.token_count,
827 idle_seconds: entry.last_accessed.elapsed().as_secs(),
828 api_input_tokens: status.api_input_tokens,
829 api_output_tokens: status.api_output_tokens,
830 api_cost_usd: status.api_cost_usd,
831 search_queries: status.search_queries,
832 search_cached_hits: status.search_cached_hits,
833 search_cost_usd: status.search_cost_usd,
834 })
835 .into_response()
836 }
837 None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
838 }
839}
840
841#[derive(Serialize)]
843struct ActiveSessionMessage {
844 role: String,
845 content: Option<String>,
846 tool_calls: Option<Vec<serde_json::Value>>,
847 tool_call_id: Option<String>,
848 timestamp: u64,
849}
850
851#[derive(Serialize)]
852struct SessionMessagesResponse {
853 session_id: String,
854 messages: Vec<ActiveSessionMessage>,
855}
856
857async fn get_session_messages(
858 State(state): State<Arc<AppState>>,
859 Path(session_id): Path<String>,
860) -> Response {
861 let mut sessions = state.sessions.lock().await;
862
863 match sessions.get_mut(&session_id) {
864 Some(entry) => {
865 entry.last_accessed = Instant::now();
866
867 let messages: Vec<ActiveSessionMessage> = entry
868 .agent
869 .raw_session_messages()
870 .iter()
871 .map(|sm| {
872 let role = match sm.message.role {
873 localgpt_core::agent::Role::User => "user",
874 localgpt_core::agent::Role::Assistant => "assistant",
875 localgpt_core::agent::Role::System => "system",
876 localgpt_core::agent::Role::Tool => "toolResult",
877 };
878
879 let tool_calls = sm.message.tool_calls.as_ref().map(|tcs| {
881 tcs.iter()
882 .map(|tc| {
883 json!({
884 "id": tc.id,
885 "name": tc.name,
886 "arguments": tc.arguments
887 })
888 })
889 .collect()
890 });
891
892 ActiveSessionMessage {
893 role: role.to_string(),
894 content: if sm.message.content.is_empty() {
895 None
896 } else {
897 Some(sm.message.content.clone())
898 },
899 tool_calls,
900 tool_call_id: sm.message.tool_call_id.clone(),
901 timestamp: sm.timestamp,
902 }
903 })
904 .collect();
905
906 Json(SessionMessagesResponse {
907 session_id,
908 messages,
909 })
910 .into_response()
911 }
912 None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
913 }
914}
915
916async fn compact_session(
918 State(state): State<Arc<AppState>>,
919 Path(session_id): Path<String>,
920) -> Response {
921 let mut sessions = state.sessions.lock().await;
922
923 match sessions.get_mut(&session_id) {
924 Some(entry) => {
925 entry.last_accessed = Instant::now();
926
927 match entry.agent.compact_session().await {
928 Ok((before, after)) => Json(json!({
929 "session_id": session_id,
930 "token_count_before": before,
931 "token_count_after": after,
932 }))
933 .into_response(),
934 Err(e) => {
935 AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
936 }
937 }
938 }
939 None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
940 }
941}
942
943async fn clear_session(
945 State(state): State<Arc<AppState>>,
946 Path(session_id): Path<String>,
947) -> Response {
948 let mut sessions = state.sessions.lock().await;
949
950 match sessions.get_mut(&session_id) {
951 Some(entry) => {
952 entry.last_accessed = Instant::now();
953 entry.agent.clear_session();
954 Json(json!({"session_id": session_id, "cleared": true})).into_response()
955 }
956 None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
957 }
958}
959
960#[derive(Deserialize)]
962struct SetModelRequest {
963 model: String,
964}
965
966async fn set_session_model(
967 State(state): State<Arc<AppState>>,
968 Path(session_id): Path<String>,
969 Json(request): Json<SetModelRequest>,
970) -> Response {
971 let mut sessions = state.sessions.lock().await;
972
973 match sessions.get_mut(&session_id) {
974 Some(entry) => {
975 entry.last_accessed = Instant::now();
976
977 match entry.agent.set_model(&request.model) {
978 Ok(()) => Json(json!({
979 "session_id": session_id,
980 "model": request.model,
981 }))
982 .into_response(),
983 Err(e) => AppError(StatusCode::BAD_REQUEST, e.to_string()).into_response(),
984 }
985 }
986 None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
987 }
988}
989
990#[derive(Deserialize)]
992struct ChatRequest {
993 message: String,
994 session_id: Option<String>,
995 model: Option<String>,
997}
998
999#[derive(Serialize)]
1000struct ChatResponse {
1001 response: String,
1002 session_id: String,
1003 model: String,
1004}
1005
1006async fn chat(State(state): State<Arc<AppState>>, Json(request): Json<ChatRequest>) -> Response {
1007 let session_id = match get_or_create_session(&state, request.session_id).await {
1009 Ok(id) => id,
1010 Err(e) => return e.into_response(),
1011 };
1012
1013 let _gate_permit = state.turn_gate.acquire().await;
1015
1016 let ws_lock_path = state.workspace_lock.clone();
1018 let ws_guard = match tokio::task::spawn_blocking(move || ws_lock_path.acquire()).await {
1019 Ok(Ok(guard)) => guard,
1020 Ok(Err(e)) => {
1021 return AppError(
1022 StatusCode::INTERNAL_SERVER_ERROR,
1023 format!("Failed to acquire workspace lock: {}", e),
1024 )
1025 .into_response();
1026 }
1027 Err(e) => {
1028 return AppError(
1029 StatusCode::INTERNAL_SERVER_ERROR,
1030 format!("Lock task error: {}", e),
1031 )
1032 .into_response();
1033 }
1034 };
1035
1036 let mut sessions = state.sessions.lock().await;
1038 let entry = match sessions.get_mut(&session_id) {
1039 Some(e) => e,
1040 None => {
1041 return AppError(StatusCode::NOT_FOUND, "Session not found".to_string())
1042 .into_response();
1043 }
1044 };
1045
1046 entry.last_accessed = Instant::now();
1047
1048 if let Some(ref model) = request.model
1050 && let Err(e) = entry.agent.set_model(model)
1051 {
1052 return AppError(StatusCode::BAD_REQUEST, format!("Invalid model: {}", e)).into_response();
1053 }
1054
1055 let result = entry.agent.chat(&request.message).await;
1056
1057 drop(ws_guard);
1059
1060 match result {
1061 Ok(response) => {
1062 entry.dirty = true;
1063 Json(ChatResponse {
1064 response,
1065 session_id,
1066 model: entry.agent.model().to_string(),
1067 })
1068 .into_response()
1069 }
1070 Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1071 }
1072}
1073
1074async fn chat_stream(
1076 State(state): State<Arc<AppState>>,
1077 Json(request): Json<ChatRequest>,
1078) -> Response {
1079 let session_id = match get_or_create_session(&state, request.session_id).await {
1081 Ok(id) => id,
1082 Err(e) => return e.into_response(),
1083 };
1084
1085 let state_clone = state.clone();
1086 let message = request.message.clone();
1087
1088 let stream = async_stream::stream! {
1089 yield Ok::<Event, Infallible>(Event::default().data(json!({"type": "session", "session_id": session_id}).to_string()));
1091
1092 let _gate_permit = state_clone.turn_gate.acquire().await;
1094
1095 let ws_lock = state_clone.workspace_lock.clone();
1097 let _ws_guard = match tokio::task::spawn_blocking(move || ws_lock.acquire()).await {
1098 Ok(Ok(guard)) => Some(guard),
1099 Ok(Err(e)) => {
1100 yield Ok(Event::default().data(json!({"error": format!("Workspace lock error: {}", e)}).to_string()));
1101 return;
1102 }
1103 Err(e) => {
1104 yield Ok(Event::default().data(json!({"error": format!("Lock task error: {}", e)}).to_string()));
1105 return;
1106 }
1107 };
1108
1109 let mut sessions = state_clone.sessions.lock().await;
1110 let entry = match sessions.get_mut(&session_id) {
1111 Some(e) => e,
1112 None => {
1113 yield Ok(Event::default().data(json!({"error": "Session not found"}).to_string()));
1114 return;
1115 }
1116 };
1117
1118 entry.last_accessed = Instant::now();
1119 entry.dirty = true;
1120
1121 match entry.agent.chat_stream_with_tools(&message, Vec::new()).await {
1123 Ok(event_stream) => {
1124 use futures::StreamExt;
1125
1126 let mut pinned_stream = std::pin::pin!(event_stream);
1128
1129 while let Some(event) = pinned_stream.next().await {
1130 match event {
1131 Ok(StreamEvent::Content(content)) => {
1132 let data = json!({"type": "content", "delta": content});
1133 yield Ok(Event::default().data(data.to_string()));
1134 }
1135 Ok(StreamEvent::ToolCallStart { name, id, arguments }) => {
1136 let detail = extract_tool_detail(&name, &arguments);
1137 let data = json!({"type": "tool_start", "name": name, "id": id, "detail": detail});
1138 yield Ok(Event::default().data(data.to_string()));
1139 }
1140 Ok(StreamEvent::ToolCallEnd { name, id, output, warnings }) => {
1141 let data = json!({
1142 "type": "tool_end",
1143 "name": name,
1144 "id": id,
1145 "output": output.chars().take(500).collect::<String>(),
1146 "warnings": warnings
1147 });
1148 yield Ok(Event::default().data(data.to_string()));
1149 }
1150 Ok(StreamEvent::Done) => {
1151 let data = json!({"type": "done"});
1152 yield Ok(Event::default().data(data.to_string()));
1153 }
1154 Ok(StreamEvent::ApprovalRequired { request_id, tool_name, arguments, level }) => {
1155 let data = json!({
1156 "type": "approval_required",
1157 "request_id": request_id,
1158 "tool_name": tool_name,
1159 "arguments": arguments,
1160 "level": level,
1161 });
1162 yield Ok(Event::default().data(data.to_string()));
1163 }
1164 Err(e) => {
1165 yield Ok(Event::default().data(json!({"error": e.to_string()}).to_string()));
1166 break;
1167 }
1168 }
1169 }
1170 }
1171 Err(e) => {
1172 yield Ok(Event::default().data(json!({"error": e.to_string()}).to_string()));
1173 }
1174 }
1175
1176 yield Ok(Event::default().data("[DONE]"));
1177 };
1178
1179 Sse::new(stream).into_response()
1180}
1181
1182#[derive(Deserialize)]
1184struct SearchQuery {
1185 q: String,
1186 limit: Option<usize>,
1187}
1188
1189#[derive(Serialize)]
1190struct SearchResult {
1191 file: String,
1192 line_start: i32,
1193 line_end: i32,
1194 content: String,
1195 score: f64,
1196}
1197
1198#[derive(Serialize)]
1199struct SearchResponse {
1200 results: Vec<SearchResult>,
1201 query: String,
1202}
1203
1204async fn memory_search(
1205 State(state): State<Arc<AppState>>,
1206 Query(query): Query<SearchQuery>,
1207) -> Response {
1208 if query.q.len() > 1000 {
1210 return AppError(StatusCode::BAD_REQUEST, "Query too long".to_string()).into_response();
1211 }
1212 match memory_search_inner(&state.memory, &query.q, query.limit) {
1213 Ok(response) => Json(response).into_response(),
1214 Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1215 }
1216}
1217
1218fn memory_search_inner(
1219 memory: &MemoryManager,
1220 query: &str,
1221 limit: Option<usize>,
1222) -> Result<SearchResponse, anyhow::Error> {
1223 let limit = limit.unwrap_or(10).min(100);
1224 let results = memory.search(query, limit)?;
1225
1226 let results: Vec<SearchResult> = results
1227 .into_iter()
1228 .map(|r| SearchResult {
1229 file: r.file,
1230 line_start: r.line_start,
1231 line_end: r.line_end,
1232 content: r.content,
1233 score: r.score,
1234 })
1235 .collect();
1236
1237 Ok(SearchResponse {
1238 results,
1239 query: query.to_string(),
1240 })
1241}
1242
/// JSON body returned by the `memory_stats` handler.
///
/// Each field mirrors the same-named field of `MemoryManager::stats()` (see
/// `memory_stats_inner`).
#[derive(Serialize)]
struct StatsResponse {
    /// Workspace identifier reported by the memory manager.
    workspace: String,
    /// Number of files known to the index.
    total_files: usize,
    /// Number of chunks stored in the index.
    total_chunks: usize,
    /// Index size; the name suggests kilobytes (confirm against
    /// `MemoryManager::stats`).
    index_size_kb: u64,
}
1251
1252async fn memory_stats(State(state): State<Arc<AppState>>) -> Response {
1253 match memory_stats_inner(&state.memory) {
1254 Ok(response) => Json(response).into_response(),
1255 Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1256 }
1257}
1258
1259fn memory_stats_inner(memory: &MemoryManager) -> Result<StatsResponse, anyhow::Error> {
1260 let stats = memory.stats()?;
1261
1262 Ok(StatsResponse {
1263 workspace: stats.workspace,
1264 total_files: stats.total_files,
1265 total_chunks: stats.total_chunks,
1266 index_size_kb: stats.index_size_kb,
1267 })
1268}
1269
/// JSON body accepted by the `memory_reindex` handler.
#[derive(Deserialize)]
struct ReindexRequest {
    /// Forwarded to `MemoryManager::reindex`; `false` when the field is
    /// omitted from the request body.
    #[serde(default)]
    force: bool,
}
1276
/// JSON body returned by the `memory_reindex` handler.
///
/// The counters mirror the same-named fields of the value returned by
/// `MemoryManager::reindex` (see `memory_reindex_inner`).
#[derive(Serialize)]
struct ReindexResponse {
    /// Number of files the reindex pass looked at.
    files_processed: usize,
    /// Number of files whose index entries were updated.
    files_updated: usize,
    /// Number of chunks written to the index.
    chunks_indexed: usize,
    /// Duration of the reindex in milliseconds (`Duration::as_millis`).
    duration_ms: u128,
}
1284
1285async fn memory_reindex(
1286 State(state): State<Arc<AppState>>,
1287 Json(request): Json<ReindexRequest>,
1288) -> Response {
1289 let memory = state.memory.clone();
1291 let force = request.force;
1292
1293 match tokio::task::spawn_blocking(move || memory_reindex_inner(&memory, force)).await {
1294 Ok(Ok(response)) => Json(response).into_response(),
1295 Ok(Err(e)) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1296 Err(e) => AppError(
1297 StatusCode::INTERNAL_SERVER_ERROR,
1298 format!("Task error: {}", e),
1299 )
1300 .into_response(),
1301 }
1302}
1303
1304fn memory_reindex_inner(
1305 memory: &MemoryManager,
1306 force: bool,
1307) -> Result<ReindexResponse, anyhow::Error> {
1308 let stats = memory.reindex(force)?;
1309
1310 Ok(ReindexResponse {
1311 files_processed: stats.files_processed,
1312 files_updated: stats.files_updated,
1313 chunks_indexed: stats.chunks_indexed,
1314 duration_ms: stats.duration.as_millis(),
1315 })
1316}
1317
/// JSON body returned by the `get_config` handler: a subset of the loaded
/// configuration, grouped by section.
#[derive(Serialize)]
struct ConfigResponse {
    /// Values from the `agent` configuration section.
    agent: AgentConfigInfo,
    /// Values from the `server` configuration section.
    server: ServerConfigInfo,
    /// Values from the `memory` configuration section.
    memory: MemoryConfigInfo,
    /// Values from the `heartbeat` configuration section.
    heartbeat: HeartbeatConfigInfo,
}
1326
/// Agent settings exposed by `get_config`; each field is copied from the
/// same-named field of `config.agent`.
#[derive(Serialize)]
struct AgentConfigInfo {
    /// Configured default model name.
    default_model: String,
    /// Configured context window (presumably in tokens; confirm against the
    /// core config).
    context_window: usize,
    /// Configured token reserve (presumably tokens held back from the context
    /// window; confirm against the core config).
    reserve_tokens: usize,
}
1333
/// Server settings exposed by `get_config`; each field is copied from the
/// same-named field of `config.server`.
#[derive(Serialize)]
struct ServerConfigInfo {
    /// Configured port.
    port: u16,
    /// Configured bind address.
    bind: String,
}
1339
/// Memory settings exposed by `get_config`; each field is copied from the
/// same-named field of `config.memory`.
#[derive(Serialize)]
struct MemoryConfigInfo {
    /// Configured workspace.
    workspace: String,
    /// Configured embedding model name.
    embedding_model: String,
    /// Configured chunk size (unit is defined by the core config).
    chunk_size: usize,
    /// Configured overlap between consecutive chunks (same unit as
    /// `chunk_size`, presumably).
    chunk_overlap: usize,
}
1347
/// Heartbeat settings exposed by `get_config`; each field is copied from the
/// same-named field of `config.heartbeat`.
#[derive(Serialize)]
struct HeartbeatConfigInfo {
    /// Whether the heartbeat is enabled in the configuration.
    enabled: bool,
    /// Configured interval, kept as the raw configuration string.
    interval: String,
}
1353
1354async fn get_config(State(state): State<Arc<AppState>>) -> Json<ConfigResponse> {
1355 Json(ConfigResponse {
1356 agent: AgentConfigInfo {
1357 default_model: state.config.agent.default_model.clone(),
1358 context_window: state.config.agent.context_window,
1359 reserve_tokens: state.config.agent.reserve_tokens,
1360 },
1361 server: ServerConfigInfo {
1362 port: state.config.server.port,
1363 bind: state.config.server.bind.clone(),
1364 },
1365 memory: MemoryConfigInfo {
1366 workspace: state.config.memory.workspace.clone(),
1367 embedding_model: state.config.memory.embedding_model.clone(),
1368 chunk_size: state.config.memory.chunk_size,
1369 chunk_overlap: state.config.memory.chunk_overlap,
1370 },
1371 heartbeat: HeartbeatConfigInfo {
1372 enabled: state.config.heartbeat.enabled,
1373 interval: state.config.heartbeat.interval.clone(),
1374 },
1375 })
1376}
1377
/// JSON body accepted by the `set_config` handler.
#[derive(Deserialize)]
struct SetConfigRequest {
    /// Configuration key. `set_config` rejects keys that
    /// `Config::get_value` does not recognise; `apply_config_value` only
    /// rewrites keys of the form `section.field`.
    key: String,
    /// New value, supplied as a string.
    value: String,
}
1383
/// JSON body returned by the `set_config` handler on success.
#[derive(Serialize)]
struct SetConfigResponse {
    /// Always `true` in responses built by `set_config`; failures are
    /// reported through the error branch instead.
    success: bool,
    /// The key from the request, echoed back.
    key: String,
    /// The value from the request, echoed back.
    value: String,
}
1390
1391async fn set_config(
1392 State(state): State<Arc<AppState>>,
1393 Json(req): Json<SetConfigRequest>,
1394) -> std::result::Result<Json<SetConfigResponse>, (axum::http::StatusCode, String)> {
1395 let current = state.config.get_value(&req.key);
1397 if current.is_err() {
1398 return Err((
1399 axum::http::StatusCode::BAD_REQUEST,
1400 format!("Unknown config key: {}", req.key),
1401 ));
1402 }
1403
1404 let config_path = state.config.paths.config_file();
1406 if config_path.exists() {
1407 let content = std::fs::read_to_string(&config_path).map_err(|e| {
1408 (
1409 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
1410 format!("Failed to read config: {}", e),
1411 )
1412 })?;
1413
1414 let new_content = apply_config_value(&content, &req.key, &req.value);
1415
1416 std::fs::write(&config_path, new_content).map_err(|e| {
1417 (
1418 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
1419 format!("Failed to write config: {}", e),
1420 )
1421 })?;
1422 }
1423
1424 Ok(Json(SetConfigResponse {
1425 success: true,
1426 key: req.key,
1427 value: req.value,
1428 }))
1429}
1430
/// Returns `content` (TOML text) with `section.field` set to `value`.
///
/// `key` must have exactly two non-empty dot-separated parts
/// (`section.field`); any other key leaves `content` unchanged. This is a
/// line-oriented edit, not a TOML parser:
///
/// * If `field` is already assigned inside `[section]`, the first such line
///   is replaced and keeps its leading indentation (anything after the old
///   value on that line, such as an inline comment, is dropped).
/// * If `[section]` exists but does not assign `field`, the assignment is
///   inserted directly below the section header.
/// * If `[section]` does not exist, it is appended at the end of the file.
///
/// `true`, `false` and plain numbers are written bare; everything else is
/// written as an escaped TOML basic string, so quotes, backslashes and
/// newlines in `value` cannot break out of the string. Every output line ends
/// with `\n`.
fn apply_config_value(content: &str, key: &str, value: &str) -> String {
    let mut parts = key.split('.');
    let (section, field) = match (parts.next(), parts.next(), parts.next()) {
        (Some(section), Some(field), None) if !section.is_empty() && !field.is_empty() => {
            (section, field)
        }
        _ => return content.to_string(),
    };

    let section_header = format!("[{}]", section);
    let entry = format!("{} = {}", field, render_toml_value(value));

    let mut lines: Vec<String> = Vec::new();
    let mut in_target_section = false;
    let mut header_at: Option<usize> = None;
    let mut value_set = false;

    for line in content.lines() {
        let trimmed = line.trim();

        if trimmed.starts_with('[') {
            // Any table header, including `[[array]]` headers, ends the
            // previous section.
            in_target_section = trimmed == section_header;
            if in_target_section && header_at.is_none() {
                header_at = Some(lines.len());
            }
        } else if in_target_section && !value_set {
            if let Some((key_part, _)) = trimmed.split_once('=') {
                if key_part.trim() == field {
                    // Measure only the leading whitespace; trailing whitespace
                    // must not leak into the indent slice.
                    let indent_len = line.len() - line.trim_start().len();
                    lines.push(format!("{}{}", &line[..indent_len], entry));
                    value_set = true;
                    continue;
                }
            }
        }

        lines.push(line.to_string());
    }

    if !value_set {
        match header_at {
            Some(index) => lines.insert(index + 1, entry),
            None => {
                // Separate the new section from preceding content.
                if lines.last().map_or(false, |last| !last.trim().is_empty()) {
                    lines.push(String::new());
                }
                lines.push(section_header);
                lines.push(entry);
            }
        }
    }

    let mut result = String::with_capacity(content.len() + 32);
    for line in &lines {
        result.push_str(line);
        result.push('\n');
    }
    result
}

/// Renders `value` as a TOML scalar: bare for booleans and numbers, otherwise
/// as a double-quoted basic string with the required escapes applied.
fn render_toml_value(value: &str) -> String {
    if is_bare_toml_scalar(value) {
        return value.to_string();
    }

    let mut out = String::with_capacity(value.len() + 2);
    out.push('"');
    for ch in value.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if c.is_control() => out.push_str(&format!("\\u{:04X}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Heuristic: `true` for values that can be written unquoted, i.e. TOML
/// booleans, `inf`/`nan`, and numbers that start and end with a digit.
fn is_bare_toml_scalar(value: &str) -> bool {
    if matches!(
        value,
        "true" | "false" | "inf" | "+inf" | "-inf" | "nan" | "+nan" | "-nan"
    ) {
        return true;
    }

    let unsigned = value
        .strip_prefix(|c: char| c == '+' || c == '-')
        .unwrap_or(value);
    let starts_with_digit = unsigned.chars().next().map_or(false, |c| c.is_ascii_digit());
    let ends_with_digit = unsigned.chars().last().map_or(false, |c| c.is_ascii_digit());

    // The digit checks reject spellings that Rust parses but TOML does not
    // accept bare, such as `NaN`, `Infinity`, `.5` and `1.`.
    starts_with_digit && ends_with_digit && value.parse::<f64>().is_ok()
}
1480
/// JSON body returned by the `heartbeat_status` handler.
#[derive(Serialize)]
struct HeartbeatStatusResponse {
    /// `config.heartbeat.enabled` from the loaded configuration.
    enabled: bool,
    /// `config.heartbeat.interval` from the loaded configuration.
    interval: String,
    /// The most recent heartbeat event, or `None` when
    /// `get_last_heartbeat_event` has nothing to report.
    last_event: Option<HeartbeatEventInfo>,
}
1488
/// Serializable view of the last heartbeat event (built in
/// `heartbeat_status`).
#[derive(Serialize)]
struct HeartbeatEventInfo {
    /// Event timestamp, copied from the event. `heartbeat_status` compares it
    /// with the current time in milliseconds since the Unix epoch.
    ts: u64,
    /// One of `"sent"`, `"ok"`, `"skipped"`, `"failed"` or `"timed_out"`.
    status: String,
    /// Duration copied from the event; the name indicates milliseconds.
    duration_ms: u64,
    /// Optional preview text copied from the event.
    preview: Option<String>,
    /// Optional reason text copied from the event.
    reason: Option<String>,
    /// Whole seconds between `ts` and the time of the request (0 if `ts` is
    /// in the future).
    age_seconds: u64,
}
1498
1499async fn heartbeat_status(State(state): State<Arc<AppState>>) -> Json<HeartbeatStatusResponse> {
1500 let last_event = get_last_heartbeat_event().map(|event| {
1501 let now_ms = std::time::SystemTime::now()
1502 .duration_since(std::time::UNIX_EPOCH)
1503 .map(|d| d.as_millis() as u64)
1504 .unwrap_or(0);
1505 let age_seconds = (now_ms.saturating_sub(event.ts)) / 1000;
1506
1507 let status = match event.status {
1508 HeartbeatStatus::Sent => "sent",
1509 HeartbeatStatus::Ok => "ok",
1510 HeartbeatStatus::Skipped => "skipped",
1511 HeartbeatStatus::SkippedMayTry => "skipped",
1512 HeartbeatStatus::Failed => "failed",
1513 HeartbeatStatus::TimedOut => "timed_out",
1514 };
1515
1516 HeartbeatEventInfo {
1517 ts: event.ts,
1518 status: status.to_string(),
1519 duration_ms: event.duration_ms,
1520 preview: event.preview,
1521 reason: event.reason,
1522 age_seconds,
1523 }
1524 });
1525
1526 Json(HeartbeatStatusResponse {
1527 enabled: state.config.heartbeat.enabled,
1528 interval: state.config.heartbeat.interval.clone(),
1529 last_event,
1530 })
1531}
1532
/// Summary of one persisted session, as listed by `list_saved_sessions`.
#[derive(Serialize)]
struct SavedSessionInfo {
    /// Session identifier.
    id: String,
    /// Number of messages recorded for the session.
    message_count: usize,
    /// Creation time formatted as `%Y-%m-%dT%H:%M:%S` (no zone suffix).
    created_at: String,
}
1540
/// JSON body returned by the `list_saved_sessions` handler.
#[derive(Serialize)]
struct SavedSessionsResponse {
    /// Persisted sessions, in the order returned by
    /// `list_sessions_for_agent`.
    sessions: Vec<SavedSessionInfo>,
}
1545
1546async fn list_saved_sessions(State(_state): State<Arc<AppState>>) -> Response {
1547 use localgpt_core::agent::list_sessions_for_agent;
1548
1549 match list_sessions_for_agent(HTTP_AGENT_ID) {
1550 Ok(sessions) => {
1551 let session_list: Vec<SavedSessionInfo> = sessions
1552 .into_iter()
1553 .map(|s| SavedSessionInfo {
1554 id: s.id,
1555 message_count: s.message_count,
1556 created_at: s.created_at.format("%Y-%m-%dT%H:%M:%S").to_string(),
1557 })
1558 .collect();
1559
1560 Json(SavedSessionsResponse {
1561 sessions: session_list,
1562 })
1563 .into_response()
1564 }
1565 Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
1566 }
1567}
1568
/// One message of a persisted session, as returned by `get_saved_session`.
#[derive(Serialize)]
struct SavedSessionMessage {
    /// The message's `role`, or `"unknown"` when the stored entry has none.
    role: String,
    /// Text content, with the text parts of array-shaped content joined by
    /// newlines; `None` when the message has no text.
    content: Option<String>,
    /// The stored `toolCalls` array, passed through unmodified.
    tool_calls: Option<Vec<serde_json::Value>>,
    /// The stored `toolCallId`, if any.
    tool_call_id: Option<String>,
    /// The stored `timestamp`, when it is an unsigned integer.
    timestamp: Option<u64>,
}
1578
/// JSON body returned by the `get_saved_session` handler.
#[derive(Serialize)]
struct SavedSessionDetail {
    /// The session id from the request path.
    session_id: String,
    /// The `timestamp` of the session header line; empty when the file has no
    /// such header.
    created_at: String,
    /// Messages in file order.
    messages: Vec<SavedSessionMessage>,
}
1585
1586async fn get_saved_session(Path(session_id): Path<String>) -> Response {
1587 use localgpt_core::agent::get_sessions_dir_for_agent;
1588 use std::fs::File;
1589 use std::io::{BufRead, BufReader};
1590
1591 if !session_id
1593 .chars()
1594 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1595 {
1596 return AppError(StatusCode::BAD_REQUEST, "Invalid session ID".to_string()).into_response();
1597 }
1598
1599 let sessions_dir = match get_sessions_dir_for_agent(HTTP_AGENT_ID) {
1600 Ok(dir) => dir,
1601 Err(e) => {
1602 return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1603 }
1604 };
1605
1606 let session_path = sessions_dir.join(format!("{}.jsonl", session_id));
1607
1608 if !session_path.exists() {
1609 return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
1610 }
1611
1612 let file = match File::open(&session_path) {
1613 Ok(f) => f,
1614 Err(e) => {
1615 return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1616 }
1617 };
1618
1619 let reader = BufReader::new(file);
1620 let mut messages = Vec::new();
1621 let mut created_at = String::new();
1622
1623 for (i, line) in reader.lines().enumerate() {
1624 let line = match line {
1625 Ok(l) => l,
1626 Err(_) => continue,
1627 };
1628
1629 let parsed: serde_json::Value = match serde_json::from_str(&line) {
1630 Ok(v) => v,
1631 Err(_) => continue,
1632 };
1633
1634 if i == 0 && parsed["type"].as_str() == Some("session") {
1636 created_at = parsed["timestamp"].as_str().unwrap_or("").to_string();
1637 continue;
1638 }
1639
1640 if parsed["type"].as_str() == Some("message")
1642 && let Some(msg) = parsed.get("message")
1643 {
1644 let role = msg["role"].as_str().unwrap_or("unknown").to_string();
1645
1646 let content = if let Some(content_arr) = msg["content"].as_array() {
1648 content_arr
1649 .iter()
1650 .filter_map(|c| {
1651 if c["type"].as_str() == Some("text") {
1652 c["text"].as_str().map(String::from)
1653 } else {
1654 None
1655 }
1656 })
1657 .collect::<Vec<_>>()
1658 .join("\n")
1659 } else if let Some(text) = msg["content"].as_str() {
1660 text.to_string()
1661 } else {
1662 String::new()
1663 };
1664
1665 let tool_calls = msg["toolCalls"].as_array().cloned();
1667
1668 let tool_call_id = msg["toolCallId"].as_str().map(String::from);
1670
1671 let timestamp = msg["timestamp"].as_u64();
1672
1673 messages.push(SavedSessionMessage {
1674 role,
1675 content: if content.is_empty() {
1676 None
1677 } else {
1678 Some(content)
1679 },
1680 tool_calls,
1681 tool_call_id,
1682 timestamp,
1683 });
1684 }
1685 }
1686
1687 Json(SavedSessionDetail {
1688 session_id,
1689 created_at,
1690 messages,
1691 })
1692 .into_response()
1693}
1694
/// Query-string parameters accepted by the `get_daemon_logs` handler.
#[derive(Deserialize)]
struct LogsQuery {
    /// Number of trailing log lines to return; `get_daemon_logs` defaults
    /// this to 200 and caps it at 1000.
    lines: Option<usize>,
}
1700
/// JSON body returned by the `get_daemon_logs` handler.
#[derive(Serialize)]
struct DaemonLogsResponse {
    /// The last lines of today's log file, oldest first.
    lines: Vec<String>,
    /// Number of lines read from the log file.
    total_lines: usize,
    /// Size of the log file in bytes, from its metadata.
    file_size_bytes: u64,
}
1707
1708async fn get_daemon_logs(Query(query): Query<LogsQuery>) -> Response {
1709 use localgpt_core::agent::get_state_dir;
1710 use std::fs::File;
1711 use std::io::{BufRead, BufReader};
1712
1713 let lines_requested = query.lines.unwrap_or(200).min(1000);
1714
1715 let state_dir = match get_state_dir() {
1716 Ok(dir) => dir,
1717 Err(e) => {
1718 return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1719 }
1720 };
1721
1722 let date = chrono::Local::now().format("%Y-%m-%d");
1724 let log_path = state_dir
1725 .join("logs")
1726 .join(format!("localgpt-{}.log", date));
1727
1728 if !log_path.exists() {
1729 return Json(DaemonLogsResponse {
1730 lines: vec![],
1731 total_lines: 0,
1732 file_size_bytes: 0,
1733 })
1734 .into_response();
1735 }
1736
1737 let metadata = match std::fs::metadata(&log_path) {
1738 Ok(m) => m,
1739 Err(e) => {
1740 return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1741 }
1742 };
1743
1744 let file = match File::open(&log_path) {
1745 Ok(f) => f,
1746 Err(e) => {
1747 return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
1748 }
1749 };
1750
1751 let reader = BufReader::new(file);
1752 let all_lines: Vec<String> = reader.lines().map_while(Result::ok).collect();
1753 let total_lines = all_lines.len();
1754
1755 let lines: Vec<String> = if total_lines > lines_requested {
1757 all_lines[(total_lines - lines_requested)..].to_vec()
1758 } else {
1759 all_lines
1760 };
1761
1762 Json(DaemonLogsResponse {
1763 lines,
1764 total_lines,
1765 file_size_bytes: metadata.len(),
1766 })
1767 .into_response()
1768}
1769
1770async fn websocket_handler(
1772 ws: WebSocketUpgrade,
1773 State(state): State<Arc<AppState>>,
1774) -> impl IntoResponse {
1775 ws.max_message_size(1024 * 1024)
1777 .on_upgrade(|socket| handle_websocket(socket, state))
1778}
1779
1780#[derive(Deserialize)]
1782#[serde(tag = "type")]
1783enum WsIncoming {
1784 #[serde(rename = "session")]
1786 Session { session_id: Option<String> },
1787 #[serde(rename = "chat")]
1790 Chat { message: String },
1791 #[serde(rename = "ping")]
1793 Ping,
1794}
1795
1796#[derive(Serialize)]
1797#[serde(tag = "type")]
1798#[allow(dead_code)] enum WsOutgoing {
1800 #[serde(rename = "connected")]
1802 Connected { session_id: String },
1803 #[serde(rename = "content")]
1805 Content { delta: String },
1806 #[serde(rename = "tool_start")]
1808 ToolStart { name: String, id: String },
1809 #[serde(rename = "tool_end")]
1811 ToolEnd {
1812 name: String,
1813 id: String,
1814 output: String,
1815 },
1816 #[serde(rename = "done")]
1818 Done,
1819 #[serde(rename = "pong")]
1821 Pong,
1822 #[serde(rename = "error")]
1824 Error { message: String },
1825}
1826
1827async fn handle_websocket(socket: WebSocket, state: Arc<AppState>) {
1828 let (mut sender, mut receiver) = socket.split();
1829
1830 debug!("WebSocket client connected");
1831
1832 let mut current_session_id: Option<String> = None;
1834
1835 while let Some(msg) = receiver.next().await {
1837 match msg {
1838 Ok(WsMessage::Text(text)) => {
1839 match serde_json::from_str::<WsIncoming>(&text) {
1841 Ok(WsIncoming::Session { session_id }) => {
1842 match get_or_create_session(&state, session_id).await {
1844 Ok(id) => {
1845 current_session_id = Some(id.clone());
1846 let connected = WsOutgoing::Connected { session_id: id };
1847 if let Ok(json) = serde_json::to_string(&connected) {
1848 let _ = sender.send(WsMessage::Text(json.into())).await;
1849 }
1850 }
1851 Err(e) => {
1852 let error = WsOutgoing::Error {
1853 message: format!("Failed to create session: {}", e.1),
1854 };
1855 if let Ok(json) = serde_json::to_string(&error) {
1856 let _ = sender.send(WsMessage::Text(json.into())).await;
1857 }
1858 }
1859 }
1860 }
1861 Ok(WsIncoming::Chat { message }) => {
1862 let session_id = match ¤t_session_id {
1864 Some(id) => id.clone(),
1865 None => {
1866 match get_or_create_session(&state, None).await {
1868 Ok(id) => {
1869 current_session_id = Some(id.clone());
1870 let connected = WsOutgoing::Connected {
1872 session_id: id.clone(),
1873 };
1874 if let Ok(json) = serde_json::to_string(&connected) {
1875 let _ = sender.send(WsMessage::Text(json.into())).await;
1876 }
1877 id
1878 }
1879 Err(e) => {
1880 let error = WsOutgoing::Error {
1881 message: format!("Failed to create session: {}", e.1),
1882 };
1883 if let Ok(json) = serde_json::to_string(&error) {
1884 let _ = sender.send(WsMessage::Text(json.into())).await;
1885 }
1886 continue;
1887 }
1888 }
1889 }
1890 };
1891
1892 debug!("WebSocket chat [{}]: {}", session_id, message);
1893
1894 let _gate_permit = state.turn_gate.acquire().await;
1896
1897 let ws_lock = state.workspace_lock.clone();
1899 let _ws_guard =
1900 match tokio::task::spawn_blocking(move || ws_lock.acquire()).await {
1901 Ok(Ok(guard)) => guard,
1902 Ok(Err(e)) => {
1903 let error = WsOutgoing::Error {
1904 message: format!("Workspace lock error: {}", e),
1905 };
1906 if let Ok(json) = serde_json::to_string(&error) {
1907 let _ = sender.send(WsMessage::Text(json.into())).await;
1908 }
1909 continue;
1910 }
1911 Err(e) => {
1912 let error = WsOutgoing::Error {
1913 message: format!("Lock task error: {}", e),
1914 };
1915 if let Ok(json) = serde_json::to_string(&error) {
1916 let _ = sender.send(WsMessage::Text(json.into())).await;
1917 }
1918 continue;
1919 }
1920 };
1921
1922 let mut sessions = state.sessions.lock().await;
1924 let entry = match sessions.get_mut(&session_id) {
1925 Some(e) => e,
1926 None => {
1927 let error = WsOutgoing::Error {
1928 message: "Session not found".to_string(),
1929 };
1930 if let Ok(json) = serde_json::to_string(&error) {
1931 let _ = sender.send(WsMessage::Text(json.into())).await;
1932 }
1933 current_session_id = None;
1934 continue;
1935 }
1936 };
1937
1938 entry.last_accessed = Instant::now();
1939
1940 match entry.agent.chat(&message).await {
1941 Ok(response) => {
1942 let content = WsOutgoing::Content { delta: response };
1944 if let Ok(json) = serde_json::to_string(&content) {
1945 let _ = sender.send(WsMessage::Text(json.into())).await;
1946 }
1947
1948 let done = WsOutgoing::Done;
1950 if let Ok(json) = serde_json::to_string(&done) {
1951 let _ = sender.send(WsMessage::Text(json.into())).await;
1952 }
1953 }
1954 Err(e) => {
1955 let error = WsOutgoing::Error {
1956 message: e.to_string(),
1957 };
1958 if let Ok(json) = serde_json::to_string(&error) {
1959 let _ = sender.send(WsMessage::Text(json.into())).await;
1960 }
1961 }
1962 }
1963 }
1964 Ok(WsIncoming::Ping) => {
1965 let pong = WsOutgoing::Pong;
1966 if let Ok(json) = serde_json::to_string(&pong) {
1967 let _ = sender.send(WsMessage::Text(json.into())).await;
1968 }
1969 }
1970 Err(e) => {
1971 let error = WsOutgoing::Error {
1972 message: format!("Invalid message format: {}", e),
1973 };
1974 if let Ok(json) = serde_json::to_string(&error) {
1975 let _ = sender.send(WsMessage::Text(json.into())).await;
1976 }
1977 }
1978 }
1979 }
1980 Ok(WsMessage::Ping(data)) => {
1981 let _ = sender.send(WsMessage::Pong(data)).await;
1982 }
1983 Ok(WsMessage::Close(_)) => {
1984 debug!("WebSocket client disconnected");
1985 break;
1986 }
1987 Err(e) => {
1988 debug!("WebSocket error: {}", e);
1989 break;
1990 }
1991 _ => {}
1992 }
1993 }
1994
1995 debug!("WebSocket connection closed");
1996}
1997
/// Returns `true` when `origin` is an `http://` or `https://` origin whose
/// host is exactly `localhost`, `127.0.0.1` or `[::1]`.
///
/// An optional `:port` made of ASCII digits is ignored (an empty port is
/// tolerated). Anything else — non-UTF-8 bytes, another scheme, a different
/// host, or trailing text after the host or port — yields `false`.
fn is_localhost_origin(origin: &[u8]) -> bool {
    fn digits_only(text: &str) -> bool {
        text.chars().all(|c| c.is_ascii_digit())
    }

    let text = match std::str::from_utf8(origin) {
        Ok(text) => text,
        Err(_) => return false,
    };

    let authority = match text
        .strip_prefix("http://")
        .or_else(|| text.strip_prefix("https://"))
    {
        Some(rest) => rest,
        None => return false,
    };

    let host = match authority.rfind(':') {
        // No colon at all: the whole remainder is the host.
        None => authority,
        // Bracketed IPv6 literal, optionally followed by `:port`.
        Some(_) if authority.starts_with('[') => {
            let close = match authority.find(']') {
                Some(index) => index,
                None => return false,
            };
            let (literal, suffix) = authority.split_at(close + 1);
            if !suffix.is_empty() {
                match suffix.strip_prefix(':') {
                    Some(port) if digits_only(port) => {}
                    _ => return false,
                }
            }
            literal
        }
        // Name or IPv4 address: split at the last colon only when what
        // follows looks like a port.
        Some(colon) => {
            let (name, port) = (&authority[..colon], &authority[colon + 1..]);
            if digits_only(port) {
                name
            } else {
                authority
            }
        }
    };

    matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
2050
2051async fn approve_tool_execution(
2060 Path(session_id): Path<String>,
2061 State(_state): State<Arc<AppState>>,
2062 Json(body): Json<serde_json::Value>,
2063) -> impl IntoResponse {
2064 let decision = body["decision"].as_str().unwrap_or("denied");
2065 let reason = body["reason"].as_str().unwrap_or("");
2066
2067 let response = serde_json::json!({
2068 "session_id": session_id,
2069 "decision": decision,
2070 "reason": reason,
2071 "status": "acknowledged"
2072 });
2073
2074 (StatusCode::OK, Json(response))
2075}