ai_agent/services/auto_dream/
mod.rs1pub mod config;
14pub mod consolidation_lock;
15pub mod consolidation_prompt;
16
17pub use config::*;
18pub use consolidation_lock::*;
19pub use consolidation_prompt::*;
20
21use crate::memdir::paths::get_auto_mem_path;
22use crate::types::message::Message;
23use crate::utils::abort_controller::AbortController;
24use std::future::Future;
25use std::pin::Pin;
26
/// Minimum time, in milliseconds, between two session scans performed by the
/// same runner state (10 minutes).
const SESSION_SCAN_INTERVAL_MS: u64 = 10 * 60 * 1_000;

/// Thresholds that decide when an auto-dream consolidation may fire.
#[derive(Clone, Debug)]
struct AutoDreamConfig {
    /// Minimum number of hours that must have elapsed since the last
    /// consolidation.
    min_hours: u64,
    /// Minimum number of sessions touched since the last consolidation.
    min_sessions: u64,
}
38
39const DEFAULTS: AutoDreamConfig = AutoDreamConfig {
40 min_hours: 24,
41 min_sessions: 5,
42};
43
44fn get_config() -> AutoDreamConfig {
46 DEFAULTS
47}
48
49fn is_gate_open() -> bool {
52 is_auto_dream_enabled()
53}
54
55#[derive(Debug, Clone)]
57pub struct DreamTaskState {
58 pub task_id: String,
59 pub sessions_reviewing: usize,
60 pub files_touched: Vec<String>,
61 pub status: DreamTaskStatus,
62}
63
/// Lifecycle status stored in [`DreamTaskState::status`].
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`, which lets
/// callers write `state.status == DreamTaskStatus::Running` or use
/// `assert_eq!` instead of `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DreamTaskStatus {
    Running,
    Completed,
    Failed,
    Killed,
}
72
/// Mutable state owned by one auto-dream runner.
struct AutoDreamRunnerState {
    /// Millisecond timestamp of the most recent session scan; `0` until the
    /// first scan has been recorded.
    last_session_scan_at: u64,
}

impl AutoDreamRunnerState {
    /// Creates a state in which no session scan has happened yet.
    fn new() -> Self {
        let last_session_scan_at = 0;
        Self {
            last_session_scan_at,
        }
    }
}
85
86fn make_dream_progress_watcher(
88 task_id: String,
89 _set_app_state: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any> + Send + Sync>,
90) -> Box<dyn Fn(Message) + Send + Sync> {
91 Box::new(move |msg: Message| {
92 if matches!(msg, Message::Assistant(_)) {
93 log::debug!(
94 "[autoDream] progress watcher received assistant message for task {}",
95 task_id
96 );
97 }
98 })
99}
100
/// Heap-allocated, pinned, `Send` future with no output; the unit of work an
/// [`AutoDreamHandle`] produces when it is run.
type AutoDreamFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
103
104pub struct AutoDreamHandle {
106 inner: Box<dyn FnOnce() -> AutoDreamFuture + Send>,
107}
108
109impl AutoDreamHandle {
110 pub fn run(self) {
112 let future = (self.inner)();
113 tokio::spawn(async move { future.await });
114 }
115}
116
117pub fn init_auto_dream() -> AutoDreamHandle {
123 let state = std::sync::Arc::new(tokio::sync::Mutex::new(AutoDreamRunnerState::new()));
124
125 AutoDreamHandle {
126 inner: Box::new(move || {
127 let state = state.clone();
128 Box::pin(async move {
129 let cfg = get_config();
130
131 if !is_gate_open() {
133 return;
134 }
135
136 let last_at = consolidation_lock::read_last_consolidated_at().await;
138 let now = chrono::Utc::now().timestamp_millis() as u64;
139 let hours_since = (now - last_at) as f64 / 3_600_000.0;
140
141 if hours_since < cfg.min_hours as f64 {
142 log::debug!(
143 "[autoDream] time gate: {:.1}h since last consolidation, need {}h",
144 hours_since,
145 cfg.min_hours
146 );
147 return;
148 }
149
150 let mut state_guard = state.lock().await;
152 let since_scan_ms = now - state_guard.last_session_scan_at;
153
154 if since_scan_ms < SESSION_SCAN_INTERVAL_MS {
155 log::debug!(
156 "[autoDream] scan throttle — time-gate passed but last scan was {}s ago",
157 since_scan_ms / 1000
158 );
159 return;
160 }
161 state_guard.last_session_scan_at = now;
162 drop(state_guard);
163
164 let session_ids = consolidation_lock::list_sessions_touched_since(last_at).await;
166
167 if session_ids.len() < cfg.min_sessions as usize {
172 log::debug!(
173 "[autoDream] session gate: {} sessions since last consolidation, need {}",
174 session_ids.len(),
175 cfg.min_sessions
176 );
177 return;
178 }
179
180 let prior_mtime = match consolidation_lock::try_acquire_consolidation_lock().await {
182 Some(mtime) => mtime,
183 None => {
184 log::debug!(
185 "[autoDream] lock acquisition failed or another process is consolidating"
186 );
187 return;
188 }
189 };
190
191 log::debug!(
192 "[autoDream] firing — {:.1}h since last, {} sessions to review",
193 hours_since,
194 session_ids.len()
195 );
196
197 let session_list: String = session_ids
199 .iter()
200 .map(|id| format!("- {}", id))
201 .collect::<Vec<_>>()
202 .join("\n");
203
204 let extra = format!(
205 "\n\nSessions since last consolidation ({}):\n{}",
206 session_ids.len(),
207 session_list
208 );
209
210 let memory_root = get_auto_mem_path();
211 let memory_root_str = memory_root.to_string_lossy().to_string();
212
213 let transcript_dir = std::env::current_dir()
215 .map(|p| p.join("sessions").to_string_lossy().to_string())
216 .unwrap_or_else(|_| "sessions/".to_string());
217
218 let _prompt = consolidation_prompt::build_consolidation_prompt(
219 &memory_root_str,
220 &transcript_dir,
221 &extra,
222 );
223
224 log::debug!(
245 "[autoDream] would run forked agent — {} sessions, prompt chars: {}",
246 session_ids.len(),
247 _prompt.len()
248 );
249
250 let _ = prior_mtime;
252 })
253 }),
254 }
255}
256
257pub async fn execute_auto_dream() {
260 let handle = init_auto_dream();
261 handle.run();
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// The built-in configuration exposes the expected default thresholds.
    #[test]
    fn test_default_config() {
        let AutoDreamConfig {
            min_hours,
            min_sessions,
        } = get_config();
        assert_eq!(min_hours, 24);
        assert_eq!(min_sessions, 5);
    }

    /// Relies on `is_auto_dream_enabled()` reporting `true` in the test
    /// environment.
    #[test]
    fn test_is_gate_open_returns_true_when_enabled() {
        let open = is_gate_open();
        assert!(open);
    }

    /// The scan throttle interval is ten minutes, expressed in milliseconds.
    #[test]
    fn test_scan_interval() {
        let ten_minutes_ms = 10 * 60 * 1000;
        assert_eq!(SESSION_SCAN_INTERVAL_MS, ten_minutes_ms);
    }

    /// Expects a timestamp of `0` from `read_last_consolidated_at` (assumes no
    /// lock file exists where the test runs).
    #[test]
    fn test_read_last_consolidated_at_no_lock() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let last_at = runtime.block_on(consolidation_lock::read_last_consolidated_at());
        assert_eq!(last_at, 0);
    }

    /// The lock path, rendered as a string, starts with the memory directory.
    #[test]
    fn test_lock_path_is_inside_memory_dir() {
        let lock_path = consolidation_lock::lock_path()
            .to_string_lossy()
            .into_owned();
        let memory_dir = get_auto_mem_path().to_string_lossy().into_owned();
        assert!(lock_path.starts_with(memory_dir.as_str()));
    }

    /// Constructing a handle must not panic and yields a non-zero-sized value.
    #[test]
    fn test_init_auto_dream_returns_handle() {
        let handle = init_auto_dream();
        let size = std::mem::size_of_val(&handle);
        assert!(size > 0);
    }

    /// `DreamTaskState` stores exactly what it was built with.
    #[test]
    fn test_dream_task_state() {
        let files_touched = vec!["mem1.md".to_string(), "mem2.md".to_string()];
        let state = DreamTaskState {
            task_id: "task-1".to_string(),
            sessions_reviewing: 5,
            files_touched,
            status: DreamTaskStatus::Running,
        };
        assert_eq!(state.sessions_reviewing, 5);
        assert_eq!(state.files_touched.len(), 2);
        assert!(matches!(state.status, DreamTaskStatus::Running));
    }
}