use chrono::{DateTime, Utc};
use roboticus_core::config::{DigestConfig, LearningConfig, SessionConfig};
use roboticus_db::Database;
use roboticus_llm::format::UnifiedMessage;
use std::path::PathBuf;
/// Drives session lifecycle maintenance: expiring stale sessions, rotating
/// agent-scoped sessions, and periodic memory/skill upkeep.
pub struct SessionGovernor {
// Session settings; `ttl_seconds` is what `tick` uses to find stale sessions.
config: SessionConfig,
// Passed to the digest close hook; `decay_half_life_days` drives importance decay.
digest_config: DigestConfig,
// Passed to the learning close hook; also supplies priority and hygiene thresholds.
learning_config: LearningConfig,
// Skills directory. While `None`, the learning close hook and the learned-skill
// priority adjustment are skipped.
skills_dir: Option<PathBuf>,
}
impl SessionGovernor {
/// Builds a governor from `config`, using default digest and learning
/// settings and no skills directory.
pub fn new(config: SessionConfig) -> Self {
    let digest_config = DigestConfig::default();
    let learning_config = LearningConfig::default();
    Self {
        config,
        digest_config,
        learning_config,
        skills_dir: None,
    }
}
pub fn with_digest(mut self, digest_config: DigestConfig) -> Self {
self.digest_config = digest_config;
self
}
pub fn with_learning(mut self, learning_config: LearningConfig, skills_dir: PathBuf) -> Self {
self.learning_config = learning_config;
self.skills_dir = Some(skills_dir);
self
}
pub fn tick(&self, db: &Database) -> roboticus_core::Result<usize> {
let stale =
roboticus_db::sessions::list_stale_active_session_ids(db, self.config.ttl_seconds)?;
let mut expired = 0usize;
for session_id in &stale {
if let Err(e) = self.compact_before_archive(db, session_id) {
tracing::warn!(error = %e, session_id = %session_id, "compaction failed before archive, proceeding with expiry");
}
if let Ok(Some(session)) = roboticus_db::sessions::get_session(db, session_id) {
crate::digest::digest_on_close(db, &self.digest_config, &session);
if let Some(ref skills_dir) = self.skills_dir {
crate::learning::learn_on_close(
db,
&self.learning_config,
&session,
skills_dir,
);
}
}
if let Err(e) = roboticus_db::checkpoint::clear_checkpoints(db, session_id) {
tracing::warn!(error = %e, session_id = %session_id, "failed to clear checkpoints");
}
if let Err(e) = roboticus_db::sessions::set_session_status(
db,
session_id,
roboticus_db::sessions::SessionStatus::Expired,
) {
tracing::error!(error = %e, session_id = %session_id, "failed to expire session");
continue;
}
expired += 1;
}
if let Err(e) = self.decay_episodic_importance(db) {
tracing::warn!(error = %e, "episodic importance decay failed during governor tick");
}
if self.skills_dir.is_some()
&& let Err(e) = self.adjust_learned_skill_priorities(db)
{
tracing::warn!(error = %e, "learned skill priority adjustment failed during governor tick");
}
self.run_retrieval_hygiene(db);
Ok(expired)
}
/// Returns the session id for `agent_id` under the optional `scope`,
/// delegating to `roboticus_db::sessions::find_or_create`.
///
/// # Errors
/// Propagates any error from the underlying lookup/creation.
pub fn spawn(
    &self,
    db: &Database,
    agent_id: &str,
    scope: Option<&roboticus_db::sessions::SessionScope>,
) -> roboticus_core::Result<String> {
    let session_id = roboticus_db::sessions::find_or_create(db, agent_id, scope)?;
    Ok(session_id)
}
pub fn rotate_agent_scope_sessions(
&self,
db: &Database,
agent_id: &str,
) -> roboticus_core::Result<usize> {
let sessions = roboticus_db::sessions::list_active_sessions(db, Some(agent_id))?;
let agent_scoped: Vec<_> = sessions
.into_iter()
.filter(|s| s.scope_key.as_deref() == Some("agent"))
.collect();
for s in &agent_scoped {
if let Err(e) = self.compact_before_archive(db, &s.id) {
tracing::warn!(error = %e, session_id = %s.id, "compaction failed before rotation");
}
crate::digest::digest_on_close(db, &self.digest_config, s);
if let Some(ref skills_dir) = self.skills_dir {
crate::learning::learn_on_close(db, &self.learning_config, s, skills_dir);
}
if let Err(e) = roboticus_db::checkpoint::clear_checkpoints(db, &s.id) {
tracing::warn!(error = %e, session_id = %s.id, "failed to clear checkpoints on rotation");
}
}
let archived = agent_scoped.len();
if archived == 0 {
return Ok(0);
}
let _ = roboticus_db::sessions::rotate_agent_session(db, agent_id)?;
Ok(archived)
}
/// Appends a single `system` "[Conversation Summary …]" message covering
/// everything except the four most recent messages of the session.
///
/// Does nothing when the session has four or fewer messages, or when a
/// `system` message containing the summary marker already exists (so calling
/// it twice does not add a second summary). The summary body is capped at
/// 2 000 characters.
///
/// # Errors
/// Fails if the messages cannot be listed or the summary cannot be appended.
fn compact_before_archive(
    &self,
    db: &Database,
    session_id: &str,
) -> roboticus_core::Result<()> {
    const KEEP_RECENT: usize = 4;
    const TARGET_TOKENS: usize = 500;
    const SUMMARY_CHAR_LIMIT: usize = 2_000;

    let history = roboticus_db::sessions::list_messages(db, session_id, None)?;
    // With at most KEEP_RECENT messages there is nothing older to summarise.
    if history.len() <= KEEP_RECENT {
        return Ok(());
    }
    let already_summarised = history
        .iter()
        .any(|msg| msg.role == "system" && msg.content.contains("[Conversation Summary"));
    if already_summarised {
        return Ok(());
    }

    let cutoff = history.len().saturating_sub(KEEP_RECENT);
    let mut older: Vec<UnifiedMessage> = Vec::with_capacity(cutoff);
    for msg in &history[..cutoff] {
        older.push(UnifiedMessage {
            role: msg.role.clone(),
            content: msg.content.clone(),
            parts: None,
        });
    }

    // The compaction stage is chosen from how far the older messages
    // overshoot the token target.
    let current_tokens = crate::context::count_tokens(&older);
    let excess_ratio = current_tokens as f64 / TARGET_TOKENS.max(1) as f64;
    let stage = crate::context::CompactionStage::from_excess(excess_ratio);
    let compacted = crate::context::compact_to_stage(&older, stage);

    // Prefer "role: content" lines for non-system messages; if compaction
    // left only system messages, fall back to their raw contents.
    let mut summary_lines: Vec<String> = Vec::new();
    for msg in compacted.iter() {
        if msg.role != "system" {
            summary_lines.push(format!("{}: {}", msg.role, msg.content));
        }
    }
    if summary_lines.is_empty() {
        summary_lines.extend(compacted.iter().map(|msg| msg.content.clone()));
    }
    let summary_body = summary_lines.join("\n");

    let capped: String = summary_body.chars().take(SUMMARY_CHAR_LIMIT).collect();
    let digest = format!("[Conversation Summary — {stage:?}]\n{}", capped);
    roboticus_db::sessions::append_message(db, session_id, "system", &digest)?;
    Ok(())
}
/// Nudges learned-skill priorities based on their track record and returns
/// how many rows were updated.
///
/// Looks at up to 200 skills. A skill with more than 5 recorded outcomes and
/// a success ratio above 0.8 is boosted (capped at 100); otherwise a skill
/// with more failures than successes is decayed (floored at 0). Skills that
/// match neither rule, or whose priority would not change, are left alone.
/// Returns `0` immediately when learning is disabled.
///
/// # Errors
/// Fails only if the skills cannot be listed; individual update failures are
/// logged and not counted.
fn adjust_learned_skill_priorities(&self, db: &Database) -> roboticus_core::Result<usize> {
    let learning = &self.learning_config;
    if !learning.enabled {
        return Ok(0);
    }

    let skills = roboticus_db::learned_skills::list_learned_skills(db, 200)?;
    let boost = learning.priority_boost_on_success as i64;
    let decay = learning.priority_decay_on_failure as i64;

    let mut adjusted = 0usize;
    for skill in skills.iter() {
        let attempts = skill.success_count + skill.failure_count;
        let success_ratio = if attempts > 0 {
            skill.success_count as f64 / attempts as f64
        } else {
            0.0
        };
        let is_reliable = attempts > 5 && success_ratio > 0.8;
        let is_failing = skill.failure_count > skill.success_count;

        let target = if is_reliable {
            (skill.priority + boost).min(100)
        } else if is_failing {
            (skill.priority - decay).max(0)
        } else {
            continue;
        };
        // Already at the cap/floor (or a zero step): no write needed.
        if target == skill.priority {
            continue;
        }

        match roboticus_db::learned_skills::update_learned_skill_priority(
            db,
            &skill.name,
            target,
        ) {
            Ok(_) => adjusted += 1,
            Err(e) => {
                tracing::warn!(error = %e, skill = %skill.name, "failed to adjust skill priority");
            }
        }
    }
    Ok(adjusted)
}
/// Runs one hygiene sweep: snapshots memory/skill statistics, prunes stale
/// procedural memory and dead learned skills, and records the sweep in the
/// hygiene log.
///
/// Everything here is best effort: failed statistics queries fall back to
/// zero and failed pruning steps are logged and reported as zero pruned.
/// `skills_pruned` only counts dead skills whose database rows were actually
/// deleted.
fn run_retrieval_hygiene(&self, db: &Database) {
    let stale_days = self.learning_config.stale_procedural_days;
    let dead_threshold = self.learning_config.dead_skill_priority_threshold;

    // Pre-prune snapshot of the tables.
    let conn = db.conn();
    let proc_total: i64 = conn
        .query_row("SELECT COUNT(*) FROM procedural_memory", [], |r| r.get(0))
        .unwrap_or(0);
    let proc_stale: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM procedural_memory \
             WHERE success_count = 0 AND failure_count = 0 \
             AND updated_at < datetime('now', ?1)",
            [format!("-{stale_days} days")],
            |r| r.get(0),
        )
        .unwrap_or(0);
    let skills_total: i64 = conn
        .query_row("SELECT COUNT(*) FROM learned_skills", [], |r| r.get(0))
        .unwrap_or(0);
    let skills_dead: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM learned_skills WHERE priority <= ?1",
            [dead_threshold],
            |r| r.get(0),
        )
        .unwrap_or(0);
    let avg_skill_priority: f64 = conn
        .query_row(
            "SELECT COALESCE(AVG(priority), 0) FROM learned_skills",
            [],
            |r| r.get(0),
        )
        .unwrap_or(0.0);
    // Released before calling helpers that take `db` themselves —
    // presumably they acquire the connection on their own; TODO confirm.
    drop(conn);

    let proc_pruned: i64 = match roboticus_db::memory::prune_stale_procedural(db, stale_days) {
        Ok(0) => 0,
        Ok(n) => {
            tracing::info!(count = n, "pruned stale procedural memory entries");
            n as i64
        }
        Err(e) => {
            tracing::warn!(error = %e, "stale procedural pruning failed");
            0
        }
    };

    let skills_pruned: i64 =
        match roboticus_db::learned_skills::find_dead_learned_skills(db, dead_threshold) {
            Ok(dead) if dead.is_empty() => 0,
            Ok(dead) => {
                // Remove the skill files first; a file that is already gone
                // is not worth a warning.
                for (name, md_path) in &dead {
                    if let Some(path) = md_path
                        && let Err(e) = std::fs::remove_file(path)
                        && e.kind() != std::io::ErrorKind::NotFound
                    {
                        tracing::warn!(
                            error = %e, skill = %name, path = %path,
                            "failed to remove dead learned skill file"
                        );
                    }
                }
                let names: Vec<String> = dead.iter().map(|(n, _)| n.clone()).collect();
                match roboticus_db::learned_skills::delete_learned_skills_by_names(db, &names) {
                    Ok(_) => {
                        for name in &names {
                            tracing::info!(skill = %name, "pruned dead learned skill");
                        }
                        names.len() as i64
                    }
                    Err(e) => {
                        // The rows are still there, so report nothing as
                        // pruned rather than overstating the sweep.
                        tracing::warn!(error = %e, "failed to delete dead learned skill DB rows");
                        0
                    }
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "dead learned skill pruning failed");
                0
            }
        };

    let sweep_input = roboticus_db::hygiene_log::HygieneSweepInput {
        stale_procedural_days: stale_days,
        dead_skill_priority_threshold: dead_threshold,
        proc_total,
        proc_stale,
        proc_pruned,
        skills_total,
        skills_dead,
        skills_pruned,
        avg_skill_priority,
    };
    if let Err(e) = roboticus_db::hygiene_log::log_hygiene_sweep(db, &sweep_input) {
        tracing::warn!(error = %e, "failed to log hygiene sweep");
    }
}
/// Produces a three-line, human-readable report on retrieval health:
/// procedural-memory counts, learned-skill counts with average priority, and
/// the hygiene-related configuration values.
///
/// Read-only. Any statistics query that fails is reported as zero.
pub fn diagnose_retrieval_health(&self, db: &Database) -> String {
    let learning = &self.learning_config;
    let stale_days = learning.stale_procedural_days;
    let dead_threshold = learning.dead_skill_priority_threshold;
    let conn = db.conn();

    // --- procedural memory ---
    let proc_total: i64 = conn
        .query_row("SELECT COUNT(*) FROM procedural_memory", [], |row| row.get(0))
        .unwrap_or(0);
    let stale_window = format!("-{} days", stale_days);
    let proc_stale: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM procedural_memory \
             WHERE success_count = 0 AND failure_count = 0 \
             AND updated_at < datetime('now', ?1)",
            [stale_window],
            |row| row.get(0),
        )
        .unwrap_or(0);

    // --- learned skills ---
    let skill_total: i64 = conn
        .query_row("SELECT COUNT(*) FROM learned_skills", [], |row| row.get(0))
        .unwrap_or(0);
    let skill_dead: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM learned_skills WHERE priority <= ?1",
            [dead_threshold],
            |row| row.get(0),
        )
        .unwrap_or(0);
    let skill_low: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM learned_skills WHERE priority > ?1 AND priority < 20",
            [dead_threshold],
            |row| row.get(0),
        )
        .unwrap_or(0);
    let avg_priority: f64 = conn
        .query_row(
            "SELECT COALESCE(AVG(priority), 0) FROM learned_skills",
            [],
            |row| row.get(0),
        )
        .unwrap_or(0.0);

    let procedural_line = format!(
        "procedural_memory: {proc_total} total, {proc_stale} stale \
         (zero-activity, >{} days)",
        stale_days
    );
    let skills_line = format!(
        "learned_skills: {skill_total} total, {skill_dead} dead (priority ≤ {}), \
         {skill_low} low (< 20), avg priority {avg_priority:.0}",
        dead_threshold
    );
    let config_line = format!(
        "config: stale_procedural_days={}, dead_skill_priority_threshold={}, \
         max_learned_skills={}",
        stale_days, dead_threshold, learning.max_learned_skills,
    );

    [procedural_line, skills_line, config_line].join("\n")
}
/// Recomputes `importance` for every `episodic_memory` row from its age and
/// the configured half-life, writing back only the rows whose value changed.
/// Returns the number of rows updated.
///
/// A non-positive `decay_half_life_days` disables decay (returns `0`). Rows
/// whose `created_at` does not parse as RFC 3339 are skipped, and a
/// `created_at` in the future is treated as age zero. All updates run in one
/// transaction, which is rolled back if any update or the commit fails.
///
/// # Errors
/// Returns `RoboticusError::Database` if reading the rows, any update, or the
/// commit fails.
fn decay_episodic_importance(&self, db: &Database) -> roboticus_core::Result<usize> {
    // Single place for the "stringify into a Database error" conversion.
    fn db_err<E: std::fmt::Display>(e: E) -> roboticus_core::RoboticusError {
        roboticus_core::RoboticusError::Database(e.to_string())
    }

    let half_life_days = self.digest_config.decay_half_life_days as f64;
    if half_life_days <= 0.0 {
        return Ok(0);
    }
    let now = Utc::now();
    let conn = db.conn();
    let mut stmt = conn
        .prepare("SELECT id, importance, created_at FROM episodic_memory")
        .map_err(db_err)?;
    let rows = stmt
        .query_map([], |row| {
            let id: String = row.get(0)?;
            let importance: i32 = row.get(1)?;
            let created_at: String = row.get(2)?;
            Ok((id, importance, created_at))
        })
        .map_err(db_err)?;

    // Collect first, write later: the read statement is finished before the
    // write transaction starts.
    let mut updates: Vec<(String, i32)> = Vec::new();
    for row in rows {
        let (id, importance, created_at) = row.map_err(db_err)?;
        // NOTE(review): rows whose timestamp is not RFC 3339 are silently
        // skipped — confirm `created_at` is always written in that format.
        if let Ok(created_dt) = DateTime::parse_from_rfc3339(&created_at) {
            // `to_std` fails for negative durations (future timestamps);
            // those count as age zero.
            let age_days = (now - created_dt.with_timezone(&Utc))
                .to_std()
                .map(|d| d.as_secs_f64() / 86_400.0)
                .unwrap_or(0.0);
            // NOTE(review): the decay is applied to the currently stored
            // importance using the full age since `created_at`, so repeated
            // calls compound — confirm this is intended.
            let decayed = crate::digest::decay_importance(importance, age_days, half_life_days);
            if decayed != importance {
                updates.push((id, decayed));
            }
        }
    }
    drop(stmt);

    if !updates.is_empty() {
        conn.execute_batch("BEGIN").map_err(db_err)?;
        for (id, new_importance) in &updates {
            conn.execute(
                "UPDATE episodic_memory SET importance = ?1 WHERE id = ?2",
                (&new_importance, id),
            )
            .map_err(|e| {
                let _ = conn.execute_batch("ROLLBACK");
                db_err(e)
            })?;
        }
        // A failed COMMIT leaves the transaction open on this connection;
        // roll it back so a later BEGIN does not fail.
        if let Err(e) = conn.execute_batch("COMMIT") {
            let _ = conn.execute_batch("ROLLBACK");
            return Err(db_err(e));
        }
    }
    Ok(updates.len())
}
}
// Unit tests for `SessionGovernor`: ticking, spawning, rotation, pre-archive
// compaction and learned-skill priority adjustment.
#[cfg(test)]
mod tests {
use super::*;
// Builds a `Database` from the `:memory:` path, panicking on failure.
fn test_db() -> Database {
Database::new(":memory:").unwrap()
}
// A tick over an empty database expires nothing and does not fail.
#[test]
fn governor_tick_no_sessions() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let expired = gov.tick(&db).unwrap();
assert_eq!(expired, 0);
}
// Spawning twice for the same agent without a scope yields the same session id.
#[test]
fn governor_spawn_creates_session() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid = gov.spawn(&db, "gov-agent", None).unwrap();
assert!(!sid.is_empty());
let sid2 = gov.spawn(&db, "gov-agent", None).unwrap();
assert_eq!(sid, sid2, "same agent should reuse session");
}
// A peer-scoped spawn and an unscoped spawn for the same agent get different sessions.
#[test]
fn governor_spawn_with_scope() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let scope = roboticus_db::sessions::SessionScope::Peer {
peer_id: "alice".into(),
channel: "telegram".into(),
};
let sid_scoped = gov.spawn(&db, "gov-agent", Some(&scope)).unwrap();
let sid_plain = gov.spawn(&db, "gov-agent", None).unwrap();
assert_ne!(sid_scoped, sid_plain);
}
// Rotation archives the existing agent-scoped session and leaves exactly one
// new active agent-scoped session in its place.
#[test]
fn rotate_agent_scope_sessions_keeps_single_active_session() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid1 = roboticus_db::sessions::create_new(&db, "gov-rotate", None).unwrap();
let rotated = gov.rotate_agent_scope_sessions(&db, "gov-rotate").unwrap();
assert_eq!(rotated, 1);
let active = roboticus_db::sessions::list_active_sessions(&db, Some("gov-rotate")).unwrap();
assert_eq!(active.len(), 1);
assert_eq!(active[0].scope_key.as_deref(), Some("agent"));
assert_ne!(active[0].id, sid1);
let archived = roboticus_db::sessions::get_session(&db, &sid1)
.unwrap()
.unwrap();
assert_eq!(archived.status, "archived");
}
// With only two messages, compaction must not append a summary.
#[test]
fn compact_before_archive_fewer_than_4_messages_is_noop() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid = roboticus_db::sessions::create_new(&db, "compact-few", None).unwrap();
roboticus_db::sessions::append_message(&db, &sid, "user", "hello").unwrap();
roboticus_db::sessions::append_message(&db, &sid, "assistant", "hi there").unwrap();
gov.compact_before_archive(&db, &sid).unwrap();
let msgs = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
assert_eq!(msgs.len(), 2);
assert!(
!msgs
.iter()
.any(|m| m.content.contains("[Conversation Summary"))
);
}
// Six messages -> one extra trailing `system` summary message (seven total).
#[test]
fn compact_before_archive_with_enough_messages_appends_digest() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid = roboticus_db::sessions::create_new(&db, "compact-enough", None).unwrap();
for i in 0..6 {
let role = if i % 2 == 0 { "user" } else { "assistant" };
roboticus_db::sessions::append_message(&db, &sid, role, &format!("message number {i}"))
.unwrap();
}
gov.compact_before_archive(&db, &sid).unwrap();
let msgs = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
assert_eq!(msgs.len(), 7);
let last = msgs.last().unwrap();
assert_eq!(last.role, "system");
assert!(
last.content.contains("[Conversation Summary"),
"expected summary header"
);
}
// With eight messages the summary covers the first four (content-0..content-3);
// the four most recent are the ones kept out of it.
#[test]
fn compact_before_archive_trims_old_keeps_recent_4() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid = roboticus_db::sessions::create_new(&db, "compact-trim", None).unwrap();
for i in 0..8 {
let role = if i % 2 == 0 { "user" } else { "assistant" };
roboticus_db::sessions::append_message(&db, &sid, role, &format!("content-{i}"))
.unwrap();
}
gov.compact_before_archive(&db, &sid).unwrap();
let msgs = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
let summary_msg = msgs
.iter()
.find(|m| m.content.contains("[Conversation Summary"))
.unwrap();
assert!(
summary_msg.content.contains("content-0"),
"summary should include trimmed message 0"
);
assert!(
summary_msg.content.contains("content-3"),
"summary should include trimmed message 3"
);
}
// Exactly four messages leaves nothing older than the kept window, so no summary.
#[test]
fn compact_before_archive_exactly_4_messages_is_noop() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid = roboticus_db::sessions::create_new(&db, "compact-exact", None).unwrap();
for i in 0..4 {
let role = if i % 2 == 0 { "user" } else { "assistant" };
roboticus_db::sessions::append_message(&db, &sid, role, &format!("msg-{i}")).unwrap();
}
gov.compact_before_archive(&db, &sid).unwrap();
let msgs = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
assert_eq!(msgs.len(), 4);
}
// A second compaction call sees the existing summary and must not add another.
#[test]
fn compact_before_archive_idempotent_on_double_call() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let sid = roboticus_db::sessions::create_new(&db, "compact-idem", None).unwrap();
for i in 0..6 {
let role = if i % 2 == 0 { "user" } else { "assistant" };
roboticus_db::sessions::append_message(&db, &sid, role, &format!("msg-{i}")).unwrap();
}
gov.compact_before_archive(&db, &sid).unwrap();
let msgs_after_first = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
let summary_count_1 = msgs_after_first
.iter()
.filter(|m| m.content.contains("[Conversation Summary"))
.count();
assert_eq!(summary_count_1, 1);
gov.compact_before_archive(&db, &sid).unwrap();
let msgs_after_second = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
let summary_count_2 = msgs_after_second
.iter()
.filter(|m| m.content.contains("[Conversation Summary"))
.count();
assert_eq!(summary_count_2, 1, "should not append a second summary");
}
// End-to-end tick with a zero TTL: the session is expired and a summary was appended.
#[test]
fn tick_expires_stale_sessions_with_compaction() {
let gov = SessionGovernor::new(SessionConfig {
ttl_seconds: 0, ..SessionConfig::default()
});
let db = test_db();
let sid = roboticus_db::sessions::create_new(&db, "stale-agent", None).unwrap();
for i in 0..6 {
let role = if i % 2 == 0 { "user" } else { "assistant" };
roboticus_db::sessions::append_message(&db, &sid, role, &format!("stale-msg-{i}"))
.unwrap();
}
// Presumably gives the session time to fall behind the zero TTL — the
// staleness query itself is not visible here.
std::thread::sleep(std::time::Duration::from_millis(50));
let expired = gov.tick(&db).unwrap();
assert_eq!(expired, 1);
let session = roboticus_db::sessions::get_session(&db, &sid)
.unwrap()
.unwrap();
assert_eq!(session.status, "expired");
let msgs = roboticus_db::sessions::list_messages(&db, &sid, Some(50)).unwrap();
assert!(
msgs.iter()
.any(|m| m.content.contains("[Conversation Summary")),
"compaction should have appended a summary"
);
}
// Rotating an agent that has no sessions reports zero and does not fail.
#[test]
fn rotate_with_no_sessions_returns_zero() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
let rotated = gov
.rotate_agent_scope_sessions(&db, "nonexistent-agent")
.unwrap();
assert_eq!(rotated, 0);
}
// Six successes and no failures (>5 outcomes, ratio > 0.8) must raise the priority.
#[test]
fn adjust_priorities_boosts_reliable_skills() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
roboticus_db::learned_skills::store_learned_skill(
&db,
"reliable-skill",
"A reliable skill",
"[]",
"[]",
None,
)
.unwrap();
for _ in 0..6 {
roboticus_db::learned_skills::record_learned_skill_success(&db, "reliable-skill")
.unwrap();
}
let adjusted = gov.adjust_learned_skill_priorities(&db).unwrap();
assert_eq!(adjusted, 1);
let skill = roboticus_db::learned_skills::get_learned_skill_by_name(&db, "reliable-skill")
.unwrap()
.unwrap();
assert!(
skill.priority > 50,
"priority should have been boosted from 50, got {}",
skill.priority
);
}
// Three failures and no successes (failures > successes) must lower the priority.
#[test]
fn adjust_priorities_decays_unreliable_skills() {
let gov = SessionGovernor::new(SessionConfig::default());
let db = test_db();
roboticus_db::learned_skills::store_learned_skill(
&db,
"flaky-skill",
"An unreliable skill",
"[]",
"[]",
None,
)
.unwrap();
for _ in 0..3 {
roboticus_db::learned_skills::record_learned_skill_failure(&db, "flaky-skill").unwrap();
}
let adjusted = gov.adjust_learned_skill_priorities(&db).unwrap();
assert_eq!(adjusted, 1);
let skill = roboticus_db::learned_skills::get_learned_skill_by_name(&db, "flaky-skill")
.unwrap()
.unwrap();
assert!(
skill.priority < 50,
"priority should have decayed from 50, got {}",
skill.priority
);
}
// With learning disabled the adjustment short-circuits and reports zero.
#[test]
fn adjust_priorities_disabled_config_skips() {
let learning_config = LearningConfig {
enabled: false,
..Default::default()
};
let gov = SessionGovernor::new(SessionConfig::default())
.with_learning(learning_config, PathBuf::from("/tmp"));
let db = test_db();
let adjusted = gov.adjust_learned_skill_priorities(&db).unwrap();
assert_eq!(adjusted, 0);
}
}