use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tonic::transport::Server;
use crate::audit::AuditWriter;
use crate::edges::InMemoryEdgeRepo;
use crate::engine::PolicyEngine;
use crate::invalidation::{InvalidationHub, InvalidationServiceImpl};
use crate::registry::AgentRegistry;
use crate::secrets::InMemorySecretsStore;
use crate::service::{
AgentLifecycleServiceImpl, ApprovalServiceImpl, AuditServiceImpl, PolicyServiceImpl, SecretsServiceImpl,
TopologyServiceImpl,
};
use aa_core::{AuditEntry, AuditEventType};
use aa_proto::assembly::agent::v1::agent_lifecycle_service_server::AgentLifecycleServiceServer;
use aa_proto::assembly::approval::v1::approval_service_server::ApprovalServiceServer;
use aa_proto::assembly::audit::v1::audit_service_server::AuditServiceServer;
use aa_proto::assembly::gateway::v1::invalidation_service_server::InvalidationServiceServer;
use aa_proto::assembly::policy::v1::policy_service_server::PolicyServiceServer;
use aa_proto::assembly::secrets::v1::secrets_service_server::SecretsServiceServer;
use aa_proto::assembly::topology::v1::topology_service_server::TopologyServiceServer;
use tokio::sync::broadcast;
use aa_runtime::approval::{ApprovalQueue, ApprovalResolvedNotifier};
use crate::approval::clock::SystemClock;
use crate::approval::db_escalation_scheduler::DbEscalationScheduler;
use crate::approval::escalation::EscalationScheduler;
use crate::approval::NoopAuditSink;
use crate::budget::persistence::{
default_budget_path, load_from_disk, save_to_disk_atomic, start_background_writer, start_window_flush_task,
};
use crate::budget::{BudgetAlert, BudgetTracker, BudgetWindow};
use tokio_util::sync::CancellationToken;
/// Returns the directory audit logs are written to.
///
/// A non-empty `AA_AUDIT_DIR` environment variable takes precedence. Otherwise
/// the platform data directory (or `.` when none is known) joined with
/// `aa/audit` is used.
fn default_audit_dir() -> PathBuf {
    std::env::var("AA_AUDIT_DIR")
        .ok()
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| {
            let base = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
            base.join("aa").join("audit")
        })
}
/// Builds the JSONL audit log path `<audit_dir>/<agent_id>-<session_id>.jsonl`.
fn audit_file_path(audit_dir: &Path, agent_id: &str, session_id: &str) -> PathBuf {
    let file_name = [agent_id, "-", session_id, ".jsonl"].concat();
    audit_dir.join(file_name)
}
/// Starts the audit writer for `agent_id`/`session_id` on a background task.
///
/// The last hash already present in the on-disk log for this agent/session is
/// read first (all zeroes when there is none), so callers can continue the
/// hash chain. When `storage` is given, the writer is attached to it.
///
/// Returns the sender feeding the writer (capacity 4096), a fresh drop counter
/// starting at zero, and the initial hash.
///
/// # Errors
/// Propagates failures from reading the last hash or constructing the writer.
async fn setup_audit(
    agent_id: &str,
    session_id: &str,
    storage: Option<Arc<dyn crate::storage::StorageBackend>>,
) -> Result<(tokio::sync::mpsc::Sender<AuditEntry>, Arc<AtomicU64>, [u8; 32]), Box<dyn std::error::Error>> {
    let dir = default_audit_dir();
    let existing_log = audit_file_path(&dir, agent_id, session_id);
    let last_hash = AuditWriter::read_last_hash(&existing_log).await?;
    let initial_hash = last_hash.unwrap_or([0u8; 32]);

    let (sender, receiver) = tokio::sync::mpsc::channel::<AuditEntry>(4096);
    let drop_counter = Arc::new(AtomicU64::new(0));

    let base_writer = AuditWriter::new(dir, agent_id, session_id, receiver).await?;
    let writer = match storage {
        Some(backend) => base_writer.with_storage(backend),
        None => base_writer,
    };
    tokio::spawn(writer.run());

    Ok((sender, drop_counter, initial_hash))
}
fn setup_budget(policy_path: &Path, budget_alert_tx: broadcast::Sender<BudgetAlert>) -> (Arc<BudgetTracker>, PathBuf) {
let budget_path = default_budget_path();
let persisted = load_from_disk(&budget_path).unwrap_or_else(|e| {
tracing::warn!(error = %e, "failed to load budget state, starting fresh");
crate::budget::persistence::PersistedBudget {
per_agent: vec![],
team_budgets: Default::default(),
global: crate::budget::types::BudgetState::new_today(),
timezone: chrono_tz::UTC,
}
});
let yaml = std::fs::read_to_string(policy_path).unwrap_or_default();
let (daily_limit, monthly_limit, window) = if let Ok(output) = crate::policy::PolicyValidator::from_yaml(&yaml) {
let daily = output
.document
.budget
.as_ref()
.and_then(|bp| bp.daily_limit_usd)
.and_then(|v| rust_decimal::Decimal::try_from(v).ok());
let monthly = output
.document
.budget
.as_ref()
.and_then(|bp| bp.monthly_limit_usd)
.and_then(|v| rust_decimal::Decimal::try_from(v).ok());
let window = output.document.budget.as_ref().and_then(|bp| bp.window);
(daily, monthly, window)
} else {
(None, None, None)
};
let mut tracker = BudgetTracker::with_state_and_alert_sender(
crate::budget::PricingTable::default_table(),
daily_limit,
monthly_limit,
persisted,
budget_alert_tx,
);
if let Some(d) = window {
tracker = tracker.with_window(crate::budget::BudgetWindow::Duration(d));
tracing::info!(
window_ms = d.as_millis() as u64,
"budget sub-day rollover window configured"
);
}
let tracker = Arc::new(tracker);
tracing::info!(path = %budget_path.display(), "budget state loaded");
(tracker, budget_path)
}
/// Spawns a periodic flush task for trackers configured with a sub-day window.
///
/// Daily-window trackers need no extra task, so `None` is returned. For a
/// `Duration(interval)` window, the flush runs every quarter of the interval,
/// but never more often than every 50 ms.
fn maybe_spawn_window_flush(tracker: Arc<BudgetTracker>) -> Option<tokio::task::JoinHandle<()>> {
    const MIN_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);

    let interval = match tracker.window() {
        BudgetWindow::Daily => return None,
        BudgetWindow::Duration(interval) => interval,
    };
    let flush_every = (interval / 4).max(MIN_FLUSH_INTERVAL);
    Some(start_window_flush_task(tracker, flush_every))
}
async fn start_db_escalation_scheduler(
approval_queue: Arc<ApprovalQueue>,
token: CancellationToken,
) -> Option<Arc<DbEscalationScheduler>> {
let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
let db_path = std::path::PathBuf::from(home).join(".aa").join("aa_gateway.db");
let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
let pool = match sqlx::SqlitePool::connect(&db_url).await {
Ok(p) => p,
Err(e) => {
tracing::warn!(error = %e, "failed to open gateway SQLite DB — DB escalation scheduler disabled");
return None;
}
};
let (tx, _rx) = tokio::sync::broadcast::channel(256);
match DbEscalationScheduler::new(
pool,
Arc::new(SystemClock),
approval_queue,
Arc::new(NoopAuditSink),
tx,
std::time::Duration::from_secs(30),
)
.await
{
Ok(scheduler) => {
let scheduler = Arc::new(scheduler);
tokio::spawn(Arc::clone(&scheduler).run(token));
tracing::info!("DB escalation scheduler started");
Some(scheduler)
}
Err(e) => {
tracing::warn!(error = %e, "failed to start DB escalation scheduler — DB escalation disabled");
None
}
}
}
/// Starts the file-backed escalation scheduler on a background task.
///
/// Pending escalations are kept in `$HOME/.aa/pending_escalations.json`
/// (relative `./.aa` when `HOME` is unset). The scheduler is constructed with a
/// 30-second duration argument.
///
/// Returns `None`, after logging a warning, if construction fails; approval
/// escalation is then disabled.
fn start_escalation_scheduler() -> Option<Arc<EscalationScheduler>> {
    let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
    let pending_file = std::path::PathBuf::from(home_dir)
        .join(".aa")
        .join("pending_escalations.json");
    let (escalation_tx, _escalation_rx) = tokio::sync::broadcast::channel(256);
    let period = std::time::Duration::from_secs(30);

    let scheduler = match EscalationScheduler::new(pending_file, escalation_tx, period) {
        Ok(built) => Arc::new(built),
        Err(e) => {
            tracing::warn!(error = %e, "failed to start escalation scheduler — approval escalation disabled");
            return None;
        }
    };
    tokio::spawn(Arc::clone(&scheduler).run());
    tracing::info!("escalation scheduler started");
    Some(scheduler)
}
async fn shutdown_signal() {
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to register SIGTERM handler");
tokio::select! {
_ = ctrl_c => tracing::info!("received SIGINT, shutting down"),
_ = sigterm.recv() => tracing::info!("received SIGTERM, shutting down"),
}
}
#[cfg(not(unix))]
{
ctrl_c.await.ok();
tracing::info!("received SIGINT, shutting down");
}
}
fn spawn_escalation_audit_task(
scheduler: &Option<Arc<EscalationScheduler>>,
audit_tx: tokio::sync::mpsc::Sender<AuditEntry>,
approval_queue: Arc<ApprovalQueue>,
) {
let Some(sched) = scheduler else { return };
let mut rx = sched.subscribe();
tokio::spawn(async move {
let seq_base = std::sync::atomic::AtomicU64::new(u64::MAX / 2);
let mut prev_hash = [0u8; 32];
loop {
match rx.recv().await {
Ok(event) => {
let seq = seq_base.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let to_role = event.escalation_approvers.join(",");
let payload = serde_json::json!({
"approval_id": event.request_id.to_string(),
"team_id": event.team_id,
"from_role": event.team_id,
"to_role": to_role,
"escalation_approvers": event.escalation_approvers,
})
.to_string();
let agent_id = aa_core::identity::AgentId::from_bytes([0u8; 16]);
let session_id = aa_core::identity::SessionId::from_bytes([0u8; 16]);
let entry = AuditEntry::new(
seq,
now,
AuditEventType::ApprovalEscalated,
agent_id,
session_id,
payload,
prev_hash,
);
prev_hash = *entry.entry_hash();
let _ = audit_tx.try_send(entry);
let escalation_ts = now / 1_000_000_000; let escalation_status = format!("escalated:{to_role}");
let history_entry = aa_runtime::approval::RoutingHistoryEntry {
at: escalation_ts,
action: "escalated".to_string(),
from_role: None,
to_role: to_role.clone(),
};
approval_queue.record_routing(
event.request_id,
escalation_status,
Some(to_role),
None,
None,
Some(history_entry),
);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "escalation audit subscriber lagged");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
fn final_budget_save(tracker: &BudgetTracker, budget_path: &Path) {
let snapshot = tracker.snapshot();
match save_to_disk_atomic(budget_path, &snapshot) {
Ok(()) => tracing::info!(path = %budget_path.display(), "budget state saved on shutdown"),
Err(e) => tracing::error!(error = %e, "failed to save budget state on shutdown"),
}
}
pub async fn serve_tcp(
policy_path: &Path,
listen_addr: &str,
registry: Arc<AgentRegistry>,
approval_queue: Arc<ApprovalQueue>,
budget_alert_tx: broadcast::Sender<BudgetAlert>,
storage: Option<Arc<dyn crate::storage::StorageBackend>>,
) -> Result<(), Box<dyn std::error::Error>> {
let (tracker, budget_path) = setup_budget(policy_path, budget_alert_tx);
let _budget_writer = start_background_writer(Arc::clone(&tracker), budget_path.clone());
let _budget_flush = maybe_spawn_window_flush(Arc::clone(&tracker));
let invalidation_hub = InvalidationHub::new();
let engine = Arc::new(
PolicyEngine::load_from_file_with_budget(policy_path, Arc::clone(&tracker))
.map_err(|e| format!("failed to load policy: {e:?}"))?
.with_invalidation_hub(Arc::clone(&invalidation_hub)),
);
let approval_notifier: Arc<dyn ApprovalResolvedNotifier> = invalidation_hub.clone();
approval_queue.set_resolved_notifier(approval_notifier);
let (audit_tx, audit_drops, initial_hash) = setup_audit("gateway", "default", storage).await?;
let escalation_scheduler = start_escalation_scheduler();
let db_token = CancellationToken::new();
let db_scheduler = start_db_escalation_scheduler(Arc::clone(&approval_queue), db_token.clone()).await;
spawn_escalation_audit_task(&escalation_scheduler, audit_tx.clone(), Arc::clone(&approval_queue));
let policy_svc = PolicyServiceImpl::with_registry_approval_and_escalation(
Arc::clone(&engine),
Arc::clone(®istry),
Arc::clone(&approval_queue),
escalation_scheduler.clone(),
audit_tx.clone(),
Arc::clone(&audit_drops),
initial_hash,
)
.with_db_scheduler(db_scheduler.clone());
let audit_svc = AuditServiceImpl::new_with_registry(audit_tx, audit_drops, initial_hash, Arc::clone(®istry));
let (edge_repo, _cross_team_rx) = InMemoryEdgeRepo::with_events(Arc::clone(®istry));
let topology_svc = TopologyServiceImpl::new(Arc::clone(®istry), edge_repo);
let lifecycle_svc = AgentLifecycleServiceImpl::new(registry);
let approval_svc =
ApprovalServiceImpl::new_with_escalation(approval_queue, escalation_scheduler).with_db_scheduler(db_scheduler);
let secrets_svc = SecretsServiceImpl::new(Arc::new(InMemorySecretsStore::new()));
let addr = listen_addr.parse()?;
tracing::info!(%addr, "starting gRPC server on TCP");
Server::builder()
.add_service(PolicyServiceServer::new(policy_svc))
.add_service(AuditServiceServer::new(audit_svc))
.add_service(AgentLifecycleServiceServer::new(lifecycle_svc))
.add_service(ApprovalServiceServer::new(approval_svc))
.add_service(TopologyServiceServer::new(topology_svc))
.add_service(SecretsServiceServer::new(secrets_svc))
.add_service(InvalidationServiceServer::new(InvalidationServiceImpl::new(
Arc::clone(&invalidation_hub),
)))
.serve_with_shutdown(addr, async move {
shutdown_signal().await;
db_token.cancel();
})
.await?;
final_budget_save(&tracker, &budget_path);
Ok(())
}
pub async fn serve_uds(
policy_path: &Path,
socket_path: &Path,
registry: Arc<AgentRegistry>,
approval_queue: Arc<ApprovalQueue>,
budget_alert_tx: broadcast::Sender<BudgetAlert>,
storage: Option<Arc<dyn crate::storage::StorageBackend>>,
) -> Result<(), Box<dyn std::error::Error>> {
let (tracker, budget_path) = setup_budget(policy_path, budget_alert_tx);
let _budget_writer = start_background_writer(Arc::clone(&tracker), budget_path.clone());
let _budget_flush = maybe_spawn_window_flush(Arc::clone(&tracker));
let invalidation_hub = InvalidationHub::new();
let engine = Arc::new(
PolicyEngine::load_from_file_with_budget(policy_path, Arc::clone(&tracker))
.map_err(|e| format!("failed to load policy: {e:?}"))?
.with_invalidation_hub(Arc::clone(&invalidation_hub)),
);
let approval_notifier: Arc<dyn ApprovalResolvedNotifier> = invalidation_hub.clone();
approval_queue.set_resolved_notifier(approval_notifier);
let (audit_tx, audit_drops, initial_hash) = setup_audit("gateway", "default", storage).await?;
let escalation_scheduler = start_escalation_scheduler();
let db_token = CancellationToken::new();
let db_scheduler = start_db_escalation_scheduler(Arc::clone(&approval_queue), db_token.clone()).await;
spawn_escalation_audit_task(&escalation_scheduler, audit_tx.clone(), Arc::clone(&approval_queue));
let policy_svc = PolicyServiceImpl::with_registry_approval_and_escalation(
Arc::clone(&engine),
Arc::clone(®istry),
Arc::clone(&approval_queue),
escalation_scheduler.clone(),
audit_tx.clone(),
Arc::clone(&audit_drops),
initial_hash,
)
.with_db_scheduler(db_scheduler.clone());
let audit_svc = AuditServiceImpl::new_with_registry(audit_tx, audit_drops, initial_hash, Arc::clone(®istry));
let (edge_repo, _cross_team_rx) = InMemoryEdgeRepo::with_events(Arc::clone(®istry));
let topology_svc = TopologyServiceImpl::new(Arc::clone(®istry), edge_repo);
let lifecycle_svc = AgentLifecycleServiceImpl::new(registry);
let approval_svc =
ApprovalServiceImpl::new_with_escalation(approval_queue, escalation_scheduler).with_db_scheduler(db_scheduler);
let secrets_svc = SecretsServiceImpl::new(Arc::new(InMemorySecretsStore::new()));
tracing::info!(socket = %socket_path.display(), "starting gRPC server on UDS");
if socket_path.exists() {
std::fs::remove_file(socket_path)?;
}
let uds = tokio::net::UnixListener::bind(socket_path)?;
let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
Server::builder()
.add_service(PolicyServiceServer::new(policy_svc))
.add_service(AuditServiceServer::new(audit_svc))
.add_service(AgentLifecycleServiceServer::new(lifecycle_svc))
.add_service(ApprovalServiceServer::new(approval_svc))
.add_service(TopologyServiceServer::new(topology_svc))
.add_service(SecretsServiceServer::new(secrets_svc))
.add_service(InvalidationServiceServer::new(InvalidationServiceImpl::new(
Arc::clone(&invalidation_hub),
)))
.serve_with_incoming_shutdown(incoming, async move {
shutdown_signal().await;
db_token.cancel();
})
.await?;
final_budget_save(&tracker, &budget_path);
Ok(())
}