Skip to main content

opendev_web/state/
bridge.rs

1//! Bridge mode, injection queues, and agent executor management.
2
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6
7use super::{AgentExecutor, AppState, INJECTION_QUEUE_CAPACITY};
8
9impl AppState {
10    // --- Bridge mode ---
11
12    /// Check if bridge mode is active (TUI owns execution, Web UI mirrors).
13    pub async fn is_bridge_mode(&self) -> bool {
14        self.inner.bridge.read().await.active
15    }
16
17    /// Get the bridge session ID, if bridge mode is active.
18    pub async fn bridge_session_id(&self) -> Option<String> {
19        let bridge = self.inner.bridge.read().await;
20        if bridge.active {
21            bridge.session_id.clone()
22        } else {
23            None
24        }
25    }
26
27    /// Activate bridge mode for a given session.
28    ///
29    /// While active, the Web UI should not start its own agent execution
30    /// for this session; instead it should route messages to the TUI injector.
31    pub async fn set_bridge_session(&self, session_id: String) {
32        let mut bridge = self.inner.bridge.write().await;
33        bridge.active = true;
34        bridge.session_id = Some(session_id);
35    }
36
37    /// Deactivate bridge mode.
38    pub async fn clear_bridge_session(&self) {
39        let mut bridge = self.inner.bridge.write().await;
40        bridge.active = false;
41        bridge.session_id = None;
42    }
43
44    /// Check whether a mutation on a session should be blocked because
45    /// the TUI owns it in bridge mode.
46    ///
47    /// Returns `true` if the session is bridge-owned and should not be
48    /// mutated by the web server's own agent executor.
49    pub async fn is_bridge_guarded(&self, session_id: &str) -> bool {
50        let bridge = self.inner.bridge.read().await;
51        bridge.active && bridge.session_id.as_deref() == Some(session_id)
52    }
53
54    // --- Injection queues ---
55
56    /// Get or create the injection queue sender for a session.
57    ///
58    /// Returns `(sender, Option<receiver>)`. The receiver is `Some` only when the
59    /// queue was first created -- the caller that creates the session's agent loop
60    /// should take the receiver. Subsequent callers get `None` for the receiver.
61    pub async fn get_or_create_injection_queue(
62        &self,
63        session_id: &str,
64    ) -> (mpsc::Sender<String>, Option<mpsc::Receiver<String>>) {
65        let mut queues = self.inner.injection_queues.lock().await;
66        if let Some(tx) = queues.get(session_id) {
67            (tx.clone(), None)
68        } else {
69            let (tx, rx) = mpsc::channel(INJECTION_QUEUE_CAPACITY);
70            queues.insert(session_id.to_string(), tx.clone());
71            (tx, Some(rx))
72        }
73    }
74
75    /// Try to inject a message into a running session's queue.
76    ///
77    /// Returns `Ok(())` on success, `Err(message)` if queue is full or not found.
78    pub async fn try_inject_message(
79        &self,
80        session_id: &str,
81        message: String,
82    ) -> Result<(), String> {
83        let queues = self.inner.injection_queues.lock().await;
84        if let Some(tx) = queues.get(session_id) {
85            tx.try_send(message)
86                .map_err(|e| format!("Injection queue full or closed: {}", e))
87        } else {
88            Err("No injection queue for session".to_string())
89        }
90    }
91
92    /// Remove the injection queue for a session.
93    pub async fn clear_injection_queue(&self, session_id: &str) {
94        self.inner.injection_queues.lock().await.remove(session_id);
95    }
96
97    // --- Agent executor ---
98
99    /// Set the agent executor implementation.
100    pub async fn set_agent_executor(&self, executor: Arc<dyn AgentExecutor>) {
101        *self.inner.agent_executor.lock().await = Some(executor);
102    }
103
104    /// Get the agent executor (if set).
105    pub async fn agent_executor(&self) -> Option<Arc<dyn AgentExecutor>> {
106        self.inner.agent_executor.lock().await.clone()
107    }
108}