use std::sync::Arc;
use tokio::sync::watch;
use tracing::{info, warn};
use trusty_common::memory_core::dream::{DreamConfig, Dreamer};
use trusty_common::memory_core::PalaceRegistry;
/// Name of the environment variable that switches off autonomous dream
/// scheduling: [`spawn_dream_scheduler`] spawns nothing and returns `0` when
/// this variable is set to a non-empty value.
pub const DREAM_DISABLED_ENV: &str = "TRUSTY_DREAM_DISABLED";
/// Starts one background dream loop for every palace currently listed in
/// `registry`.
///
/// Nothing is spawned when the [`DREAM_DISABLED_ENV`] environment variable
/// holds a non-empty value. (`std::env::var` reports a value that is not valid
/// Unicode as an error, so such a value does not disable scheduling.)
///
/// Each palace gets its own [`Dreamer`] built from [`DreamConfig::default`]
/// and its own clone of `shutdown_rx`.
///
/// Returns the number of loops that were started: `0` when scheduling is
/// disabled or the registry lists no palaces.
pub fn spawn_dream_scheduler(
    registry: &PalaceRegistry,
    shutdown_rx: watch::Receiver<bool>,
) -> usize {
    let disabled = matches!(
        std::env::var(DREAM_DISABLED_ENV),
        Ok(value) if !value.is_empty()
    );
    if disabled {
        info!(
            env = DREAM_DISABLED_ENV,
            "autonomous dream scheduling disabled ({DREAM_DISABLED_ENV} is set)"
        );
        return 0;
    }

    let mut started = 0usize;
    for palace_id in registry.list() {
        // An id returned by `list()` may no longer resolve to a handle by the
        // time we look it up; skip it instead of failing the whole scheduler.
        let Some(handle) = registry.get(&palace_id) else {
            warn!(
                palace = %palace_id,
                "dream_scheduler: handle not in registry after list(); skipping"
            );
            continue;
        };

        let config = DreamConfig::default();
        // Copy the interval out before `config` is moved into the dreamer so
        // it can still be logged below.
        let idle_secs = config.idle_secs;

        // The value returned by `start_with_shutdown` is deliberately dropped
        // here; the loop is stopped through the shutdown channel instead.
        Arc::new(Dreamer::new(config)).start_with_shutdown(handle, shutdown_rx.clone());
        started += 1;

        info!(
            palace = %palace_id,
            idle_secs,
            "dream_scheduler: spawned background dream loop"
        );
    }

    info!(
        loops_spawned = started,
        "dream_scheduler: all per-palace loops running"
    );
    started
}
/// Creates the watch channel used to tell the dream loops to shut down.
///
/// The channel starts out holding `false`. The receiver half is what
/// [`spawn_dream_scheduler`] accepts, and [`spawn_shutdown_bridge`] sends
/// `true` on the sender half.
pub fn make_shutdown_watch() -> (watch::Sender<bool>, watch::Receiver<bool>) {
    let initial_value = false;
    let (sender, receiver) = watch::channel(initial_value);
    (sender, receiver)
}
/// Spawns a task that forwards the process shutdown signal onto `tx`.
///
/// The task awaits `trusty_common::shutdown_signal()` and then sends `true`
/// on the watch channel. The returned [`tokio::task::JoinHandle`] resolves
/// once that has happened.
pub fn spawn_shutdown_bridge(tx: watch::Sender<bool>) -> tokio::task::JoinHandle<()> {
    let bridge = async move {
        trusty_common::shutdown_signal().await;
        // Best effort: a send error is ignored on purpose, there is nothing
        // useful to do with it at shutdown.
        let _ = tx.send(true);
    };
    tokio::spawn(bridge)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::Arc;
    use std::time::Duration;
    use trusty_common::memory_core::dream::DreamConfig;
    use trusty_common::memory_core::dream::Dreamer;
    use trusty_common::memory_core::palace::{Palace, PalaceId};
    use trusty_common::memory_core::PalaceRegistry;

    /// Creates a palace with a random id rooted at `tmp`, registers it in
    /// `registry`, and returns its id.
    fn register_temp_palace(registry: &PalaceRegistry, tmp: &std::path::Path) -> PalaceId {
        let palace = Palace {
            id: PalaceId::new(format!("test-palace-{}", uuid::Uuid::new_v4())),
            name: "Test Palace".to_string(),
            description: None,
            created_at: chrono::Utc::now(),
            data_dir: tmp.to_path_buf(),
        };
        let id = palace.id.clone();
        registry
            .create_palace(tmp, palace)
            .expect("create test palace");
        id
    }

    /// With the disable variable set, no loop is spawned even though a palace
    /// is registered.
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn dream_scheduler_honors_disable_flag() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let registry = PalaceRegistry::new();
        register_temp_palace(&registry, tmp.path());
        // SAFETY: every test in this module that touches the process
        // environment is `#[serial]`, so these mutations never overlap with
        // one another.
        unsafe {
            std::env::set_var(DREAM_DISABLED_ENV, "1");
        }
        let (_tx, rx) = make_shutdown_watch();
        let spawned = spawn_dream_scheduler(&registry, rx);
        // Clear the variable before asserting so a failure cannot leak the
        // flag into later tests.
        // SAFETY: see above — environment access is serialized.
        unsafe {
            std::env::remove_var(DREAM_DISABLED_ENV);
        }
        assert_eq!(
            spawned, 0,
            "expect 0 loops when TRUSTY_DREAM_DISABLED=1; got {spawned}"
        );
    }

    /// Two registered palaces yield two spawned loops.
    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn dream_scheduler_spawns_per_palace_loop() {
        // SAFETY: every test in this module that touches the process
        // environment is `#[serial]`, so this mutation never overlaps with
        // another one.
        unsafe {
            std::env::remove_var(DREAM_DISABLED_ENV);
        }
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp2 = tempfile::tempdir().expect("tempdir2");
        let registry = PalaceRegistry::new();
        register_temp_palace(&registry, tmp.path());
        register_temp_palace(&registry, tmp2.path());
        let (tx, rx) = make_shutdown_watch();
        let spawned = spawn_dream_scheduler(&registry, rx);
        // Ask the freshly spawned loops to stop; the result is irrelevant here.
        let _ = tx.send(true);
        assert_eq!(
            spawned, 2,
            "expect 1 loop per palace (2 total); got {spawned}"
        );
    }

    /// A running dream loop exits promptly once `true` is sent on the
    /// shutdown channel.
    #[tokio::test]
    async fn dream_scheduler_shutdown_stops_loop() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let registry = PalaceRegistry::new();
        let id = register_temp_palace(&registry, tmp.path());
        let handle = registry.get(&id).expect("handle after create");
        let config = DreamConfig {
            idle_secs: 1,
            ..DreamConfig::default()
        };
        let dreamer = Arc::new(Dreamer::new(config));
        let (tx, rx) = make_shutdown_watch();
        let join = dreamer.start_with_shutdown(handle, rx);
        tx.send(true).expect("send shutdown");
        let result = tokio::time::timeout(Duration::from_secs(2), join).await;
        assert!(
            result.is_ok(),
            "dream loop did not exit within 2s after shutdown signal"
        );
    }

    /// An empty registry produces zero loops and does not panic.
    ///
    /// Marked `#[serial]` because it mutates `DREAM_DISABLED_ENV`; without it
    /// this test could clear the variable in the middle of
    /// `dream_scheduler_honors_disable_flag`.
    #[tokio::test]
    #[serial]
    async fn dream_scheduler_no_panic_with_empty_registry() {
        // SAFETY: every test in this module that touches the process
        // environment is `#[serial]`, so this mutation never overlaps with
        // another one.
        unsafe {
            std::env::remove_var(DREAM_DISABLED_ENV);
        }
        let registry = PalaceRegistry::new();
        let (_tx, rx) = make_shutdown_watch();
        let count = spawn_dream_scheduler(&registry, rx);
        assert_eq!(count, 0, "empty registry should produce 0 loops");
    }

    /// The channel starts at `false` and receivers observe a sent `true`.
    #[tokio::test]
    async fn make_shutdown_watch_pair_works() {
        let (tx, mut rx) = make_shutdown_watch();
        assert!(!*rx.borrow(), "initial value should be false");
        tx.send(true).expect("send");
        rx.changed().await.expect("changed");
        assert!(*rx.borrow(), "value should flip to true after send");
    }

    /// A single dream cycle on a fresh palace succeeds and persists stats
    /// that match the returned ones.
    #[tokio::test]
    async fn dream_scheduler_loops_actually_run_cycles() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let registry = PalaceRegistry::new();
        let id = register_temp_palace(&registry, tmp.path());
        let handle = registry.get(&id).expect("handle after create");
        let config = DreamConfig {
            idle_secs: 0,
            ..DreamConfig::default()
        };
        let dreamer = Arc::new(Dreamer::new(config));
        assert!(dreamer.is_idle(), "idle_secs=0 should mean always idle");
        let stats = dreamer
            .dream_cycle(&handle)
            .await
            .expect("dream_cycle should not error on a fresh empty palace");
        // Touch the fields so the test fails to compile if they disappear.
        let _ = stats.merged;
        let _ = stats.pruned;
        let _ = stats.duration_ms;
        let palace_data_dir = tmp.path().join(id.as_str());
        let persisted =
            trusty_common::memory_core::dream::PersistedDreamStats::load(&palace_data_dir)
                .expect("load persisted stats")
                .expect("persisted stats should exist after a cycle");
        assert_eq!(
            persisted.stats.merged, stats.merged,
            "persisted merged should match returned stats"
        );
    }
}