1use crate::memory::curator::{CurationPolicy, CurationResult, CurationStats, MemoryCurator};
24use crate::memory::durable::SessionData;
25use crate::memory::semantic::MemoryEntry;
26use crate::memory::synthesis::{SynthesisPolicy, SynthesisPromptBuilder, SynthesisResponseParser};
27use crate::memory::trace_analyzer::{AnalysisPolicy, TraceAnalyzer, TraceInsight};
28use crate::types::message::Message;
29
30#[derive(Debug, Clone)]
35pub struct IdleResult {
36 pub sessions_processed: usize,
37 pub insights_extracted: usize,
39 pub stats: CurationStats,
40}
41
42#[derive(Debug)]
47pub enum IdlePhase {
48 Idle,
49 SynthesisPending {
51 seed_insights: Vec<TraceInsight>,
52 existing_memories: Vec<MemoryEntry>,
53 now_ms: u64,
54 sessions_processed: usize,
55 },
56 Done {
57 result: IdleResult,
58 },
59}
60
61pub enum IdleEvent {
62 Trigger {
64 sessions: Vec<SessionData>,
65 existing_memories: Vec<MemoryEntry>,
66 now_ms: u64,
68 },
69 SynthesisResult {
71 content: String,
72 },
73 Abort,
74}
75
76pub enum IdleAction {
77 SynthesizeInsights {
79 messages: Vec<Message>,
80 },
81 CommitMemories {
83 agent_id: String,
84 result: CurationResult,
85 run_result: IdleResult,
86 },
87 Noop,
89 Aborted,
90}
91
92#[derive(Debug, Clone)]
97pub struct IdlePolicy {
98 pub agent_id: String,
99 pub max_sessions_per_run: usize,
101 pub analysis: AnalysisPolicy,
102 pub curation: CurationPolicy,
103 pub synthesis: SynthesisPolicy,
104}
105
106impl IdlePolicy {
107 pub fn new(agent_id: impl Into<String>) -> Self {
108 Self {
109 agent_id: agent_id.into(),
110 max_sessions_per_run: 20,
111 analysis: AnalysisPolicy::default(),
112 curation: CurationPolicy::default(),
113 synthesis: SynthesisPolicy::default(),
114 }
115 }
116}
117
118pub struct IdlePipeline {
124 pub phase: IdlePhase,
125 policy: IdlePolicy,
126 analyzer: TraceAnalyzer,
127 curator: MemoryCurator,
128 prompt_builder: SynthesisPromptBuilder,
129}
130
131impl IdlePipeline {
132 pub fn new(policy: IdlePolicy) -> Self {
133 let analyzer = TraceAnalyzer::new(policy.analysis.clone());
134 let curator = MemoryCurator::new(policy.curation.clone());
135 let prompt_builder = SynthesisPromptBuilder::new(policy.synthesis.clone());
136 Self {
137 phase: IdlePhase::Idle,
138 policy,
139 analyzer,
140 curator,
141 prompt_builder,
142 }
143 }
144
145 pub fn is_idle(&self) -> bool {
146 matches!(self.phase, IdlePhase::Idle)
147 }
148
149 pub fn feed(&mut self, event: IdleEvent) -> IdleAction {
150 match event {
151 IdleEvent::Abort => {
153 self.phase = IdlePhase::Idle;
154 IdleAction::Aborted
155 }
156
157 IdleEvent::Trigger {
159 sessions,
160 existing_memories,
161 now_ms,
162 } => {
163 if sessions.is_empty() {
164 return IdleAction::Noop;
165 }
166
167 let session_tuples: Vec<(String, Vec<Message>)> = sessions
168 .into_iter()
169 .take(self.policy.max_sessions_per_run)
170 .map(|s| (s.session_id, s.messages))
171 .collect();
172 let sessions_processed = session_tuples.len();
173
174 let seed_insights = self.analyzer.analyze_batch(&session_tuples);
176
177 let messages = self.prompt_builder.build(&session_tuples, &seed_insights);
179
180 self.phase = IdlePhase::SynthesisPending {
181 seed_insights,
182 existing_memories,
183 now_ms,
184 sessions_processed,
185 };
186
187 IdleAction::SynthesizeInsights { messages }
188 }
189
190 IdleEvent::SynthesisResult { content } => {
192 let (seed_insights, existing_memories, now_ms, sessions_processed) =
194 match std::mem::replace(&mut self.phase, IdlePhase::Idle) {
195 IdlePhase::SynthesisPending {
196 seed_insights,
197 existing_memories,
198 now_ms,
199 sessions_processed,
200 } => (seed_insights, existing_memories, now_ms, sessions_processed),
201 other => {
202 self.phase = other;
203 return IdleAction::Aborted;
204 }
205 };
206
207 let synthesized = SynthesisResponseParser::parse("synthetic", &content);
209
210 let mut all_insights = seed_insights;
212 all_insights.extend(synthesized);
213 let insights_extracted = all_insights.len();
214
215 let curation_result =
217 self.curator
218 .curate(&all_insights, &existing_memories, now_ms);
219 let stats = curation_result.stats.clone();
220
221 let run_result = IdleResult {
222 sessions_processed,
223 insights_extracted,
224 stats,
225 };
226 self.phase = IdlePhase::Done {
227 result: run_result.clone(),
228 };
229
230 IdleAction::CommitMemories {
231 agent_id: self.policy.agent_id.clone(),
232 result: curation_result,
233 run_result,
234 }
235 }
236 }
237 }
238
239 pub fn reset(&mut self) {
241 self.phase = IdlePhase::Idle;
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::durable::SessionData;
    use crate::types::message::{ContentPart, Message, ToolCall};
    use compact_str::CompactString;

    /// Pipeline with the default policy for `agent-1`.
    fn pipeline() -> IdlePipeline {
        IdlePipeline::new(IdlePolicy::new("agent-1"))
    }

    /// Session in which two `bash` tool calls both fail with the same
    /// "permission denied" error.
    fn session_with_repeated_error(session_id: &str) -> SessionData {
        let mut call_msg = Message::assistant("");
        call_msg.tool_calls = vec![
            ToolCall {
                id: CompactString::new("c1"),
                name: CompactString::new("bash"),
                arguments: serde_json::Value::Null,
            },
            ToolCall {
                id: CompactString::new("c2"),
                name: CompactString::new("bash"),
                arguments: serde_json::Value::Null,
            },
        ];
        let err1 = Message::tool(vec![ContentPart::ToolResult {
            call_id: CompactString::new("c1"),
            output: "permission denied".to_string(),
            is_error: true,
        }]);
        let err2 = Message::tool(vec![ContentPart::ToolResult {
            call_id: CompactString::new("c2"),
            output: "permission denied".to_string(),
            is_error: true,
        }]);
        SessionData {
            session_id: session_id.to_string(),
            agent_id: "agent-1".to_string(),
            messages: vec![call_msg, err1, err2],
            metadata: serde_json::Value::Null,
            created_at_ms: 0,
            updated_at_ms: 1000,
        }
    }

    /// Trigger event carrying a single failing session and no existing memories.
    fn trigger_one(session_id: &str, now_ms: u64) -> IdleEvent {
        IdleEvent::Trigger {
            sessions: vec![session_with_repeated_error(session_id)],
            existing_memories: vec![],
            now_ms,
        }
    }

    const VALID_JSON: &str =
        r#"{"insights":[{"text":"Avoid bash in restricted environments","confidence":0.9}]}"#;
    const EMPTY_JSON: &str = r#"{"insights":[]}"#;

    #[test]
    fn starts_idle() {
        assert!(pipeline().is_idle());
    }

    #[test]
    fn empty_sessions_returns_noop_and_stays_idle() {
        let mut p = pipeline();
        let action = p.feed(IdleEvent::Trigger {
            sessions: vec![],
            existing_memories: vec![],
            now_ms: 0,
        });
        assert!(matches!(action, IdleAction::Noop));
        assert!(p.is_idle());
    }

    #[test]
    fn abort_from_any_phase_resets_to_idle() {
        // From SynthesisPending.
        let mut p = pipeline();
        p.feed(trigger_one("s1", 0));
        assert!(matches!(p.phase, IdlePhase::SynthesisPending { .. }));
        let action = p.feed(IdleEvent::Abort);
        assert!(matches!(action, IdleAction::Aborted));
        assert!(p.is_idle());

        // From Done.
        p.feed(trigger_one("s1", 0));
        p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        assert!(matches!(p.phase, IdlePhase::Done { .. }));
        let action = p.feed(IdleEvent::Abort);
        assert!(matches!(action, IdleAction::Aborted));
        assert!(p.is_idle());

        // From Idle.
        let action = p.feed(IdleEvent::Abort);
        assert!(matches!(action, IdleAction::Aborted));
        assert!(p.is_idle());
    }

    #[test]
    fn trigger_emits_synthesize_insights() {
        let mut p = pipeline();
        let action = p.feed(trigger_one("s1", 0));
        assert!(
            matches!(action, IdleAction::SynthesizeInsights { .. }),
            "expected SynthesizeInsights after Trigger"
        );
        assert!(matches!(p.phase, IdlePhase::SynthesisPending { .. }));
    }

    #[test]
    fn synthesis_result_emits_commit_memories() {
        let mut p = pipeline();
        p.feed(trigger_one("s1", 5000));
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        match action {
            IdleAction::CommitMemories {
                agent_id,
                result,
                run_result,
            } => {
                assert_eq!(agent_id, "agent-1");
                assert_eq!(run_result.sessions_processed, 1);
                assert!(run_result.insights_extracted > 0);
                assert!(!result.to_add.is_empty());
            }
            _ => panic!("expected CommitMemories"),
        }
        assert!(matches!(p.phase, IdlePhase::Done { .. }));
    }

    #[test]
    fn synthesized_insights_appear_in_result() {
        let mut p = pipeline();
        p.feed(trigger_one("s1", 0));
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        // Fail loudly on any other action; an `if let` here would let the
        // test pass without checking anything.
        match action {
            IdleAction::CommitMemories { result, .. } => {
                let has_synthesized = result
                    .to_add
                    .iter()
                    .any(|e| e.metadata["kind"] == "synthesized");
                assert!(has_synthesized, "expected at least one synthesized insight");
            }
            _ => panic!("expected CommitMemories"),
        }
    }

    #[test]
    fn synthesis_result_without_pending_state_returns_aborted() {
        let mut p = pipeline();
        let action = p.feed(IdleEvent::SynthesisResult {
            content: VALID_JSON.to_string(),
        });
        assert!(matches!(action, IdleAction::Aborted));
        // A stray result must not disturb the current phase.
        assert!(p.is_idle());
    }

    #[test]
    fn respects_max_sessions_per_run() {
        let policy = IdlePolicy {
            max_sessions_per_run: 1,
            ..IdlePolicy::new("agent-1")
        };
        let mut p = IdlePipeline::new(policy);
        let sessions = vec![
            session_with_repeated_error("s1"),
            session_with_repeated_error("s2"),
            session_with_repeated_error("s3"),
        ];
        p.feed(IdleEvent::Trigger {
            sessions,
            existing_memories: vec![],
            now_ms: 0,
        });
        let action = p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        match action {
            IdleAction::CommitMemories { run_result, .. } => {
                assert_eq!(run_result.sessions_processed, 1);
            }
            _ => panic!("expected CommitMemories"),
        }
    }

    #[test]
    fn reset_allows_retriggering() {
        let mut p = pipeline();

        p.feed(trigger_one("s1", 0));
        p.feed(IdleEvent::SynthesisResult {
            content: EMPTY_JSON.to_string(),
        });
        assert!(matches!(p.phase, IdlePhase::Done { .. }));

        p.reset();
        assert!(p.is_idle());

        let action = p.feed(trigger_one("s2", 1000));
        assert!(matches!(action, IdleAction::SynthesizeInsights { .. }));
    }
}