// deepstrike_core/memory/idle_pipeline.rs

//! Idle-time memory consolidation pipeline — the agent's "dreaming" cycle.
//!
//! [`IdlePipeline`] is a pure state machine: it performs no I/O and never reads the
//! system clock. The SDK feeds it [`IdleEvent`]s, carries out each returned
//! [`IdleAction`] (calling the LLM, writing the memory store), and feeds the outcome back.
//!
//! # Two-phase flow
//!
//! ```text
//! Phase 1 — Rule-based analysis (synchronous, in-kernel)
//! ┌────────────────────────────────────────────────────────────┐
//! │ IdleEvent::Trigger { sessions, existing_memories, now_ms } │
//! │   → TraceAnalyzer (repeated errors, seqs…)                 │
//! │   → SynthesisPromptBuilder (assembles prompt)              │
//! │   → IdleAction::SynthesizeInsights { messages }            │ ← SDK calls LLM
//! └────────────────────────────────────────────────────────────┘
//!
//! Phase 2 — LLM synthesis + curation (after SDK returns)
//! ┌────────────────────────────────────────────────────────────┐
//! │ IdleEvent::SynthesisResult { content }                     │
//! │   → SynthesisResponseParser (JSON → insights)              │
//! │   → merge seed + synthesized insights                      │
//! │   → MemoryCurator (dedup / conflict / trim)                │
//! │   → IdleAction::CommitMemories { result, .. }              │ ← SDK writes store, then reset()
//! └────────────────────────────────────────────────────────────┘
//! ```
//!
//! `IdleEvent::Abort` returns the pipeline to idle from any phase. After handling
//! `CommitMemories`, call `IdlePipeline::reset` before starting the next cycle.
23use crate::memory::curator::{CurationPolicy, CurationResult, CurationStats, MemoryCurator};
24use crate::memory::durable::SessionData;
25use crate::memory::semantic::MemoryEntry;
26use crate::memory::synthesis::{SynthesisPolicy, SynthesisPromptBuilder, SynthesisResponseParser};
27use crate::memory::trace_analyzer::{AnalysisPolicy, TraceAnalyzer, TraceInsight};
28use crate::types::message::Message;
29
30// ---------------------------------------------------------------------------
31// Result
32// ---------------------------------------------------------------------------
33
34#[derive(Debug, Clone)]
35pub struct IdleResult {
36    pub sessions_processed: usize,
37    /// Total insights (rule-based + synthesized) before curation.
38    pub insights_extracted: usize,
39    pub stats: CurationStats,
40}
41
42// ---------------------------------------------------------------------------
43// State machine types
44// ---------------------------------------------------------------------------
45
46#[derive(Debug)]
47pub enum IdlePhase {
48    Idle,
49    /// Rule-based analysis complete; waiting for the SDK to return LLM output.
50    SynthesisPending {
51        seed_insights: Vec<TraceInsight>,
52        existing_memories: Vec<MemoryEntry>,
53        now_ms: u64,
54        sessions_processed: usize,
55    },
56    Done {
57        result: IdleResult,
58    },
59}
60
61pub enum IdleEvent {
62    /// SDK provides raw sessions + current memory snapshot; kernel does the rest.
63    Trigger {
64        sessions: Vec<SessionData>,
65        existing_memories: Vec<MemoryEntry>,
66        /// Wall-clock ms injected by the SDK — kernel never reads system time.
67        now_ms: u64,
68    },
69    /// SDK feeds back the LLM's text response from the synthesis call.
70    SynthesisResult {
71        content: String,
72    },
73    Abort,
74}
75
76pub enum IdleAction {
77    /// Call the LLM with `messages`, then feed `IdleEvent::SynthesisResult`.
78    SynthesizeInsights {
79        messages: Vec<Message>,
80    },
81    /// Apply `result` delta to the SemanticMemory store, then call `reset()`.
82    CommitMemories {
83        agent_id: String,
84        result: CurationResult,
85        run_result: IdleResult,
86    },
87    /// No sessions to process this cycle.
88    Noop,
89    Aborted,
90}
91
92// ---------------------------------------------------------------------------
93// Policy
94// ---------------------------------------------------------------------------
95
96#[derive(Debug, Clone)]
97pub struct IdlePolicy {
98    pub agent_id: String,
99    /// Sessions processed per idle cycle. Default: 20.
100    pub max_sessions_per_run: usize,
101    pub analysis: AnalysisPolicy,
102    pub curation: CurationPolicy,
103    pub synthesis: SynthesisPolicy,
104}
105
106impl IdlePolicy {
107    pub fn new(agent_id: impl Into<String>) -> Self {
108        Self {
109            agent_id: agent_id.into(),
110            max_sessions_per_run: 20,
111            analysis: AnalysisPolicy::default(),
112            curation: CurationPolicy::default(),
113            synthesis: SynthesisPolicy::default(),
114        }
115    }
116}
117
118// ---------------------------------------------------------------------------
119// Pipeline
120// ---------------------------------------------------------------------------
121
122/// Pure state machine — no I/O, no async.
123pub struct IdlePipeline {
124    pub phase: IdlePhase,
125    policy: IdlePolicy,
126    analyzer: TraceAnalyzer,
127    curator: MemoryCurator,
128    prompt_builder: SynthesisPromptBuilder,
129}
130
131impl IdlePipeline {
132    pub fn new(policy: IdlePolicy) -> Self {
133        let analyzer = TraceAnalyzer::new(policy.analysis.clone());
134        let curator = MemoryCurator::new(policy.curation.clone());
135        let prompt_builder = SynthesisPromptBuilder::new(policy.synthesis.clone());
136        Self {
137            phase: IdlePhase::Idle,
138            policy,
139            analyzer,
140            curator,
141            prompt_builder,
142        }
143    }
144
145    pub fn is_idle(&self) -> bool {
146        matches!(self.phase, IdlePhase::Idle)
147    }
148
149    pub fn feed(&mut self, event: IdleEvent) -> IdleAction {
150        match event {
151            // -- Abort -------------------------------------------------------
152            IdleEvent::Abort => {
153                self.phase = IdlePhase::Idle;
154                IdleAction::Aborted
155            }
156
157            // -- Phase 1: rule-based analysis + prompt assembly ---------------
158            IdleEvent::Trigger {
159                sessions,
160                existing_memories,
161                now_ms,
162            } => {
163                if sessions.is_empty() {
164                    return IdleAction::Noop;
165                }
166
167                let session_tuples: Vec<(String, Vec<Message>)> = sessions
168                    .into_iter()
169                    .take(self.policy.max_sessions_per_run)
170                    .map(|s| (s.session_id, s.messages))
171                    .collect();
172                let sessions_processed = session_tuples.len();
173
174                // Rule-based seed insights (pure computation).
175                let seed_insights = self.analyzer.analyze_batch(&session_tuples);
176
177                // Build LLM prompt (pure computation).
178                let messages = self.prompt_builder.build(&session_tuples, &seed_insights);
179
180                self.phase = IdlePhase::SynthesisPending {
181                    seed_insights,
182                    existing_memories,
183                    now_ms,
184                    sessions_processed,
185                };
186
187                IdleAction::SynthesizeInsights { messages }
188            }
189
190            // -- Phase 2: parse LLM output + curate --------------------------
191            IdleEvent::SynthesisResult { content } => {
192                // Extract pending state; reset to Idle on unexpected phase.
193                let (seed_insights, existing_memories, now_ms, sessions_processed) =
194                    match std::mem::replace(&mut self.phase, IdlePhase::Idle) {
195                        IdlePhase::SynthesisPending {
196                            seed_insights,
197                            existing_memories,
198                            now_ms,
199                            sessions_processed,
200                        } => (seed_insights, existing_memories, now_ms, sessions_processed),
201                        other => {
202                            self.phase = other;
203                            return IdleAction::Aborted;
204                        }
205                    };
206
207                // Parse LLM response (pure computation).
208                let synthesized = SynthesisResponseParser::parse("synthetic", &content);
209
210                // Merge: rule-based seeds first, then LLM-synthesized.
211                let mut all_insights = seed_insights;
212                all_insights.extend(synthesized);
213                let insights_extracted = all_insights.len();
214
215                // Curate the combined set against the existing memory store.
216                let curation_result =
217                    self.curator
218                        .curate(&all_insights, &existing_memories, now_ms);
219                let stats = curation_result.stats.clone();
220
221                let run_result = IdleResult {
222                    sessions_processed,
223                    insights_extracted,
224                    stats,
225                };
226                self.phase = IdlePhase::Done {
227                    result: run_result.clone(),
228                };
229
230                IdleAction::CommitMemories {
231                    agent_id: self.policy.agent_id.clone(),
232                    result: curation_result,
233                    run_result,
234                }
235            }
236        }
237    }
238
239    /// Reset to `Idle` after handling `CommitMemories`, allowing the next cycle.
240    pub fn reset(&mut self) {
241        self.phase = IdlePhase::Idle;
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::durable::SessionData;
    use crate::types::message::{ContentPart, Message, ToolCall};
    use compact_str::CompactString;

    fn pipeline() -> IdlePipeline {
        IdlePipeline::new(IdlePolicy::new("agent-1"))
    }

    /// A session where two `bash` calls both fail with "permission denied".
    fn session_with_repeated_error(session_id: &str) -> SessionData {
        let mut call_msg = Message::assistant("");
        call_msg.tool_calls = vec![
            ToolCall {
                id: CompactString::new("c1"),
                name: CompactString::new("bash"),
                arguments: serde_json::Value::Null,
            },
            ToolCall {
                id: CompactString::new("c2"),
                name: CompactString::new("bash"),
                arguments: serde_json::Value::Null,
            },
        ];
        let err1 = Message::tool(vec![ContentPart::ToolResult {
            call_id: CompactString::new("c1"),
            output: "permission denied".to_string(),
            is_error: true,
        }]);
        let err2 = Message::tool(vec![ContentPart::ToolResult {
            call_id: CompactString::new("c2"),
            output: "permission denied".to_string(),
            is_error: true,
        }]);
        SessionData {
            session_id: session_id.to_string(),
            agent_id: "agent-1".to_string(),
            messages: vec![call_msg, err1, err2],
            metadata: serde_json::Value::Null,
            created_at_ms: 0,
            updated_at_ms: 1000,
        }
    }

    const VALID_JSON: &str =
        r#"{"insights":[{"text":"Avoid bash in restricted environments","confidence":0.9}]}"#;
    const EMPTY_JSON: &str = r#"{"insights":[]}"#;

    // --- state checks -------------------------------------------------------

    #[test]
    fn starts_idle() {
        assert!(pipeline().is_idle());
    }

    #[test]
    fn empty_sessions_returns_noop_and_stays_idle() {
        let mut p = pipeline();
        let action = p.feed(IdleEvent::Trigger {
            sessions: vec![],
            existing_memories: vec![],
            now_ms: 0,
        });
        assert!(matches!(action, IdleAction::Noop));
        assert!(p.is_idle());
    }

    #[test]
    fn abort_from_any_phase_resets_to_idle() {
        let mut p = pipeline();

        // Idle → Abort.
        assert!(matches!(p.feed(IdleEvent::Abort), IdleAction::Aborted));
        assert!(p.is_idle());

        // SynthesisPending → Abort.
        p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s1")],
            existing_memories: vec![],
            now_ms: 0,
        });
        assert!(matches!(p.phase, IdlePhase::SynthesisPending { .. }));
        let action = p.feed(IdleEvent::Abort);
        assert!(matches!(action, IdleAction::Aborted));
        assert!(p.is_idle());

        // Done → Abort.
        p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s2")],
            existing_memories: vec![],
            now_ms: 0,
        });
        p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        assert!(matches!(p.phase, IdlePhase::Done { .. }));
        assert!(matches!(p.feed(IdleEvent::Abort), IdleAction::Aborted));
        assert!(p.is_idle());
    }

    // --- two-phase happy path -----------------------------------------------

    #[test]
    fn trigger_emits_synthesize_insights() {
        let mut p = pipeline();
        let action = p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s1")],
            existing_memories: vec![],
            now_ms: 0,
        });
        assert!(
            matches!(action, IdleAction::SynthesizeInsights { .. }),
            "expected SynthesizeInsights after Trigger"
        );
        assert!(matches!(p.phase, IdlePhase::SynthesisPending { .. }));
    }

    #[test]
    fn synthesis_result_emits_commit_memories() {
        let mut p = pipeline();
        p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s1")],
            existing_memories: vec![],
            now_ms: 5000,
        });
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        match action {
            IdleAction::CommitMemories {
                agent_id,
                result,
                run_result,
            } => {
                assert_eq!(agent_id, "agent-1");
                assert_eq!(run_result.sessions_processed, 1);
                assert!(run_result.insights_extracted > 0);
                // Expect at least the synthesized LLM insight in to_add.
                assert!(!result.to_add.is_empty());
            }
            _ => panic!("expected CommitMemories"),
        }
        assert!(matches!(p.phase, IdlePhase::Done { .. }));
    }

    #[test]
    fn synthesized_insights_appear_in_result() {
        let mut p = pipeline();
        p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s1")],
            existing_memories: vec![],
            now_ms: 0,
        });
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        // Fail loudly on an unexpected action instead of silently skipping the check.
        let result = match action {
            IdleAction::CommitMemories { result, .. } => result,
            _ => panic!("expected CommitMemories"),
        };
        let has_synthesized = result
            .to_add
            .iter()
            .any(|e| e.metadata["kind"] == "synthesized");
        assert!(has_synthesized, "expected at least one synthesized insight");
    }

    #[test]
    fn synthesis_result_without_pending_state_returns_aborted() {
        let mut p = pipeline();
        // Feed SynthesisResult while still in Idle (no Trigger first).
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        assert!(matches!(action, IdleAction::Aborted));
        assert!(p.is_idle());
    }

    #[test]
    fn synthesis_result_in_done_phase_is_rejected_and_keeps_result() {
        let mut p = pipeline();
        p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s1")],
            existing_memories: vec![],
            now_ms: 0,
        });
        p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        assert!(matches!(p.phase, IdlePhase::Done { .. }));

        // A duplicate/late result must not clobber the finished cycle.
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        assert!(matches!(action, IdleAction::Aborted));
        assert!(matches!(p.phase, IdlePhase::Done { .. }));
    }

    // --- policy enforcement -------------------------------------------------

    #[test]
    fn respects_max_sessions_per_run() {
        let policy = IdlePolicy {
            max_sessions_per_run: 1,
            ..IdlePolicy::new("agent-1")
        };
        let mut p = IdlePipeline::new(policy);
        let sessions = vec![
            session_with_repeated_error("s1"),
            session_with_repeated_error("s2"),
            session_with_repeated_error("s3"),
        ];
        p.feed(IdleEvent::Trigger {
            sessions,
            existing_memories: vec![],
            now_ms: 0,
        });
        let action = p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        match action {
            IdleAction::CommitMemories { run_result, .. } => {
                assert_eq!(run_result.sessions_processed, 1);
            }
            _ => panic!("expected CommitMemories"),
        }
    }

    // --- lifecycle ----------------------------------------------------------

    #[test]
    fn reset_allows_retriggering() {
        let mut p = pipeline();

        // First cycle.
        p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s1")],
            existing_memories: vec![],
            now_ms: 0,
        });
        p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        assert!(matches!(p.phase, IdlePhase::Done { .. }));

        p.reset();
        assert!(p.is_idle());

        // Second cycle.
        let action = p.feed(IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error("s2")],
            existing_memories: vec![],
            now_ms: 1000,
        });
        assert!(matches!(action, IdleAction::SynthesizeInsights { .. }));
    }
}