1mod approvals;
9mod bridge;
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
14
15use opendev_config::ModelRegistry;
16use opendev_history::SessionManager;
17use opendev_http::UserStore;
18use opendev_models::AppConfig;
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
22pub struct WsBroadcast {
23 #[serde(rename = "type")]
24 pub msg_type: String,
25 #[serde(default)]
26 pub data: serde_json::Value,
27}
28
29#[derive(Clone)]
31pub struct AppState {
32 inner: Arc<AppStateInner>,
33}
34
35pub(super) struct AppStateInner {
36 pub(super) session_manager: RwLock<SessionManager>,
38 pub(super) config: RwLock<AppConfig>,
40 pub(super) working_dir: String,
42 pub(super) ws_tx: broadcast::Sender<WsBroadcast>,
44 pub(super) pending_approvals: Mutex<HashMap<String, PendingApprovalSlot>>,
46 pub(super) pending_ask_users: Mutex<HashMap<String, PendingAskUserSlot>>,
48 pub(super) pending_plan_approvals: Mutex<HashMap<String, PendingPlanApprovalSlot>>,
50 pub(super) mode: RwLock<OperationMode>,
52 pub(super) autonomy_level: RwLock<String>,
54 pub(super) interrupt_requested: Mutex<bool>,
56 pub(super) running_sessions: Mutex<HashMap<String, String>>,
58 pub(super) injection_queues: Mutex<HashMap<String, mpsc::Sender<String>>>,
60 pub(super) agent_executor: Mutex<Option<Arc<dyn AgentExecutor>>>,
62 pub(super) user_store: Arc<UserStore>,
64 pub(super) model_registry: RwLock<ModelRegistry>,
66 pub(super) bridge: RwLock<BridgeState>,
68}
69
70#[derive(Debug, Default)]
73pub(super) struct BridgeState {
74 pub(super) session_id: Option<String>,
76 pub(super) active: bool,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
82#[serde(rename_all = "lowercase")]
83pub enum OperationMode {
84 Normal,
85 Plan,
86}
87
88impl std::fmt::Display for OperationMode {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 OperationMode::Normal => write!(f, "normal"),
92 OperationMode::Plan => write!(f, "plan"),
93 }
94 }
95}
96
97#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
99pub struct PendingApproval {
100 pub tool_name: String,
101 pub arguments: serde_json::Value,
102 pub session_id: Option<String>,
103}
104
105pub(super) struct PendingApprovalSlot {
107 pub meta: PendingApproval,
108 pub tx: Option<oneshot::Sender<ApprovalResult>>,
109}
110
111#[derive(Debug, Clone)]
113pub struct ApprovalResult {
114 pub approved: bool,
115 pub auto_approve: bool,
116}
117
118#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
120pub struct PendingAskUser {
121 pub prompt: String,
122 pub session_id: Option<String>,
123}
124
125pub(super) struct PendingAskUserSlot {
127 pub meta: PendingAskUser,
128 pub tx: Option<oneshot::Sender<AskUserResult>>,
129}
130
131#[derive(Debug, Clone)]
133pub struct AskUserResult {
134 pub answers: Option<serde_json::Value>,
135 pub cancelled: bool,
136}
137
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
140pub struct PendingPlanApproval {
141 pub data: serde_json::Value,
142 pub session_id: Option<String>,
143}
144
145pub(super) struct PendingPlanApprovalSlot {
147 pub meta: PendingPlanApproval,
148 pub tx: Option<oneshot::Sender<PlanApprovalResult>>,
149}
150
151#[derive(Debug, Clone)]
153pub struct PlanApprovalResult {
154 pub action: String,
155 pub feedback: String,
156}
157
158#[async_trait::async_trait]
160pub trait AgentExecutor: Send + Sync + 'static {
161 async fn execute_query(
163 &self,
164 message: String,
165 session_id: String,
166 state: AppState,
167 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
168}
169
170const INJECTION_QUEUE_CAPACITY: usize = 10;
172
173impl AppState {
174 pub fn new(
176 session_manager: SessionManager,
177 config: AppConfig,
178 working_dir: String,
179 user_store: UserStore,
180 model_registry: ModelRegistry,
181 ) -> Self {
182 let (ws_tx, _) = broadcast::channel(256);
183 Self {
184 inner: Arc::new(AppStateInner {
185 session_manager: RwLock::new(session_manager),
186 config: RwLock::new(config),
187 working_dir,
188 ws_tx,
189 pending_approvals: Mutex::new(HashMap::new()),
190 pending_ask_users: Mutex::new(HashMap::new()),
191 pending_plan_approvals: Mutex::new(HashMap::new()),
192 mode: RwLock::new(OperationMode::Normal),
193 autonomy_level: RwLock::new("Manual".to_string()),
194 interrupt_requested: Mutex::new(false),
195 running_sessions: Mutex::new(HashMap::new()),
196 injection_queues: Mutex::new(HashMap::new()),
197 agent_executor: Mutex::new(None),
198 user_store: Arc::new(user_store),
199 model_registry: RwLock::new(model_registry),
200 bridge: RwLock::new(BridgeState::default()),
201 }),
202 }
203 }
204
205 pub async fn session_manager(&self) -> tokio::sync::RwLockReadGuard<'_, SessionManager> {
209 self.inner.session_manager.read().await
210 }
211
212 pub async fn session_manager_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, SessionManager> {
214 self.inner.session_manager.write().await
215 }
216
217 pub async fn current_session_id(&self) -> Option<String> {
219 self.inner
220 .session_manager
221 .read()
222 .await
223 .current_session()
224 .map(|s| s.id.clone())
225 }
226
227 pub async fn config(&self) -> tokio::sync::RwLockReadGuard<'_, AppConfig> {
229 self.inner.config.read().await
230 }
231
232 pub async fn config_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, AppConfig> {
234 self.inner.config.write().await
235 }
236
237 pub fn working_dir(&self) -> &str {
239 &self.inner.working_dir
240 }
241
242 pub fn user_store(&self) -> &UserStore {
246 &self.inner.user_store
247 }
248
249 pub async fn model_registry(&self) -> tokio::sync::RwLockReadGuard<'_, ModelRegistry> {
253 self.inner.model_registry.read().await
254 }
255
256 pub async fn model_registry_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, ModelRegistry> {
258 self.inner.model_registry.write().await
259 }
260
261 pub fn ws_sender(&self) -> broadcast::Sender<WsBroadcast> {
265 self.inner.ws_tx.clone()
266 }
267
268 pub fn ws_subscribe(&self) -> broadcast::Receiver<WsBroadcast> {
270 self.inner.ws_tx.subscribe()
271 }
272
273 pub fn broadcast(&self, msg: WsBroadcast) {
275 let _ = self.inner.ws_tx.send(msg);
277 }
278
279 pub async fn mode(&self) -> OperationMode {
283 *self.inner.mode.read().await
284 }
285
286 pub async fn set_mode(&self, mode: OperationMode) {
288 *self.inner.mode.write().await = mode;
289 }
290
291 pub async fn autonomy_level(&self) -> String {
295 self.inner.autonomy_level.read().await.clone()
296 }
297
298 pub async fn set_autonomy_level(&self, level: String) {
300 *self.inner.autonomy_level.write().await = level;
301 }
302
303 pub async fn request_interrupt(&self) {
310 *self.inner.interrupt_requested.lock().await = true;
311
312 {
314 let mut approvals = self.inner.pending_approvals.lock().await;
315 for (_id, slot) in approvals.iter_mut() {
316 if let Some(tx) = slot.tx.take() {
317 let _ = tx.send(ApprovalResult {
318 approved: false,
319 auto_approve: false,
320 });
321 }
322 }
323 approvals.clear();
324 }
325
326 {
328 let mut ask_users = self.inner.pending_ask_users.lock().await;
329 for (_id, slot) in ask_users.iter_mut() {
330 if let Some(tx) = slot.tx.take() {
331 let _ = tx.send(AskUserResult {
332 answers: None,
333 cancelled: true,
334 });
335 }
336 }
337 ask_users.clear();
338 }
339
340 {
342 let mut plan_approvals = self.inner.pending_plan_approvals.lock().await;
343 for (_id, slot) in plan_approvals.iter_mut() {
344 if let Some(tx) = slot.tx.take() {
345 let _ = tx.send(PlanApprovalResult {
346 action: "reject".to_string(),
347 feedback: "Interrupted".to_string(),
348 });
349 }
350 }
351 plan_approvals.clear();
352 }
353 }
354
355 pub async fn clear_interrupt(&self) {
357 *self.inner.interrupt_requested.lock().await = false;
358 }
359
360 pub async fn is_interrupt_requested(&self) -> bool {
362 *self.inner.interrupt_requested.lock().await
363 }
364
365 pub async fn set_session_running(&self, session_id: String) {
369 self.inner
370 .running_sessions
371 .lock()
372 .await
373 .insert(session_id, "running".to_string());
374 }
375
376 pub async fn set_session_idle(&self, session_id: &str) {
378 self.inner.running_sessions.lock().await.remove(session_id);
379 }
380
381 pub async fn is_session_running(&self, session_id: &str) -> bool {
383 self.inner
384 .running_sessions
385 .lock()
386 .await
387 .contains_key(session_id)
388 }
389
390 pub fn git_branch(&self) -> Option<String> {
394 let output = std::process::Command::new("git")
395 .args(["rev-parse", "--abbrev-ref", "HEAD"])
396 .current_dir(&self.inner.working_dir)
397 .output()
398 .ok()?;
399
400 if output.status.success() {
401 Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
402 } else {
403 None
404 }
405 }
406}
407
408#[cfg(test)]
409mod tests;