Skip to main content

opendev_web/routes/
chat.rs

1//! Chat message routes.
2
3use axum::extract::State;
4use axum::routing::{delete, get, post};
5use axum::{Json, Router};
6use serde::Deserialize;
7use tracing::{error, info};
8
9use crate::error::WebError;
10use crate::state::{AppState, WsBroadcast};
11
12/// Chat query request.
13#[derive(Debug, Deserialize)]
14pub struct QueryRequest {
15    pub message: String,
16    #[serde(default)]
17    pub session_id: Option<String>,
18}
19
20/// Interrupt request.
21#[derive(Debug, Deserialize)]
22pub struct InterruptRequest {
23    #[serde(default)]
24    pub session_id: Option<String>,
25}
26
27/// Clear chat request.
28#[derive(Debug, Deserialize)]
29pub struct ClearChatRequest {
30    #[serde(default)]
31    pub workspace: Option<String>,
32}
33
34/// Build the chat router.
35pub fn router() -> Router<AppState> {
36    Router::new()
37        .route("/api/chat/messages", get(get_messages))
38        .route("/api/chat/query", post(send_query))
39        .route("/api/chat/interrupt", post(interrupt))
40        .route("/api/chat/clear", delete(clear_chat))
41}
42
43/// Get messages for the current session.
44async fn get_messages(State(state): State<AppState>) -> Result<Json<serde_json::Value>, WebError> {
45    let mgr = state.session_manager().await;
46    let session = mgr
47        .current_session()
48        .ok_or_else(|| WebError::NotFound("No active session".to_string()))?;
49
50    let messages: Vec<serde_json::Value> = session
51        .messages
52        .iter()
53        .filter(|msg| {
54            // Skip system-injected messages (nudges, directives, internal)
55            if msg.metadata.contains_key("_msg_class") {
56                return false;
57            }
58            // Skip [SYSTEM] prefixed messages from older sessions
59            if msg.role == opendev_models::message::Role::User
60                && msg.content.starts_with("[SYSTEM] ")
61            {
62                return false;
63            }
64            // Skip system messages
65            if msg.role == opendev_models::message::Role::System {
66                return false;
67            }
68            true
69        })
70        .map(|msg| {
71            let mut val = serde_json::json!({
72                "role": msg.role,
73                "content": msg.content,
74                "timestamp": msg.timestamp,
75                "tool_calls": msg.tool_calls.iter()
76                    .filter(|tc| tc.name != "task_complete")
77                    .count(),
78            });
79            if let Some(ref reasoning) = msg.reasoning_content {
80                val["reasoning_content"] = serde_json::json!(reasoning);
81            }
82            if let Some(ref trace) = msg.thinking_trace {
83                val["thinking_trace"] = serde_json::json!(trace);
84            }
85            val
86        })
87        .collect();
88
89    Ok(Json(serde_json::json!(messages)))
90}
91
92/// Send a query to the agent.
93///
94/// 4-case dispatch:
95/// 1. Empty message -> 400 Bad Request
96/// 2. Session already running -> inject into live queue; 409 if full
97/// 3. Normal -> load session, persist message, broadcast, fire agent loop
98/// 4. No agent executor set -> accept but warn
99async fn send_query(
100    State(state): State<AppState>,
101    Json(payload): Json<QueryRequest>,
102) -> Result<Json<serde_json::Value>, WebError> {
103    // Case 1: Empty message.
104    let message = payload.message.trim().to_string();
105    if message.is_empty() {
106        return Err(WebError::BadRequest("Message cannot be empty.".to_string()));
107    }
108
109    // Resolve session ID.
110    let session_id = match payload.session_id {
111        Some(id) => id,
112        None => state.current_session_id().await.ok_or_else(|| {
113            WebError::BadRequest("No active session. Create a session first.".to_string())
114        })?,
115    };
116
117    // Case 2: Session already running -> inject into live queue.
118    if state.is_session_running(&session_id).await {
119        match state.try_inject_message(&session_id, message.clone()).await {
120            Ok(()) => {
121                // Broadcast the injected user message.
122                state.broadcast(WsBroadcast {
123                    msg_type: "user_message".to_string(),
124                    data: serde_json::json!({
125                        "role": "user",
126                        "content": message,
127                        "session_id": session_id,
128                        "injected": true,
129                    }),
130                });
131                return Ok(Json(serde_json::json!({
132                    "status": "accepted",
133                    "session_id": session_id,
134                })));
135            }
136            Err(_) => {
137                return Err(WebError::Conflict(
138                    "Agent is busy; injection queue is full. Try again shortly.".to_string(),
139                ));
140            }
141        }
142    }
143
144    // Case 3: Normal flow — load session, persist message, broadcast, fire agent.
145
146    // Load session (try from session manager, fall back to current).
147    let mgr = state.session_manager().await;
148    let session_exists = mgr.load_session(&session_id).is_ok()
149        || mgr
150            .current_session()
151            .map(|s| s.id == session_id)
152            .unwrap_or(false);
153    drop(mgr);
154
155    if !session_exists {
156        return Err(WebError::NotFound(format!(
157            "Session '{}' not found.",
158            session_id
159        )));
160    }
161
162    // Broadcast user message to WebSocket clients.
163    state.broadcast(WsBroadcast {
164        msg_type: "user_message".to_string(),
165        data: serde_json::json!({
166            "role": "user",
167            "content": message,
168            "session_id": session_id,
169        }),
170    });
171
172    // Fire the agent executor as a background task.
173    if let Some(executor) = state.agent_executor().await {
174        let state_clone = state.clone();
175        let msg = message.clone();
176        let sid = session_id.clone();
177        tokio::spawn(async move {
178            if let Err(e) = executor.execute_query(msg, sid, state_clone).await {
179                error!("Agent executor error: {}", e);
180            }
181        });
182    } else {
183        info!(
184            "Query accepted for session {} but no agent executor is wired",
185            session_id
186        );
187    }
188
189    Ok(Json(serde_json::json!({
190        "status": "accepted",
191        "session_id": session_id,
192    })))
193}
194
195/// Interrupt an ongoing task.
196///
197/// Calls `request_interrupt()` which also denies all pending approvals and
198/// ask-user requests via their oneshot channels.
199async fn interrupt(
200    State(state): State<AppState>,
201    Json(_payload): Json<InterruptRequest>,
202) -> Json<serde_json::Value> {
203    state.request_interrupt().await;
204
205    state.broadcast(WsBroadcast {
206        msg_type: "interrupt".to_string(),
207        data: serde_json::json!({"status": "requested"}),
208    });
209
210    Json(serde_json::json!({
211        "status": "interrupt_requested",
212    }))
213}
214
215/// Clear the current chat session by creating a new one.
216async fn clear_chat(
217    State(state): State<AppState>,
218    body: Option<Json<ClearChatRequest>>,
219) -> Result<Json<serde_json::Value>, WebError> {
220    let mut mgr = state.session_manager_mut().await;
221    let session = mgr.create_session();
222    let session_id = session.id.clone();
223
224    // Set working directory if provided.
225    if let Some(Json(req)) = body
226        && let Some(wd) = req.workspace
227        && let Some(session) = mgr.current_session_mut()
228    {
229        session.working_directory = Some(wd);
230    }
231
232    mgr.save_current()
233        .map_err(|e| WebError::Internal(format!("Failed to save new session: {}", e)))?;
234
235    Ok(Json(serde_json::json!({
236        "status": "success",
237        "message": "Chat cleared",
238        "session_id": session_id,
239    })))
240}
241
242#[cfg(test)]
243#[path = "chat_tests.rs"]
244mod tests;