Skip to main content

ai_agent/services/auto_dream/
mod.rs

1// Source: ~/claudecode/openclaudecode/src/services/autoDream/autoDream.ts
2//! Background memory consolidation. Fires the /dream prompt as a forked
3//! subagent when time-gate passes AND enough sessions have accumulated.
4//!
5//! Gate order (cheapest first):
6//!   1. Time: hours since lastConsolidatedAt >= minHours (one stat)
7//!   2. Sessions: transcript count with mtime > lastConsolidatedAt >= minSessions
8//!   3. Lock: no other process mid-consolidation
9//!
10//! State is closure-scoped inside init_auto_dream() rather than module-level
11//! (tests call init_auto_dream() in beforeEach for a fresh closure).
12
13pub mod config;
14pub mod consolidation_lock;
15pub mod consolidation_prompt;
16
17pub use config::*;
18pub use consolidation_lock::*;
19pub use consolidation_prompt::*;
20
21use crate::memdir::paths::get_auto_mem_path;
22use crate::types::message::Message;
23use crate::utils::abort_controller::AbortController;
24use std::future::Future;
25use std::pin::Pin;
26
27// Scan throttle: when time-gate passes but session-gate doesn't, the lock
28// mtime doesn't advance, so the time-gate keeps passing every turn.
29const SESSION_SCAN_INTERVAL_MS: u64 = 10 * 60 * 1000; // 10 minutes
30
31/// Thresholds from tengu_onyx_plover. The enabled gate lives in config.ts
32/// (isAutoDreamEnabled); this returns only the scheduling knobs.
33#[derive(Debug, Clone)]
34struct AutoDreamConfig {
35    min_hours: u64,
36    min_sessions: u64,
37}
38
39const DEFAULTS: AutoDreamConfig = AutoDreamConfig {
40    min_hours: 24,
41    min_sessions: 5,
42};
43
44/// Build the configuration. GrowthBook feature values with validation.
45fn get_config() -> AutoDreamConfig {
46    DEFAULTS
47}
48
49/// Check if the auto-dream gate is open.
50/// Returns false when KAIROS/remote/auto-memory disabled.
51fn is_gate_open() -> bool {
52    is_auto_dream_enabled()
53}
54
55/// State for a running dream task.
56#[derive(Debug, Clone)]
57pub struct DreamTaskState {
58    pub task_id: String,
59    pub sessions_reviewing: usize,
60    pub files_touched: Vec<String>,
61    pub status: DreamTaskStatus,
62}
63
64/// Status of a dream task.
65#[derive(Debug, Clone)]
66pub enum DreamTaskStatus {
67    Running,
68    Completed,
69    Failed,
70    Killed,
71}
72
73/// Internal runner state — closure-scoped in init_auto_dream.
74struct AutoDreamRunnerState {
75    last_session_scan_at: u64,
76}
77
78impl AutoDreamRunnerState {
79    fn new() -> Self {
80        Self {
81            last_session_scan_at: 0,
82        }
83    }
84}
85
86/// Progress watcher callback for the forked agent's messages.
87fn make_dream_progress_watcher(
88    task_id: String,
89    _set_app_state: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any> + Send + Sync>,
90) -> Box<dyn Fn(Message) + Send + Sync> {
91    Box::new(move |msg: Message| {
92        if matches!(msg, Message::Assistant(_)) {
93            log::debug!(
94                "[autoDream] progress watcher received assistant message for task {}",
95                task_id
96            );
97        }
98    })
99}
100
101/// Future type alias for the auto-dream runner body.
102type AutoDreamFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
103
104/// Runner handle returned by init_auto_dream.
105pub struct AutoDreamHandle {
106    inner: Box<dyn FnOnce() -> AutoDreamFuture + Send>,
107}
108
109impl AutoDreamHandle {
110    /// Execute the runner. Spawns on the current tokio runtime.
111    pub fn run(self) {
112        let future = (self.inner)();
113        tokio::spawn(async move { future.await });
114    }
115}
116
117/// Initialize the auto-dream background consolidation service.
118/// Call once at startup (from background_housekeeping alongside
119/// init_extract_memories), or per-test for a fresh closure.
120///
121/// Returns a runner handle that can be invoked on each turn.
122pub fn init_auto_dream() -> AutoDreamHandle {
123    let state = std::sync::Arc::new(tokio::sync::Mutex::new(AutoDreamRunnerState::new()));
124
125    AutoDreamHandle {
126        inner: Box::new(move || {
127            let state = state.clone();
128            Box::pin(async move {
129                let cfg = get_config();
130
131                // Gate check
132                if !is_gate_open() {
133                    return;
134                }
135
136                // --- Time gate ---
137                let last_at = consolidation_lock::read_last_consolidated_at().await;
138                let now = chrono::Utc::now().timestamp_millis() as u64;
139                let hours_since = (now - last_at) as f64 / 3_600_000.0;
140
141                if hours_since < cfg.min_hours as f64 {
142                    log::debug!(
143                        "[autoDream] time gate: {:.1}h since last consolidation, need {}h",
144                        hours_since,
145                        cfg.min_hours
146                    );
147                    return;
148                }
149
150                // --- Scan throttle ---
151                let mut state_guard = state.lock().await;
152                let since_scan_ms = now - state_guard.last_session_scan_at;
153
154                if since_scan_ms < SESSION_SCAN_INTERVAL_MS {
155                    log::debug!(
156                        "[autoDream] scan throttle — time-gate passed but last scan was {}s ago",
157                        since_scan_ms / 1000
158                    );
159                    return;
160                }
161                state_guard.last_session_scan_at = now;
162                drop(state_guard);
163
164                // --- Session gate ---
165                let session_ids = consolidation_lock::list_sessions_touched_since(last_at).await;
166
167                // In the full impl, the current session is excluded:
168                //   const currentSession = getSessionId();
169                //   sessionIds = sessionIds.filter(id => id !== currentSession);
170
171                if session_ids.len() < cfg.min_sessions as usize {
172                    log::debug!(
173                        "[autoDream] session gate: {} sessions since last consolidation, need {}",
174                        session_ids.len(),
175                        cfg.min_sessions
176                    );
177                    return;
178                }
179
180                // --- Lock ---
181                let prior_mtime = match consolidation_lock::try_acquire_consolidation_lock().await {
182                    Some(mtime) => mtime,
183                    None => {
184                        log::debug!(
185                            "[autoDream] lock acquisition failed or another process is consolidating"
186                        );
187                        return;
188                    }
189                };
190
191                log::debug!(
192                    "[autoDream] firing — {:.1}h since last, {} sessions to review",
193                    hours_since,
194                    session_ids.len()
195                );
196
197                // Build the extra context string
198                let session_list: String = session_ids
199                    .iter()
200                    .map(|id| format!("- {}", id))
201                    .collect::<Vec<_>>()
202                    .join("\n");
203
204                let extra = format!(
205                    "\n\nSessions since last consolidation ({}):\n{}",
206                    session_ids.len(),
207                    session_list
208                );
209
210                let memory_root = get_auto_mem_path();
211                let memory_root_str = memory_root.to_string_lossy().to_string();
212
213                // In the full impl, transcript_dir comes from get_project_dir(getOriginalCwd())
214                let transcript_dir = std::env::current_dir()
215                    .map(|p| p.join("sessions").to_string_lossy().to_string())
216                    .unwrap_or_else(|_| "sessions/".to_string());
217
218                let _prompt = consolidation_prompt::build_consolidation_prompt(
219                    &memory_root_str,
220                    &transcript_dir,
221                    &extra,
222                );
223
224                // In the full implementation, this would call run_forked_agent:
225                //   let result = run_forked_agent(ForkedAgentConfig {
226                //       prompt_messages: [createUserMessage({ content: prompt })],
227                //       cache_safe_params: create_cache_safe_params(context),
228                //       can_use_tool: create_auto_mem_can_use_tool(memory_root),
229                //       query_source: QuerySource("auto_dream".to_string()),
230                //       fork_label: "auto_dream".to_string(),
231                //       skip_transcript: true,
232                //       overrides: Some(SubagentContextOverrides {
233                //           abort_controller: Some(Arc::new(AbortController::default())),
234                //           ..Default::default()
235                //       }),
236                //       on_message: make_dream_progress_watcher(task_id, set_app_state),
237                //       ..Default::default()
238                //   }).await;
239                //
240                // On success: complete the task, show inline completion.
241                // On failure: rollback the lock mtime so the time-gate fires again.
242
243                // SDK port: log the attempt without executing the query loop
244                log::debug!(
245                    "[autoDream] would run forked agent — {} sessions, prompt chars: {}",
246                    session_ids.len(),
247                    _prompt.len()
248                );
249
250                // prior_mtime intentionally not rolled back on success.
251                let _ = prior_mtime;
252            })
253        }),
254    }
255}
256
257/// Execute the auto-dream consolidation runner.
258/// Per-turn cost when enabled: one stat + one file read.
259pub async fn execute_auto_dream() {
260    let handle = init_auto_dream();
261    handle.run();
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267
268    #[test]
269    fn test_default_config() {
270        let cfg = get_config();
271        assert_eq!(cfg.min_hours, 24);
272        assert_eq!(cfg.min_sessions, 5);
273    }
274
275    #[test]
276    fn test_is_gate_open_returns_true_when_enabled() {
277        assert!(is_gate_open());
278    }
279
280    #[test]
281    fn test_scan_interval() {
282        assert_eq!(SESSION_SCAN_INTERVAL_MS, 10 * 60 * 1000);
283    }
284
285    #[test]
286    fn test_read_last_consolidated_at_no_lock() {
287        let rt = tokio::runtime::Runtime::new().unwrap();
288        let result = rt.block_on(consolidation_lock::read_last_consolidated_at());
289        assert_eq!(result, 0);
290    }
291
292    #[test]
293    fn test_lock_path_is_inside_memory_dir() {
294        let lock = consolidation_lock::lock_path();
295        let mem = get_auto_mem_path();
296        let lock_str = lock.to_string_lossy();
297        let mem_str = mem.to_string_lossy();
298        assert!(lock_str.starts_with(mem_str.as_ref()));
299    }
300
301    #[test]
302    fn test_init_auto_dream_returns_handle() {
303        let handle = init_auto_dream();
304        assert!(std::mem::size_of_val(&handle) > 0);
305    }
306
307    #[test]
308    fn test_dream_task_state() {
309        let state = DreamTaskState {
310            task_id: "task-1".to_string(),
311            sessions_reviewing: 5,
312            files_touched: vec!["mem1.md".to_string(), "mem2.md".to_string()],
313            status: DreamTaskStatus::Running,
314        };
315        assert_eq!(state.sessions_reviewing, 5);
316        assert_eq!(state.files_touched.len(), 2);
317        assert!(matches!(state.status, DreamTaskStatus::Running));
318    }
319}