opendev-web 0.1.4

Web backend (axum + WebSocket) for OpenDev AI coding agent
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
//! Shared application state.
//!
//! Thread-safe state shared between HTTP handlers and WebSocket connections.
//! Uses `tokio::sync::oneshot` channels for approval, ask-user, and plan-approval
//! notification so that waiting agent tasks are woken immediately on resolution
//! (no polling).

mod approvals;
mod bridge;

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};

use opendev_config::ModelRegistry;
use opendev_history::SessionManager;
use opendev_http::UserStore;
use opendev_models::AppConfig;

/// WebSocket broadcast message.
///
/// Payload type of the broadcast channel owned by `AppState`: values are
/// published with `AppState::broadcast` and received through
/// `AppState::ws_subscribe`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WsBroadcast {
    /// Message discriminator; appears under the key `type` in serialized form.
    #[serde(rename = "type")]
    pub msg_type: String,
    /// Message payload. Optional on input: when the field is missing it is
    /// filled with the type's `Default` value (`serde_json::Value::Null`).
    #[serde(default)]
    pub data: serde_json::Value,
}

/// Shared application state wrapped in Arc for use with Axum.
///
/// Cloning is cheap: the derived `Clone` only clones the inner `Arc`, so every
/// clone refers to the same `AppStateInner`.
#[derive(Clone)]
pub struct AppState {
    /// Reference-counted pointer to the actual state.
    inner: Arc<AppStateInner>,
}

/// Backing storage for `AppState`.
///
/// Built once in `AppState::new` and shared behind an `Arc`. Every field that
/// changes after construction sits behind a `tokio::sync` lock; `working_dir`,
/// `ws_tx` and `user_store` are stored without one.
pub(super) struct AppStateInner {
    /// Session manager for persistence.
    pub(super) session_manager: RwLock<SessionManager>,
    /// Application configuration.
    pub(super) config: RwLock<AppConfig>,
    /// Working directory for the current project (fixed at construction).
    pub(super) working_dir: String,
    /// Broadcast channel for WebSocket messages (created with capacity 256 in
    /// `AppState::new`).
    pub(super) ws_tx: broadcast::Sender<WsBroadcast>,
    /// Pending approval requests: approval_id -> (metadata, oneshot sender).
    pub(super) pending_approvals: Mutex<HashMap<String, PendingApprovalSlot>>,
    /// Pending ask-user requests: request_id -> (metadata, oneshot sender).
    pub(super) pending_ask_users: Mutex<HashMap<String, PendingAskUserSlot>>,
    /// Pending plan approval requests: request_id -> (metadata, oneshot sender).
    pub(super) pending_plan_approvals: Mutex<HashMap<String, PendingPlanApprovalSlot>>,
    /// Current operation mode (normal/plan); starts as `OperationMode::Normal`.
    pub(super) mode: RwLock<OperationMode>,
    /// Autonomy level, kept as a free-form string; starts as `"Manual"`.
    pub(super) autonomy_level: RwLock<String>,
    /// Interrupt flag: set by `request_interrupt`, reset by `clear_interrupt`.
    pub(super) interrupt_requested: Mutex<bool>,
    /// Running sessions: session_id -> status. The methods in this file only
    /// ever insert the status `"running"` and remove the entry when idle.
    pub(super) running_sessions: Mutex<HashMap<String, String>>,
    /// Live message injection queues: session_id -> bounded mpsc sender.
    pub(super) injection_queues: Mutex<HashMap<String, mpsc::Sender<String>>>,
    /// Agent executor (trait-object, set once on first query).
    pub(super) agent_executor: Mutex<Option<Arc<dyn AgentExecutor>>>,
    /// User store for authentication.
    pub(super) user_store: Arc<UserStore>,
    /// Model/provider registry from models.dev cache.
    pub(super) model_registry: RwLock<ModelRegistry>,
    /// Bridge mode state; starts as `BridgeState::default()`.
    pub(super) bridge: RwLock<BridgeState>,
}

/// Bridge mode state: when the TUI owns agent execution and
/// the Web UI mirrors it.
///
/// The derived `Default` yields an inactive bridge with no session
/// (`session_id: None`, `active: false`), which is what `AppState::new` uses.
#[derive(Debug, Default)]
pub(super) struct BridgeState {
    /// Session ID currently owned by the TUI bridge.
    pub(super) session_id: Option<String>,
    /// Whether bridge mode is active.
    pub(super) active: bool,
}

/// Operation mode for the agent.
///
/// With `rename_all = "lowercase"` the variants are (de)serialized as the
/// strings `"normal"` and `"plan"`, the same spellings the `Display` impl
/// below produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationMode {
    /// Normal mode; the initial mode set by `AppState::new`.
    Normal,
    /// Plan mode.
    Plan,
}

impl std::fmt::Display for OperationMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            OperationMode::Normal => write!(f, "normal"),
            OperationMode::Plan => write!(f, "plan"),
        }
    }
}

/// Metadata for a pending approval request.
///
/// Stored next to the oneshot sender inside `PendingApprovalSlot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingApproval {
    /// Name of the tool the request concerns.
    /// NOTE(review): presumably the tool the agent wants to run — confirm with the producer.
    pub tool_name: String,
    /// Arguments attached to the request, as free-form JSON.
    pub arguments: serde_json::Value,
    /// Session the request is associated with, if any.
    pub session_id: Option<String>,
}

/// Internal slot holding approval metadata and the oneshot sender.
pub(super) struct PendingApprovalSlot {
    /// Descriptive data about the request.
    pub meta: PendingApproval,
    /// Sender used to deliver the decision. It is an `Option` so it can be
    /// `take()`n out of a borrowed slot, as `AppState::request_interrupt` does
    /// before calling `send`.
    pub tx: Option<oneshot::Sender<ApprovalResult>>,
}

/// Result sent through the oneshot channel when an approval is resolved.
///
/// `AppState::request_interrupt` resolves every pending approval with both
/// fields set to `false`.
#[derive(Debug, Clone)]
pub struct ApprovalResult {
    /// Whether the request was approved.
    pub approved: bool,
    /// NOTE(review): presumably asks for later requests to be approved
    /// automatically — confirm against the code that consumes this value.
    pub auto_approve: bool,
}

/// Metadata for a pending ask-user request.
///
/// Stored next to the oneshot sender inside `PendingAskUserSlot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingAskUser {
    /// Prompt text of the request.
    pub prompt: String,
    /// Session the request is associated with, if any.
    pub session_id: Option<String>,
}

/// Internal slot holding ask-user metadata and the oneshot sender.
pub(super) struct PendingAskUserSlot {
    /// Descriptive data about the request.
    pub meta: PendingAskUser,
    /// Sender used to deliver the answer. It is an `Option` so it can be
    /// `take()`n out of a borrowed slot, as `AppState::request_interrupt` does
    /// before calling `send`.
    pub tx: Option<oneshot::Sender<AskUserResult>>,
}

/// Result sent through the oneshot channel when ask-user is resolved.
///
/// `AppState::request_interrupt` resolves every pending request with
/// `answers: None` and `cancelled: true`.
#[derive(Debug, Clone)]
pub struct AskUserResult {
    /// The supplied answers as free-form JSON, or `None` when there are none.
    pub answers: Option<serde_json::Value>,
    /// Whether the request was cancelled instead of answered.
    pub cancelled: bool,
}

/// Metadata for a pending plan approval request.
///
/// Stored next to the oneshot sender inside `PendingPlanApprovalSlot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingPlanApproval {
    /// Request payload as free-form JSON; its schema is not defined in this file.
    pub data: serde_json::Value,
    /// Session the request is associated with, if any.
    pub session_id: Option<String>,
}

/// Internal slot holding plan-approval metadata and the oneshot sender.
pub(super) struct PendingPlanApprovalSlot {
    /// Descriptive data about the request.
    pub meta: PendingPlanApproval,
    /// Sender used to deliver the decision. It is an `Option` so it can be
    /// `take()`n out of a borrowed slot, as `AppState::request_interrupt` does
    /// before calling `send`.
    pub tx: Option<oneshot::Sender<PlanApprovalResult>>,
}

/// Result sent through the oneshot channel when a plan approval is resolved.
///
/// `AppState::request_interrupt` resolves every pending plan approval with
/// `action: "reject"` and `feedback: "Interrupted"`.
#[derive(Debug, Clone)]
pub struct PlanApprovalResult {
    /// Decision as a string. `"reject"` is the only value produced in this
    /// file; other accepted values are defined by the callers.
    pub action: String,
    /// Free-form text accompanying the decision.
    pub feedback: String,
}

/// Trait for agent execution -- injected into AppState for testability.
///
/// `AppState` stores the implementation as `Arc<dyn AgentExecutor>`. The
/// `Send + Sync + 'static` bounds make that trait object usable from any task
/// or thread, and `#[async_trait]` is what lets the `async fn` below be called
/// through a trait object.
#[async_trait::async_trait]
pub trait AgentExecutor: Send + Sync + 'static {
    /// Execute a query for a given session. Called as a background task.
    ///
    /// * `message` - the query text, passed by value.
    /// * `session_id` - identifier of the session the query belongs to.
    /// * `state` - handle to the shared application state.
    ///
    /// # Errors
    ///
    /// Returns a boxed error; what counts as a failure is decided by the
    /// implementation.
    async fn execute_query(
        &self,
        message: String,
        session_id: String,
        state: AppState,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}

/// Injection queue capacity per session.
///
/// NOTE(review): not referenced in this file; presumably used by a child module
/// when creating the bounded `mpsc` channels stored in `injection_queues` —
/// confirm.
const INJECTION_QUEUE_CAPACITY: usize = 10;

impl AppState {
    /// Create a new AppState.
    pub fn new(
        session_manager: SessionManager,
        config: AppConfig,
        working_dir: String,
        user_store: UserStore,
        model_registry: ModelRegistry,
    ) -> Self {
        let (ws_tx, _) = broadcast::channel(256);
        Self {
            inner: Arc::new(AppStateInner {
                session_manager: RwLock::new(session_manager),
                config: RwLock::new(config),
                working_dir,
                ws_tx,
                pending_approvals: Mutex::new(HashMap::new()),
                pending_ask_users: Mutex::new(HashMap::new()),
                pending_plan_approvals: Mutex::new(HashMap::new()),
                mode: RwLock::new(OperationMode::Normal),
                autonomy_level: RwLock::new("Manual".to_string()),
                interrupt_requested: Mutex::new(false),
                running_sessions: Mutex::new(HashMap::new()),
                injection_queues: Mutex::new(HashMap::new()),
                agent_executor: Mutex::new(None),
                user_store: Arc::new(user_store),
                model_registry: RwLock::new(model_registry),
                bridge: RwLock::new(BridgeState::default()),
            }),
        }
    }

    // --- Accessors ---

    /// Get a read guard for the session manager.
    pub async fn session_manager(&self) -> tokio::sync::RwLockReadGuard<'_, SessionManager> {
        self.inner.session_manager.read().await
    }

    /// Get a write guard for the session manager.
    pub async fn session_manager_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, SessionManager> {
        self.inner.session_manager.write().await
    }

    /// Get the current session ID (if a session is loaded).
    pub async fn current_session_id(&self) -> Option<String> {
        self.inner
            .session_manager
            .read()
            .await
            .current_session()
            .map(|s| s.id.clone())
    }

    /// Get a read guard for the app config.
    pub async fn config(&self) -> tokio::sync::RwLockReadGuard<'_, AppConfig> {
        self.inner.config.read().await
    }

    /// Get a write guard for the app config.
    pub async fn config_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, AppConfig> {
        self.inner.config.write().await
    }

    /// Get the working directory.
    pub fn working_dir(&self) -> &str {
        &self.inner.working_dir
    }

    // --- User store ---

    /// Get a reference to the user store.
    pub fn user_store(&self) -> &UserStore {
        &self.inner.user_store
    }

    // --- Model registry ---

    /// Get a read guard for the model registry.
    pub async fn model_registry(&self) -> tokio::sync::RwLockReadGuard<'_, ModelRegistry> {
        self.inner.model_registry.read().await
    }

    /// Get a write guard for the model registry.
    pub async fn model_registry_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, ModelRegistry> {
        self.inner.model_registry.write().await
    }

    // --- WebSocket ---

    /// Get a clone of the broadcast sender.
    pub fn ws_sender(&self) -> broadcast::Sender<WsBroadcast> {
        self.inner.ws_tx.clone()
    }

    /// Subscribe to WebSocket broadcasts.
    pub fn ws_subscribe(&self) -> broadcast::Receiver<WsBroadcast> {
        self.inner.ws_tx.subscribe()
    }

    /// Broadcast a message to all WebSocket subscribers.
    pub fn broadcast(&self, msg: WsBroadcast) {
        // Ignore send errors (no subscribers is fine).
        let _ = self.inner.ws_tx.send(msg);
    }

    // --- Mode / settings ---

    /// Get the current operation mode.
    pub async fn mode(&self) -> OperationMode {
        *self.inner.mode.read().await
    }

    /// Set the operation mode.
    pub async fn set_mode(&self, mode: OperationMode) {
        *self.inner.mode.write().await = mode;
    }

    // --- Autonomy level ---

    /// Get the current autonomy level.
    pub async fn autonomy_level(&self) -> String {
        self.inner.autonomy_level.read().await.clone()
    }

    /// Set the autonomy level.
    pub async fn set_autonomy_level(&self, level: String) {
        *self.inner.autonomy_level.write().await = level;
    }

    // --- Interrupt ---

    /// Request an interrupt.
    ///
    /// Also denies all pending approvals, ask-user, and plan-approval requests
    /// by sending rejection through their oneshot channels so blocked tasks wake up.
    pub async fn request_interrupt(&self) {
        *self.inner.interrupt_requested.lock().await = true;

        // Deny all pending approvals.
        {
            let mut approvals = self.inner.pending_approvals.lock().await;
            for (_id, slot) in approvals.iter_mut() {
                if let Some(tx) = slot.tx.take() {
                    let _ = tx.send(ApprovalResult {
                        approved: false,
                        auto_approve: false,
                    });
                }
            }
            approvals.clear();
        }

        // Cancel all pending ask-user requests.
        {
            let mut ask_users = self.inner.pending_ask_users.lock().await;
            for (_id, slot) in ask_users.iter_mut() {
                if let Some(tx) = slot.tx.take() {
                    let _ = tx.send(AskUserResult {
                        answers: None,
                        cancelled: true,
                    });
                }
            }
            ask_users.clear();
        }

        // Reject all pending plan approvals.
        {
            let mut plan_approvals = self.inner.pending_plan_approvals.lock().await;
            for (_id, slot) in plan_approvals.iter_mut() {
                if let Some(tx) = slot.tx.take() {
                    let _ = tx.send(PlanApprovalResult {
                        action: "reject".to_string(),
                        feedback: "Interrupted".to_string(),
                    });
                }
            }
            plan_approvals.clear();
        }
    }

    /// Clear the interrupt flag.
    pub async fn clear_interrupt(&self) {
        *self.inner.interrupt_requested.lock().await = false;
    }

    /// Check if interrupt has been requested.
    pub async fn is_interrupt_requested(&self) -> bool {
        *self.inner.interrupt_requested.lock().await
    }

    // --- Running sessions ---

    /// Mark a session as running.
    pub async fn set_session_running(&self, session_id: String) {
        self.inner
            .running_sessions
            .lock()
            .await
            .insert(session_id, "running".to_string());
    }

    /// Mark a session as idle.
    pub async fn set_session_idle(&self, session_id: &str) {
        self.inner.running_sessions.lock().await.remove(session_id);
    }

    /// Check if a session is running.
    pub async fn is_session_running(&self, session_id: &str) -> bool {
        self.inner
            .running_sessions
            .lock()
            .await
            .contains_key(session_id)
    }

    // --- Git branch ---

    /// Get the git branch for the working directory.
    pub fn git_branch(&self) -> Option<String> {
        let output = std::process::Command::new("git")
            .args(["rev-parse", "--abbrev-ref", "HEAD"])
            .current_dir(&self.inner.working_dir)
            .output()
            .ok()?;

        if output.status.success() {
            Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests;