1use std::sync::Arc;
4
5use chrono::{DateTime, Utc};
6use serde::Deserialize;
7use storage::SqlitePool;
8
9use crate::{GangliaError, ProactiveMessage};
10
/// Default look-back window, in hours, for episodes that are scanned at all.
const DEFAULT_SCAN_WINDOW_HOURS: u32 = 72;
/// Default minimum age, in hours, before a commitment may be reported.
const DEFAULT_RESOLUTION_WINDOW_HOURS: u32 = 24;
/// Default cap on the number of open loops returned per run.
const DEFAULT_MAX_REMINDERS: usize = 3;

/// Tunable limits for open-loop detection.
#[derive(Debug, Clone)]
pub struct OpenLoopConfig {
    /// Only episodes whose timestamp falls within this many hours of "now" are scanned.
    pub scan_window_hours: u32,
    /// A commitment is only reported once it is at least this many hours old.
    pub resolution_window_hours: u32,
    /// Upper bound on the number of open loops returned by a single detection run.
    pub max_reminders: usize,
}

impl Default for OpenLoopConfig {
    /// Scans the last 72 hours, reports commitments older than 24 hours, at most 3 at a time.
    fn default() -> Self {
        OpenLoopConfig {
            scan_window_hours: DEFAULT_SCAN_WINDOW_HOURS,
            resolution_window_hours: DEFAULT_RESOLUTION_WINDOW_HOURS,
            max_reminders: DEFAULT_MAX_REMINDERS,
        }
    }
}
31
/// A commitment the user made that has not been followed by a resolving message.
///
/// Derives `PartialEq`/`Eq` so detection results can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OpenLoop {
    /// Full text of the message in which the commitment was made.
    pub commitment: String,
    /// Short description of what was committed to.
    pub topic: String,
    /// Timestamp string of the originating episode, copied verbatim from storage.
    pub committed_at: String,
    /// Agent recorded on the originating episode, if any.
    pub agent: Option<String>,
}
40
41pub struct OpenLoopDetector {
51 db: SqlitePool,
52 config: OpenLoopConfig,
53 llm: Option<Arc<dyn cortex::LlmProvider>>,
54}
55
/// Lowercase phrases that mark a message as containing a commitment.
///
/// Order matters: matching takes the first phrase in this list that occurs in
/// the message, not the phrase that appears earliest in the text.
const COMMITMENT_PHRASES: &[&str] = &[
    // First-person intent.
    "i'll",
    "i will",
    "i need to",
    "i should",
    "i must",
    // Explicit reminder requests.
    "remind me to",
    "don't forget",
    "need to remember",
    // Softer plans and obligations.
    "going to",
    "plan to",
    "want to",
    "have to",
    // Task-list vocabulary.
    "todo",
    "to-do",
];
73
/// Lowercase substrings that, together with a topic reference, indicate that a
/// later message reports the commitment as taken care of.
const RESOLUTION_MARKERS: &[&str] = &[
    // Completion verbs.
    "done",
    "finished",
    "completed",
    "did it",
    "checked off",
    "resolved",
    // Looser "it has been dealt with" wording.
    "took care",
    "handled",
    "sorted",
    "already",
];
87
/// One row read from the `episodes` table, in SELECT column order:
/// `(id, content, timestamp, agent)`.
type EpisodeRow = (String, String, String, Option<String>);
90
91impl OpenLoopDetector {
92 pub fn new(db: SqlitePool, config: OpenLoopConfig) -> Self {
93 Self {
94 db,
95 config,
96 llm: None,
97 }
98 }
99
100 pub fn with_llm(
102 db: SqlitePool,
103 config: OpenLoopConfig,
104 llm: Arc<dyn cortex::LlmProvider>,
105 ) -> Self {
106 Self {
107 db,
108 config,
109 llm: Some(llm),
110 }
111 }
112
113 fn fetch_episodes(&self) -> Result<Vec<EpisodeRow>, GangliaError> {
115 let scan_cutoff =
116 Utc::now() - chrono::TimeDelta::hours(self.config.scan_window_hours as i64);
117 let scan_str = scan_cutoff.to_rfc3339();
118
119 self.db
120 .with_conn(|conn| {
121 let mut stmt = conn.prepare(
122 "SELECT id, content, timestamp, agent FROM episodes
123 WHERE timestamp >= ?1 AND role = 'user'
124 ORDER BY timestamp ASC",
125 )?;
126 let rows = stmt
127 .query_map([&scan_str], |row| {
128 Ok((
129 row.get::<_, String>(0)?,
130 row.get::<_, String>(1)?,
131 row.get::<_, String>(2)?,
132 row.get::<_, Option<String>>(3)?,
133 ))
134 })?
135 .collect::<Result<Vec<_>, _>>()?;
136 Ok(rows)
137 })
138 .map_err(Into::into)
139 }
140
141 pub fn detect_open_loops(&self) -> Result<Vec<OpenLoop>, GangliaError> {
143 let now = Utc::now();
144 let resolution_cutoff =
145 now - chrono::TimeDelta::hours(self.config.resolution_window_hours as i64);
146
147 let episodes = self.fetch_episodes()?;
148
149 let mut open_loops = Vec::new();
150
151 for (id, content, timestamp, agent) in &episodes {
152 let lower = content.to_lowercase();
153
154 let topic = COMMITMENT_PHRASES.iter().find_map(|phrase| {
158 lower.find(phrase).map(|pos| {
159 let after = &lower[pos + phrase.len()..];
160 let trimmed = after
161 .trim()
162 .trim_start_matches(|c: char| !c.is_alphanumeric());
163 trimmed
164 .split_whitespace()
165 .take(8)
166 .collect::<Vec<_>>()
167 .join(" ")
168 })
169 });
170
171 let topic = match topic {
172 Some(t) if t.len() >= 3 => t,
173 _ => continue,
174 };
175
176 let committed_dt = DateTime::parse_from_rfc3339(timestamp)
178 .map(|d| d.with_timezone(&Utc))
179 .unwrap_or(now);
180 if committed_dt > resolution_cutoff {
181 continue;
182 }
183
184 let topic_words: Vec<String> = topic
186 .split_whitespace()
187 .map(brain::normalize_keyword)
188 .filter(|w| w.len() >= 4)
189 .collect();
190 if topic_words.is_empty() {
191 continue;
192 }
193
194 let resolved = episodes.iter().any(|(eid, econtent, ets, _)| {
196 if eid == id {
197 return false;
198 }
199 let edt = DateTime::parse_from_rfc3339(ets)
200 .map(|d| d.with_timezone(&Utc))
201 .unwrap_or(now);
202 if edt <= committed_dt {
203 return false;
204 }
205 let elower = econtent.to_lowercase();
206
207 let has_topic_ref = topic_words
208 .iter()
209 .filter(|w| elower.contains(w.as_str()))
210 .count()
211 >= topic_words.len().clamp(1, 2);
212
213 let has_resolution_marker = RESOLUTION_MARKERS.iter().any(|m| elower.contains(m));
214
215 (has_topic_ref && has_resolution_marker)
216 || topic_words
217 .iter()
218 .filter(|w| elower.contains(w.as_str()))
219 .count()
220 >= topic_words.len().max(2)
221 });
222
223 if !resolved {
224 open_loops.push(OpenLoop {
225 commitment: content.clone(),
226 topic: topic.clone(),
227 committed_at: timestamp.clone(),
228 agent: agent.clone(),
229 });
230 }
231 }
232
233 open_loops.truncate(self.config.max_reminders);
234 Ok(open_loops)
235 }
236
237 pub async fn detect_open_loops_async(&self) -> Result<Vec<OpenLoop>, GangliaError> {
239 if self.llm.is_none() {
240 return self.detect_open_loops();
241 }
242
243 let episodes = self.fetch_episodes()?;
244 if episodes.is_empty() {
245 return Ok(Vec::new());
246 }
247
248 let llm = match self.llm.as_ref() {
249 Some(l) => l,
250 None => return self.detect_open_loops(),
251 };
252 let timeout = tokio::time::Duration::from_millis(2000);
253 match tokio::time::timeout(timeout, self.detect_with_llm(llm, &episodes)).await {
254 Ok(Ok(loops)) => Ok(loops),
255 Ok(Err(e)) => {
256 tracing::debug!("LLM open-loop detection failed: {e}, falling back to keywords");
257 self.detect_open_loops()
258 }
259 Err(_) => {
260 tracing::debug!("LLM open-loop detection timed out, falling back to keywords");
261 self.detect_open_loops()
262 }
263 }
264 }
265
266 async fn detect_with_llm(
268 &self,
269 llm: &Arc<dyn cortex::LlmProvider>,
270 episodes: &[EpisodeRow],
271 ) -> Result<Vec<OpenLoop>, cortex::LlmError> {
272 let resolution_cutoff =
273 Utc::now() - chrono::TimeDelta::hours(self.config.resolution_window_hours as i64);
274 let cutoff_str = resolution_cutoff.to_rfc3339();
275
276 let mut message_lines = String::new();
277 for (i, (_id, content, ts, _agent)) in episodes.iter().enumerate() {
278 message_lines.push_str(&format!("[{i}] ({ts}) {content}\n"));
279 }
280
281 let prompt = format!(
282 "Analyze these user messages for unresolved commitments.\n\
283 A commitment is when the user says they will/need/should/plan to do something.\n\
284 A commitment is resolved if a later message indicates it was done/finished/completed.\n\
285 Only flag commitments older than: {cutoff_str}\n\
286 Return ONLY JSON array: [{{\"index\":N,\"topic\":\"brief description\",\"resolved\":false}}]\n\
287 Return [] if none found.\n\
288 Messages:\n{message_lines}"
289 );
290
291 let messages = vec![cortex::Message::user(prompt)];
292
293 let response = llm.generate(&messages).await?;
294 let parsed =
295 parse_commitment_response(&response.content, episodes, self.config.max_reminders)?;
296 Ok(parsed)
297 }
298
299 pub fn generate_reminders(&self) -> Result<Vec<ProactiveMessage>, GangliaError> {
301 let loops = self.detect_open_loops()?;
302 Ok(format_reminders(loops))
303 }
304
305 pub async fn generate_reminders_async(&self) -> Result<Vec<ProactiveMessage>, GangliaError> {
307 let loops = self.detect_open_loops_async().await?;
308 Ok(format_reminders(loops))
309 }
310}
311
312fn format_reminders(loops: Vec<OpenLoop>) -> Vec<ProactiveMessage> {
314 loops
315 .into_iter()
316 .map(|ol| {
317 let content = if let Some(ref agent) = ol.agent {
318 format!(
319 "Open loop from {}: you mentioned \"{}\" — still pending. Want to follow up?",
320 agent, ol.topic
321 )
322 } else {
323 format!(
324 "Open loop: you mentioned \"{}\" — still pending. Want to follow up?",
325 ol.topic
326 )
327 };
328 ProactiveMessage {
329 content,
330 triggered_by: format!("open_loop:{}", ol.topic),
331 created_at: Utc::now(),
332 agent: ol.agent,
333 }
334 })
335 .collect()
336}
337
338fn parse_commitment_response(
340 raw: &str,
341 episodes: &[EpisodeRow],
342 max_reminders: usize,
343) -> Result<Vec<OpenLoop>, cortex::LlmError> {
344 #[derive(Deserialize)]
345 struct CommitmentEntry {
346 index: usize,
347 topic: String,
348 #[serde(default)]
349 resolved: bool,
350 }
351
352 let trimmed = raw.trim();
353
354 let entries: Vec<CommitmentEntry> = if let Ok(parsed) =
355 serde_json::from_str::<Vec<CommitmentEntry>>(trimmed)
356 {
357 parsed
358 } else if let Some(start) = trimmed.find('[') {
359 if let Some(end) = trimmed.rfind(']') {
360 serde_json::from_str::<Vec<CommitmentEntry>>(&trimmed[start..=end]).map_err(|e| {
361 cortex::LlmError::InvalidFormat(format!("Could not parse commitment JSON: {e}"))
362 })?
363 } else {
364 return Err(cortex::LlmError::InvalidFormat(
365 "No closing bracket in commitment response".to_string(),
366 ));
367 }
368 } else {
369 return Err(cortex::LlmError::InvalidFormat(
370 "No JSON array in commitment response".to_string(),
371 ));
372 };
373
374 let mut loops: Vec<OpenLoop> = entries
375 .into_iter()
376 .filter(|e| !e.resolved && e.index < episodes.len())
377 .map(|e| {
378 let (_, content, ts, agent) = &episodes[e.index];
379 OpenLoop {
380 commitment: content.clone(),
381 topic: e.topic,
382 committed_at: ts.clone(),
383 agent: agent.clone(),
384 }
385 })
386 .collect();
387
388 loops.truncate(max_reminders);
389 Ok(loops)
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Boxed chunk stream returned by the mocks' `generate_stream`.
    type ChunkStream = std::pin::Pin<
        Box<dyn futures::Stream<Item = Result<cortex::ResponseChunk, cortex::LlmError>> + Send>,
    >;

    /// Configuration shared by every detector built in these tests.
    fn test_config() -> OpenLoopConfig {
        OpenLoopConfig {
            scan_window_hours: 72,
            resolution_window_hours: 1,
            max_reminders: 5,
        }
    }

    /// Opens an in-memory database containing the one session the episodes reference.
    fn session_pool() -> SqlitePool {
        let pool = storage::SqlitePool::open_memory().unwrap();
        pool.with_conn(|conn| {
            conn.execute(
                "INSERT INTO sessions (id, channel) VALUES ('test-session', 'test')",
                [],
            )
            .unwrap();
            Ok(())
        })
        .unwrap();
        pool
    }

    /// Keyword-only detector plus a handle to its database.
    fn test_detector() -> (OpenLoopDetector, SqlitePool) {
        let pool = session_pool();
        let detector = OpenLoopDetector::new(pool.clone(), test_config());
        (detector, pool)
    }

    /// LLM-backed detector over `pool`.
    fn llm_detector(pool: SqlitePool, llm: Arc<dyn cortex::LlmProvider>) -> OpenLoopDetector {
        OpenLoopDetector::with_llm(pool, test_config(), llm)
    }

    /// Inserts one episode that is `age` old, optionally attributed to an agent.
    fn insert_row(
        pool: &SqlitePool,
        id: &str,
        role: &str,
        content: &str,
        age: chrono::TimeDelta,
        agent: Option<&str>,
    ) {
        let ts = (Utc::now() - age).to_rfc3339();
        pool.with_conn(|conn| {
            let inserted = match agent {
                Some(agent) => conn.execute(
                    "INSERT INTO episodes (id, session_id, role, content, timestamp, agent)
                     VALUES (?1, 'test-session', ?2, ?3, ?4, ?5)",
                    rusqlite::params![id, role, content, ts, agent],
                ),
                None => conn.execute(
                    "INSERT INTO episodes (id, session_id, role, content, timestamp)
                     VALUES (?1, 'test-session', ?2, ?3, ?4)",
                    rusqlite::params![id, role, content, ts],
                ),
            };
            inserted.unwrap();
            Ok(())
        })
        .unwrap();
    }

    /// Inserts a user episode that is `hours_ago` hours old.
    fn insert_episode(pool: &SqlitePool, id: &str, content: &str, hours_ago: i64) {
        insert_row(
            pool,
            id,
            "user",
            content,
            chrono::TimeDelta::hours(hours_ago),
            None,
        );
    }

    #[test]
    fn test_open_loop_no_episodes() {
        let (detector, _) = test_detector();
        let loops = detector.detect_open_loops().unwrap();
        assert!(loops.is_empty());
    }

    #[test]
    fn test_open_loop_unresolved_commitment() {
        let (detector, pool) = test_detector();
        insert_episode(
            &pool,
            "e1",
            "I need to update the documentation for the API",
            2,
        );
        let loops = detector.detect_open_loops().unwrap();
        assert_eq!(loops.len(), 1);
        assert!(loops[0].topic.contains("update"));
        assert!(loops[0].topic.contains("documentation"));
    }

    #[test]
    fn test_open_loop_resolved_commitment() {
        let (detector, pool) = test_detector();
        insert_episode(
            &pool,
            "e1",
            "I need to update the documentation for the API",
            3,
        );
        insert_episode(
            &pool,
            "e2",
            "I finished the documentation update for the API",
            0,
        );
        let loops = detector.detect_open_loops().unwrap();
        assert!(loops.is_empty(), "should be resolved, but got: {:?}", loops);
    }

    #[test]
    fn test_open_loop_too_recent_not_flagged() {
        let (detector, pool) = test_detector();
        insert_row(
            &pool,
            "e1",
            "user",
            "I need to review the pull request",
            chrono::TimeDelta::minutes(30),
            None,
        );
        let loops = detector.detect_open_loops().unwrap();
        assert!(loops.is_empty(), "recent commitment should not be flagged");
    }

    #[test]
    fn test_open_loop_generate_reminders() {
        let (detector, pool) = test_detector();
        insert_episode(
            &pool,
            "e1",
            "I should refactor the authentication module",
            5,
        );
        let reminders = detector.generate_reminders().unwrap();
        assert_eq!(reminders.len(), 1);
        assert!(reminders[0].content.contains("Open loop"));
        assert!(reminders[0].content.contains("refactor"));
        assert!(reminders[0].triggered_by.starts_with("open_loop:"));
    }

    #[test]
    fn test_open_loop_max_reminders_cap() {
        let (detector, pool) = test_detector();
        for i in 0..8 {
            insert_episode(
                &pool,
                &format!("e{i}"),
                &format!("I need to handle task_{i:04} in the project"),
                10 + i as i64,
            );
        }
        let loops = detector.detect_open_loops().unwrap();
        assert!(
            loops.len() <= 5,
            "should cap at max_reminders, got {}",
            loops.len()
        );
    }

    #[test]
    fn test_open_loop_assistant_messages_ignored() {
        let (detector, pool) = test_detector();
        insert_row(
            &pool,
            "e1",
            "assistant",
            "I will help you with that task",
            chrono::TimeDelta::hours(5),
            None,
        );
        let loops = detector.detect_open_loops().unwrap();
        assert!(loops.is_empty(), "assistant messages should be ignored");
    }

    #[test]
    fn test_open_loop_with_agent_attribution() {
        let (detector, pool) = test_detector();
        insert_row(
            &pool,
            "e1",
            "user",
            "I need to deploy the staging server",
            chrono::TimeDelta::hours(5),
            Some("devops-agent"),
        );

        let loops = detector.detect_open_loops().unwrap();
        assert_eq!(loops.len(), 1);
        assert_eq!(loops[0].agent.as_deref(), Some("devops-agent"));

        let reminders = detector.generate_reminders().unwrap();
        assert!(reminders[0].content.contains("devops-agent"));
    }

    #[test]
    fn test_open_loop_non_ascii_no_panic() {
        let (detector, pool) = test_detector();
        insert_row(
            &pool,
            "e1",
            "user",
            "I need to update the Straße config",
            chrono::TimeDelta::hours(5),
            None,
        );
        let loops = detector.detect_open_loops().unwrap();
        assert_eq!(loops.len(), 1);
    }

    /// Three fixed episodes: a commitment, its resolution, and a second
    /// commitment attributed to an agent.
    fn sample_episodes() -> Vec<EpisodeRow> {
        vec![
            (
                "e0".into(),
                "I need to update the API docs".into(),
                "2024-01-01T10:00:00+00:00".into(),
                None,
            ),
            (
                "e1".into(),
                "The docs are done now".into(),
                "2024-01-02T15:00:00+00:00".into(),
                None,
            ),
            (
                "e2".into(),
                "I should fix the login bug".into(),
                "2024-01-03T09:00:00+00:00".into(),
                Some("dev-agent".into()),
            ),
        ]
    }

    #[test]
    fn test_parse_commitment_clean_json() {
        let episodes = sample_episodes();
        let json = r#"[{"index":2,"topic":"fix login bug","resolved":false}]"#;
        let loops = parse_commitment_response(json, &episodes, 5).unwrap();
        assert_eq!(loops.len(), 1);
        assert_eq!(loops[0].topic, "fix login bug");
        assert_eq!(loops[0].agent.as_deref(), Some("dev-agent"));
    }

    #[test]
    fn test_parse_commitment_embedded_json() {
        let episodes = sample_episodes();
        let raw = r#"Here are the results: [{"index":0,"topic":"update API docs","resolved":false}] done"#;
        let loops = parse_commitment_response(raw, &episodes, 5).unwrap();
        assert_eq!(loops.len(), 1);
        assert_eq!(loops[0].topic, "update API docs");
    }

    #[test]
    fn test_parse_commitment_resolved_filtered() {
        let episodes = sample_episodes();
        let json = r#"[{"index":0,"topic":"update docs","resolved":true},{"index":2,"topic":"fix bug","resolved":false}]"#;
        let loops = parse_commitment_response(json, &episodes, 5).unwrap();
        assert_eq!(loops.len(), 1);
        assert_eq!(loops[0].topic, "fix bug");
    }

    #[test]
    fn test_parse_commitment_empty_array() {
        let episodes = sample_episodes();
        let loops = parse_commitment_response("[]", &episodes, 5).unwrap();
        assert!(loops.is_empty());
    }

    #[test]
    fn test_parse_commitment_invalid() {
        let episodes = sample_episodes();
        assert!(parse_commitment_response("no json here", &episodes, 5).is_err());
    }

    #[test]
    fn test_parse_commitment_out_of_bounds_index() {
        let episodes = sample_episodes();
        let json = r#"[{"index":99,"topic":"nonexistent","resolved":false}]"#;
        let loops = parse_commitment_response(json, &episodes, 5).unwrap();
        assert!(loops.is_empty());
    }

    #[tokio::test]
    async fn test_detect_async_no_llm_equals_sync() {
        let (detector, _) = test_detector();
        let sync_result = detector.detect_open_loops().unwrap();
        let async_result = detector.detect_open_loops_async().await.unwrap();
        assert_eq!(sync_result.len(), async_result.len());
    }

    /// LLM stand-in that immediately answers with a canned response.
    struct MockLlm {
        response: String,
    }

    #[async_trait::async_trait]
    impl cortex::LlmProvider for MockLlm {
        async fn generate(
            &self,
            _messages: &[cortex::Message],
        ) -> Result<cortex::Response, cortex::LlmError> {
            Ok(cortex::Response::text(self.response.clone(), None))
        }

        async fn generate_stream(
            &self,
            _messages: &[cortex::Message],
        ) -> Result<ChunkStream, cortex::LlmError> {
            Err(cortex::LlmError::ProviderUnavailable("mock".to_string()))
        }

        async fn health_check(&self) -> bool {
            true
        }

        fn name(&self) -> &str {
            "mock"
        }

        fn model(&self) -> &str {
            "mock"
        }

        async fn list_models(&self) -> Result<Vec<String>, cortex::LlmError> {
            Ok(vec!["mock".to_string()])
        }
    }

    #[tokio::test]
    async fn test_detect_async_with_mock_llm() {
        let pool = session_pool();
        insert_episode(&pool, "e1", "I need to deploy the staging fix", 5);

        let mock = Arc::new(MockLlm {
            response: r#"[{"index":0,"topic":"deploy staging fix","resolved":false}]"#.to_string(),
        });
        let detector = llm_detector(pool, mock);

        let loops = detector.detect_open_loops_async().await.unwrap();
        assert_eq!(loops.len(), 1);
        assert_eq!(loops[0].topic, "deploy staging fix");
    }

    #[tokio::test]
    async fn test_detect_async_llm_bad_json_falls_back() {
        let pool = session_pool();
        insert_episode(
            &pool,
            "e1",
            "I need to update the documentation for the API",
            2,
        );

        let mock = Arc::new(MockLlm {
            response: "Sorry, I can't understand the request".to_string(),
        });
        let detector = llm_detector(pool, mock);

        let loops = detector.detect_open_loops_async().await.unwrap();
        assert_eq!(loops.len(), 1);
        assert!(loops[0].topic.contains("update"));
    }

    /// LLM stand-in whose `generate` sleeps for ten seconds before answering.
    struct SlowMockLlm;

    #[async_trait::async_trait]
    impl cortex::LlmProvider for SlowMockLlm {
        async fn generate(
            &self,
            _messages: &[cortex::Message],
        ) -> Result<cortex::Response, cortex::LlmError> {
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
            Ok(cortex::Response::text("[]", None))
        }

        async fn generate_stream(
            &self,
            _messages: &[cortex::Message],
        ) -> Result<ChunkStream, cortex::LlmError> {
            Err(cortex::LlmError::ProviderUnavailable("mock".to_string()))
        }

        async fn health_check(&self) -> bool {
            true
        }

        fn name(&self) -> &str {
            "slow-mock"
        }

        fn model(&self) -> &str {
            "slow-mock"
        }

        async fn list_models(&self) -> Result<Vec<String>, cortex::LlmError> {
            Ok(vec!["slow-mock".to_string()])
        }
    }

    #[tokio::test]
    async fn test_detect_async_timeout_falls_back() {
        let pool = session_pool();
        insert_episode(
            &pool,
            "e1",
            "I need to update the documentation for the API",
            2,
        );

        let detector = llm_detector(pool, Arc::new(SlowMockLlm));

        let loops = detector.detect_open_loops_async().await.unwrap();
        assert_eq!(loops.len(), 1);
        assert!(loops[0].topic.contains("update"));
    }

    #[tokio::test]
    async fn test_generate_reminders_async_formats_correctly() {
        let pool = session_pool();
        insert_episode(
            &pool,
            "e1",
            "I should refactor the authentication module",
            5,
        );

        let mock = Arc::new(MockLlm {
            response: r#"[{"index":0,"topic":"refactor auth module","resolved":false}]"#
                .to_string(),
        });
        let detector = llm_detector(pool, mock);

        let reminders = detector.generate_reminders_async().await.unwrap();
        assert_eq!(reminders.len(), 1);
        assert!(reminders[0].content.contains("Open loop"));
        assert!(reminders[0].content.contains("refactor auth module"));
        assert!(reminders[0].triggered_by.starts_with("open_loop:"));
    }
}