mod approvals;
mod bridge;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
use opendev_config::ModelRegistry;
use opendev_history::SessionManager;
use opendev_http::UserStore;
use opendev_models::AppConfig;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WsBroadcast {
#[serde(rename = "type")]
pub msg_type: String,
#[serde(default)]
pub data: serde_json::Value,
}
#[derive(Clone)]
pub struct AppState {
inner: Arc<AppStateInner>,
}
pub(super) struct AppStateInner {
pub(super) session_manager: RwLock<SessionManager>,
pub(super) config: RwLock<AppConfig>,
pub(super) working_dir: String,
pub(super) ws_tx: broadcast::Sender<WsBroadcast>,
pub(super) pending_approvals: Mutex<HashMap<String, PendingApprovalSlot>>,
pub(super) pending_ask_users: Mutex<HashMap<String, PendingAskUserSlot>>,
pub(super) pending_plan_approvals: Mutex<HashMap<String, PendingPlanApprovalSlot>>,
pub(super) mode: RwLock<OperationMode>,
pub(super) autonomy_level: RwLock<String>,
pub(super) interrupt_requested: Mutex<bool>,
pub(super) running_sessions: Mutex<HashMap<String, String>>,
pub(super) injection_queues: Mutex<HashMap<String, mpsc::Sender<String>>>,
pub(super) agent_executor: Mutex<Option<Arc<dyn AgentExecutor>>>,
pub(super) user_store: Arc<UserStore>,
pub(super) model_registry: RwLock<ModelRegistry>,
pub(super) bridge: RwLock<BridgeState>,
}
#[derive(Debug, Default)]
pub(super) struct BridgeState {
pub(super) session_id: Option<String>,
pub(super) active: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationMode {
Normal,
Plan,
}
impl std::fmt::Display for OperationMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OperationMode::Normal => write!(f, "normal"),
OperationMode::Plan => write!(f, "plan"),
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingApproval {
pub tool_name: String,
pub arguments: serde_json::Value,
pub session_id: Option<String>,
}
pub(super) struct PendingApprovalSlot {
pub meta: PendingApproval,
pub tx: Option<oneshot::Sender<ApprovalResult>>,
}
#[derive(Debug, Clone)]
pub struct ApprovalResult {
pub approved: bool,
pub auto_approve: bool,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingAskUser {
pub prompt: String,
pub session_id: Option<String>,
}
pub(super) struct PendingAskUserSlot {
pub meta: PendingAskUser,
pub tx: Option<oneshot::Sender<AskUserResult>>,
}
#[derive(Debug, Clone)]
pub struct AskUserResult {
pub answers: Option<serde_json::Value>,
pub cancelled: bool,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingPlanApproval {
pub data: serde_json::Value,
pub session_id: Option<String>,
}
pub(super) struct PendingPlanApprovalSlot {
pub meta: PendingPlanApproval,
pub tx: Option<oneshot::Sender<PlanApprovalResult>>,
}
#[derive(Debug, Clone)]
pub struct PlanApprovalResult {
pub action: String,
pub feedback: String,
}
#[async_trait::async_trait]
pub trait AgentExecutor: Send + Sync + 'static {
async fn execute_query(
&self,
message: String,
session_id: String,
state: AppState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
const INJECTION_QUEUE_CAPACITY: usize = 10;
impl AppState {
pub fn new(
session_manager: SessionManager,
config: AppConfig,
working_dir: String,
user_store: UserStore,
model_registry: ModelRegistry,
) -> Self {
let (ws_tx, _) = broadcast::channel(256);
Self {
inner: Arc::new(AppStateInner {
session_manager: RwLock::new(session_manager),
config: RwLock::new(config),
working_dir,
ws_tx,
pending_approvals: Mutex::new(HashMap::new()),
pending_ask_users: Mutex::new(HashMap::new()),
pending_plan_approvals: Mutex::new(HashMap::new()),
mode: RwLock::new(OperationMode::Normal),
autonomy_level: RwLock::new("Manual".to_string()),
interrupt_requested: Mutex::new(false),
running_sessions: Mutex::new(HashMap::new()),
injection_queues: Mutex::new(HashMap::new()),
agent_executor: Mutex::new(None),
user_store: Arc::new(user_store),
model_registry: RwLock::new(model_registry),
bridge: RwLock::new(BridgeState::default()),
}),
}
}
pub async fn session_manager(&self) -> tokio::sync::RwLockReadGuard<'_, SessionManager> {
self.inner.session_manager.read().await
}
pub async fn session_manager_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, SessionManager> {
self.inner.session_manager.write().await
}
pub async fn current_session_id(&self) -> Option<String> {
self.inner
.session_manager
.read()
.await
.current_session()
.map(|s| s.id.clone())
}
pub async fn config(&self) -> tokio::sync::RwLockReadGuard<'_, AppConfig> {
self.inner.config.read().await
}
pub async fn config_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, AppConfig> {
self.inner.config.write().await
}
pub fn working_dir(&self) -> &str {
&self.inner.working_dir
}
pub fn user_store(&self) -> &UserStore {
&self.inner.user_store
}
pub async fn model_registry(&self) -> tokio::sync::RwLockReadGuard<'_, ModelRegistry> {
self.inner.model_registry.read().await
}
pub async fn model_registry_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, ModelRegistry> {
self.inner.model_registry.write().await
}
pub fn ws_sender(&self) -> broadcast::Sender<WsBroadcast> {
self.inner.ws_tx.clone()
}
pub fn ws_subscribe(&self) -> broadcast::Receiver<WsBroadcast> {
self.inner.ws_tx.subscribe()
}
pub fn broadcast(&self, msg: WsBroadcast) {
let _ = self.inner.ws_tx.send(msg);
}
pub async fn mode(&self) -> OperationMode {
*self.inner.mode.read().await
}
pub async fn set_mode(&self, mode: OperationMode) {
*self.inner.mode.write().await = mode;
}
pub async fn autonomy_level(&self) -> String {
self.inner.autonomy_level.read().await.clone()
}
pub async fn set_autonomy_level(&self, level: String) {
*self.inner.autonomy_level.write().await = level;
}
pub async fn request_interrupt(&self) {
*self.inner.interrupt_requested.lock().await = true;
{
let mut approvals = self.inner.pending_approvals.lock().await;
for (_id, slot) in approvals.iter_mut() {
if let Some(tx) = slot.tx.take() {
let _ = tx.send(ApprovalResult {
approved: false,
auto_approve: false,
});
}
}
approvals.clear();
}
{
let mut ask_users = self.inner.pending_ask_users.lock().await;
for (_id, slot) in ask_users.iter_mut() {
if let Some(tx) = slot.tx.take() {
let _ = tx.send(AskUserResult {
answers: None,
cancelled: true,
});
}
}
ask_users.clear();
}
{
let mut plan_approvals = self.inner.pending_plan_approvals.lock().await;
for (_id, slot) in plan_approvals.iter_mut() {
if let Some(tx) = slot.tx.take() {
let _ = tx.send(PlanApprovalResult {
action: "reject".to_string(),
feedback: "Interrupted".to_string(),
});
}
}
plan_approvals.clear();
}
}
pub async fn clear_interrupt(&self) {
*self.inner.interrupt_requested.lock().await = false;
}
pub async fn is_interrupt_requested(&self) -> bool {
*self.inner.interrupt_requested.lock().await
}
pub async fn set_session_running(&self, session_id: String) {
self.inner
.running_sessions
.lock()
.await
.insert(session_id, "running".to_string());
}
pub async fn set_session_idle(&self, session_id: &str) {
self.inner.running_sessions.lock().await.remove(session_id);
}
pub async fn is_session_running(&self, session_id: &str) -> bool {
self.inner
.running_sessions
.lock()
.await
.contains_key(session_id)
}
pub fn git_branch(&self) -> Option<String> {
let output = std::process::Command::new("git")
.args(["rev-parse", "--abbrev-ref", "HEAD"])
.current_dir(&self.inner.working_dir)
.output()
.ok()?;
if output.status.success() {
Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
} else {
None
}
}
}
#[cfg(test)]
mod tests;