use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use crate::core::agent::Delegation;
use crate::core::circuit::{CircuitBreaker, CircuitConfig};
use crate::core::hook::HookEventRecord;
use crate::core::memory::{MemoryConfig, MemoryUsage};
use crate::core::overseer::Overseer;
use crate::core::overseer_config::OverseerConfig;
use crate::core::paths::FrameworkPaths;
use crate::core::project::ProjectInfo;
use crate::core::session::{Session, SessionId};
use crate::daemon::audit::AuditLogger;
use crate::daemon::optimizer::OptimizerConfig;
use super::overseer::{build_overseer, load_optimizer_config, load_overseer, make_audit_logger};
use super::sm::build_session_manager_agent;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReapResult {
pub reaped: usize,
pub stopped: usize,
}
pub const HOOK_HISTORY_LIMIT: usize = 1024;
pub const EVENT_CHANNEL_CAPACITY: usize = 1024;
pub const PAIR_CODE_TTL: std::time::Duration = std::time::Duration::from_secs(300);
#[derive(Debug)]
pub struct DaemonState {
pub(super) sessions: DashMap<SessionId, Session>,
pub(super) delegations: DashMap<uuid::Uuid, Delegation>,
pub(super) breakers: DashMap<String, CircuitBreaker>,
pub(super) memory: DashMap<SessionId, MemoryUsage>,
pub(super) hook_history: Mutex<std::collections::VecDeque<HookEventRecord>>,
pub memory_config: MemoryConfig,
pub circuit_config: CircuitConfig,
pub(super) trusty_addrs: Mutex<Option<crate::daemon::discover::TrustyAddrs>>,
pub(super) optimizer: Arc<parking_lot::RwLock<OptimizerConfig>>,
pub(super) projects: Arc<RwLock<HashMap<PathBuf, ProjectInfo>>>,
pub(super) overseer: Arc<dyn Overseer>,
pub(super) overseer_handler: String,
pub(super) llm: Option<Arc<crate::daemon::llm_overseer::LlmOverseer>>,
pub(super) session_manager_agent: Arc<crate::core::sm::SessionManagerAgent>,
pub(super) audit: Arc<AuditLogger>,
pub(super) paired_chat_id: Mutex<Option<i64>>,
pub(super) pair_code: Mutex<Option<(String, std::time::Instant)>>,
pub(super) framework_root: PathBuf,
pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
pub(super) managed_sessions:
tokio::sync::OnceCell<std::sync::Arc<crate::session_manager::SessionManager>>,
pub(super) activity_monitor: std::sync::OnceLock<
std::sync::Arc<
crate::activity::monitor::ActivityMonitor<
crate::activity::monitor::OpenRouterClassifier,
>,
>,
>,
}
impl Default for DaemonState {
fn default() -> Self {
Self::new()
}
}
impl DaemonState {
pub fn new() -> Self {
let optimizer = load_optimizer_config();
let build = load_overseer();
let framework_root = FrameworkPaths::default().root;
let paired = crate::daemon::pairing_store::load(&framework_root).map(|r| r.chat_id);
if let Some(chat_id) = paired {
tracing::info!("restored persisted Telegram pairing (chat {chat_id})");
}
let (event_tx, _) = tokio::sync::broadcast::channel(EVENT_CHANNEL_CAPACITY);
Self {
sessions: DashMap::new(),
delegations: DashMap::new(),
breakers: DashMap::new(),
memory: DashMap::new(),
hook_history: Mutex::new(VecDeque::with_capacity(HOOK_HISTORY_LIMIT)),
memory_config: MemoryConfig::default(),
circuit_config: CircuitConfig::default(),
trusty_addrs: Mutex::new(None),
optimizer: Arc::new(parking_lot::RwLock::new(optimizer)),
projects: Arc::new(RwLock::new(HashMap::new())),
overseer: build.overseer,
overseer_handler: build.handler,
llm: build.llm,
session_manager_agent: build_session_manager_agent(&framework_root),
audit: make_audit_logger(&framework_root),
paired_chat_id: Mutex::new(paired),
pair_code: Mutex::new(None),
framework_root,
event_tx,
managed_sessions: tokio::sync::OnceCell::new(),
activity_monitor: std::sync::OnceLock::new(),
}
}
pub fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
#[doc(hidden)]
pub fn with_root(root: PathBuf) -> Self {
let mut state = Self::new();
let paired = crate::daemon::pairing_store::load(&root).map(|r| r.chat_id);
*state.paired_chat_id.lock() = paired;
state.framework_root = root;
state
}
pub fn with_paths(paths: &FrameworkPaths) -> Self {
let optimizer = match OptimizerConfig::load_from_file(&paths.optimizer_config()) {
Ok(cfg) => cfg,
Err(e) => {
tracing::warn!("failed to load optimizer config: {e}; using defaults");
OptimizerConfig::default()
}
};
let overseer_cfg = OverseerConfig::load_from(&paths.overseer_config());
let build = build_overseer(overseer_cfg);
let framework_root = paths.root.clone();
let paired = crate::daemon::pairing_store::load(&framework_root).map(|r| r.chat_id);
let (event_tx, _) = tokio::sync::broadcast::channel(EVENT_CHANNEL_CAPACITY);
Self {
sessions: DashMap::new(),
delegations: DashMap::new(),
breakers: DashMap::new(),
memory: DashMap::new(),
hook_history: Mutex::new(VecDeque::with_capacity(HOOK_HISTORY_LIMIT)),
memory_config: MemoryConfig::default(),
circuit_config: CircuitConfig::default(),
trusty_addrs: Mutex::new(None),
optimizer: Arc::new(parking_lot::RwLock::new(optimizer)),
projects: Arc::new(RwLock::new(HashMap::new())),
overseer: build.overseer,
overseer_handler: build.handler,
llm: build.llm,
session_manager_agent: build_session_manager_agent(&framework_root),
audit: make_audit_logger(&framework_root),
paired_chat_id: Mutex::new(paired),
pair_code: Mutex::new(None),
framework_root,
event_tx,
managed_sessions: tokio::sync::OnceCell::new(),
activity_monitor: std::sync::OnceLock::new(),
}
}
pub fn activity_monitor(
&self,
) -> std::sync::Arc<
crate::activity::monitor::ActivityMonitor<crate::activity::monitor::OpenRouterClassifier>,
> {
self.activity_monitor
.get_or_init(|| {
let model = std::env::var("TRUSTY_LLM_MODEL")
.unwrap_or_else(|_| "openai/gpt-4o-mini".to_owned());
let classifier = crate::activity::monitor::OpenRouterClassifier::new();
Arc::new(crate::activity::monitor::ActivityMonitor::new(
classifier, model,
))
})
.clone()
}
pub async fn session_manager(&self) -> std::sync::Arc<crate::session_manager::SessionManager> {
self.managed_sessions
.get_or_init(|| async {
let data_dir = self.framework_root.join("session-manager");
let tmux: std::sync::Arc<dyn crate::session_manager::ManagedTmuxDriver> =
match crate::session_manager::RealTmuxDriver::discover() {
Ok(d) => std::sync::Arc::new(d),
Err(e) => {
tracing::warn!("tmux unavailable for managed sessions: {e}");
std::sync::Arc::new(crate::session_manager::real_tmux::NoopTmuxDriver)
}
};
let mgr = match crate::session_manager::SessionManager::new(&data_dir, tmux.clone())
.await
{
Ok(mgr) => mgr,
Err(e) => {
tracing::error!(
"failed to load managed session store at {}: {e}; using temp dir",
data_dir.display()
);
let tmp = std::env::temp_dir().join("trusty-mpm-session-manager");
let _ = std::fs::create_dir_all(&tmp);
crate::session_manager::SessionManager::new(&tmp, tmux)
.await
.expect("temp-dir session store must load")
}
};
let auto_resume = std::env::var("TRUSTY_MPM_AUTO_RESUME")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
match mgr.reconcile_on_boot(auto_resume).await {
Ok(report) => {
let n_adopted = report.adopted.len();
let n_stopped = report.stopped.len();
let n_external = report.external_adopted.len();
if n_adopted > 0 || n_stopped > 0 || n_external > 0 {
tracing::info!(
adopted = n_adopted,
stopped = n_stopped,
external = n_external,
"session-manager reconcile complete"
);
}
}
Err(e) => {
tracing::warn!("session-manager reconcile failed: {e}");
}
}
std::sync::Arc::new(mgr)
})
.await
.clone()
}
#[cfg(test)]
pub fn with_session_manager(
mgr: std::sync::Arc<crate::session_manager::SessionManager>,
) -> Self {
let state = Self::new();
let _ = state.managed_sessions.set(mgr);
state
}
#[cfg(test)]
pub fn with_session_manager_agent(agent: Arc<crate::core::sm::SessionManagerAgent>) -> Self {
let mut state = Self::new();
state.session_manager_agent = agent;
state
}
}