Skip to main content

brainos_ganglia/
openloop.rs

1//! Open-loop detection — finds unresolved commitments in episodic memory.
2
3use std::sync::Arc;
4
5use chrono::{DateTime, Utc};
6use serde::Deserialize;
7use storage::SqlitePool;
8
9use crate::{GangliaError, ProactiveMessage};
10
11/// Configuration for open-loop (unresolved commitment) detection.
12#[derive(Debug, Clone)]
13pub struct OpenLoopConfig {
14    /// How many hours back to scan for commitments.
15    pub scan_window_hours: u32,
16    /// Hours after a commitment before it's flagged as unresolved.
17    pub resolution_window_hours: u32,
18    /// Maximum reminders to generate per check cycle.
19    pub max_reminders: usize,
20}
21
22impl Default for OpenLoopConfig {
23    fn default() -> Self {
24        Self {
25            scan_window_hours: 72,
26            resolution_window_hours: 24,
27            max_reminders: 3,
28        }
29    }
30}
31
32/// An unresolved commitment found in episodic memory.
33#[derive(Debug, Clone)]
34pub struct OpenLoop {
35    pub commitment: String,
36    pub topic: String,
37    pub committed_at: String,
38    pub agent: Option<String>,
39}
40
41/// Detects unresolved commitments ("open loops") in episodic memory.
42///
43/// Scans for commitment phrases like "I need to", "remind me to", etc.
44/// and checks whether a subsequent episode references the same topic.
45/// If no resolution is found within `resolution_window_hours`, a
46/// reminder is surfaced.
47///
48/// When constructed with `with_llm()`, `detect_open_loops_async()` uses
49/// LLM-driven analysis with automatic fallback to keyword heuristics.
50pub struct OpenLoopDetector {
51    db: SqlitePool,
52    config: OpenLoopConfig,
53    llm: Option<Arc<dyn cortex::LlmProvider>>,
54}
55
56/// Phrases that signal a user commitment or intention.
57const COMMITMENT_PHRASES: &[&str] = &[
58    "i'll",
59    "i will",
60    "i need to",
61    "i should",
62    "i must",
63    "remind me to",
64    "don't forget",
65    "need to remember",
66    "going to",
67    "plan to",
68    "want to",
69    "have to",
70    "todo",
71    "to-do",
72];
73
74/// Words that signal a commitment has been resolved.
75const RESOLUTION_MARKERS: &[&str] = &[
76    "done",
77    "finished",
78    "completed",
79    "did it",
80    "checked off",
81    "resolved",
82    "took care",
83    "handled",
84    "sorted",
85    "already",
86];
87
88/// A fetched episode row: (id, content, timestamp, agent).
89type EpisodeRow = (String, String, String, Option<String>);
90
91impl OpenLoopDetector {
92    pub fn new(db: SqlitePool, config: OpenLoopConfig) -> Self {
93        Self {
94            db,
95            config,
96            llm: None,
97        }
98    }
99
100    /// Create an OpenLoopDetector backed by the given LLM provider.
101    pub fn with_llm(
102        db: SqlitePool,
103        config: OpenLoopConfig,
104        llm: Arc<dyn cortex::LlmProvider>,
105    ) -> Self {
106        Self {
107            db,
108            config,
109            llm: Some(llm),
110        }
111    }
112
113    /// Fetch user episodes within the scan window.
114    fn fetch_episodes(&self) -> Result<Vec<EpisodeRow>, GangliaError> {
115        let scan_cutoff =
116            Utc::now() - chrono::TimeDelta::hours(self.config.scan_window_hours as i64);
117        let scan_str = scan_cutoff.to_rfc3339();
118
119        self.db
120            .with_conn(|conn| {
121                let mut stmt = conn.prepare(
122                    "SELECT id, content, timestamp, agent FROM episodes
123                 WHERE timestamp >= ?1 AND role = 'user'
124                 ORDER BY timestamp ASC",
125                )?;
126                let rows = stmt
127                    .query_map([&scan_str], |row| {
128                        Ok((
129                            row.get::<_, String>(0)?,
130                            row.get::<_, String>(1)?,
131                            row.get::<_, String>(2)?,
132                            row.get::<_, Option<String>>(3)?,
133                        ))
134                    })?
135                    .collect::<Result<Vec<_>, _>>()?;
136                Ok(rows)
137            })
138            .map_err(Into::into)
139    }
140
141    /// Scan episodic memory for unresolved commitments (keyword heuristic).
142    pub fn detect_open_loops(&self) -> Result<Vec<OpenLoop>, GangliaError> {
143        let now = Utc::now();
144        let resolution_cutoff =
145            now - chrono::TimeDelta::hours(self.config.resolution_window_hours as i64);
146
147        let episodes = self.fetch_episodes()?;
148
149        let mut open_loops = Vec::new();
150
151        for (id, content, timestamp, agent) in &episodes {
152            let lower = content.to_lowercase();
153
154            // Extract topic from commitment phrase.
155            // Search and slice from the same string (`lower`) to avoid byte offset
156            // mismatch when lowercasing changes string length (e.g. ß → ss).
157            let topic = COMMITMENT_PHRASES.iter().find_map(|phrase| {
158                lower.find(phrase).map(|pos| {
159                    let after = &lower[pos + phrase.len()..];
160                    let trimmed = after
161                        .trim()
162                        .trim_start_matches(|c: char| !c.is_alphanumeric());
163                    trimmed
164                        .split_whitespace()
165                        .take(8)
166                        .collect::<Vec<_>>()
167                        .join(" ")
168                })
169            });
170
171            let topic = match topic {
172                Some(t) if t.len() >= 3 => t,
173                _ => continue,
174            };
175
176            // Only flag commitments that are old enough
177            let committed_dt = DateTime::parse_from_rfc3339(timestamp)
178                .map(|d| d.with_timezone(&Utc))
179                .unwrap_or(now);
180            if committed_dt > resolution_cutoff {
181                continue;
182            }
183
184            // Extract meaningful keywords from the topic for matching
185            let topic_words: Vec<String> = topic
186                .split_whitespace()
187                .map(brain::normalize_keyword)
188                .filter(|w| w.len() >= 4)
189                .collect();
190            if topic_words.is_empty() {
191                continue;
192            }
193
194            // Check if any later episode resolves this commitment
195            let resolved = episodes.iter().any(|(eid, econtent, ets, _)| {
196                if eid == id {
197                    return false;
198                }
199                let edt = DateTime::parse_from_rfc3339(ets)
200                    .map(|d| d.with_timezone(&Utc))
201                    .unwrap_or(now);
202                if edt <= committed_dt {
203                    return false;
204                }
205                let elower = econtent.to_lowercase();
206
207                let has_topic_ref = topic_words
208                    .iter()
209                    .filter(|w| elower.contains(w.as_str()))
210                    .count()
211                    >= topic_words.len().clamp(1, 2);
212
213                let has_resolution_marker = RESOLUTION_MARKERS.iter().any(|m| elower.contains(m));
214
215                (has_topic_ref && has_resolution_marker)
216                    || topic_words
217                        .iter()
218                        .filter(|w| elower.contains(w.as_str()))
219                        .count()
220                        >= topic_words.len().max(2)
221            });
222
223            if !resolved {
224                open_loops.push(OpenLoop {
225                    commitment: content.clone(),
226                    topic: topic.clone(),
227                    committed_at: timestamp.clone(),
228                    agent: agent.clone(),
229                });
230            }
231        }
232
233        open_loops.truncate(self.config.max_reminders);
234        Ok(open_loops)
235    }
236
237    /// Detect open loops using LLM when available, keyword fallback otherwise.
238    pub async fn detect_open_loops_async(&self) -> Result<Vec<OpenLoop>, GangliaError> {
239        if self.llm.is_none() {
240            return self.detect_open_loops();
241        }
242
243        let episodes = self.fetch_episodes()?;
244        if episodes.is_empty() {
245            return Ok(Vec::new());
246        }
247
248        let llm = match self.llm.as_ref() {
249            Some(l) => l,
250            None => return self.detect_open_loops(),
251        };
252        let timeout = tokio::time::Duration::from_millis(2000);
253        match tokio::time::timeout(timeout, self.detect_with_llm(llm, &episodes)).await {
254            Ok(Ok(loops)) => Ok(loops),
255            Ok(Err(e)) => {
256                tracing::debug!("LLM open-loop detection failed: {e}, falling back to keywords");
257                self.detect_open_loops()
258            }
259            Err(_) => {
260                tracing::debug!("LLM open-loop detection timed out, falling back to keywords");
261                self.detect_open_loops()
262            }
263        }
264    }
265
266    /// Ask the LLM to identify unresolved commitments.
267    async fn detect_with_llm(
268        &self,
269        llm: &Arc<dyn cortex::LlmProvider>,
270        episodes: &[EpisodeRow],
271    ) -> Result<Vec<OpenLoop>, cortex::LlmError> {
272        let resolution_cutoff =
273            Utc::now() - chrono::TimeDelta::hours(self.config.resolution_window_hours as i64);
274        let cutoff_str = resolution_cutoff.to_rfc3339();
275
276        let mut message_lines = String::new();
277        for (i, (_id, content, ts, _agent)) in episodes.iter().enumerate() {
278            message_lines.push_str(&format!("[{i}] ({ts}) {content}\n"));
279        }
280
281        let prompt = format!(
282            "Analyze these user messages for unresolved commitments.\n\
283             A commitment is when the user says they will/need/should/plan to do something.\n\
284             A commitment is resolved if a later message indicates it was done/finished/completed.\n\
285             Only flag commitments older than: {cutoff_str}\n\
286             Return ONLY JSON array: [{{\"index\":N,\"topic\":\"brief description\",\"resolved\":false}}]\n\
287             Return [] if none found.\n\
288             Messages:\n{message_lines}"
289        );
290
291        let messages = vec![cortex::Message::user(prompt)];
292
293        let response = llm.generate(&messages).await?;
294        let parsed =
295            parse_commitment_response(&response.content, episodes, self.config.max_reminders)?;
296        Ok(parsed)
297    }
298
299    /// Generate proactive reminder messages for unresolved commitments (keyword heuristic).
300    pub fn generate_reminders(&self) -> Result<Vec<ProactiveMessage>, GangliaError> {
301        let loops = self.detect_open_loops()?;
302        Ok(format_reminders(loops))
303    }
304
305    /// Generate proactive reminder messages using LLM when available, keyword fallback otherwise.
306    pub async fn generate_reminders_async(&self) -> Result<Vec<ProactiveMessage>, GangliaError> {
307        let loops = self.detect_open_loops_async().await?;
308        Ok(format_reminders(loops))
309    }
310}
311
312/// Format open loops into proactive reminder messages.
313fn format_reminders(loops: Vec<OpenLoop>) -> Vec<ProactiveMessage> {
314    loops
315        .into_iter()
316        .map(|ol| {
317            let content = if let Some(ref agent) = ol.agent {
318                format!(
319                    "Open loop from {}: you mentioned \"{}\" — still pending. Want to follow up?",
320                    agent, ol.topic
321                )
322            } else {
323                format!(
324                    "Open loop: you mentioned \"{}\" — still pending. Want to follow up?",
325                    ol.topic
326                )
327            };
328            ProactiveMessage {
329                content,
330                triggered_by: format!("open_loop:{}", ol.topic),
331                created_at: Utc::now(),
332                agent: ol.agent,
333            }
334        })
335        .collect()
336}
337
338/// Parse LLM response into OpenLoop entries. Tries direct JSON, then finds `[...]`.
339fn parse_commitment_response(
340    raw: &str,
341    episodes: &[EpisodeRow],
342    max_reminders: usize,
343) -> Result<Vec<OpenLoop>, cortex::LlmError> {
344    #[derive(Deserialize)]
345    struct CommitmentEntry {
346        index: usize,
347        topic: String,
348        #[serde(default)]
349        resolved: bool,
350    }
351
352    let trimmed = raw.trim();
353
354    let entries: Vec<CommitmentEntry> = if let Ok(parsed) =
355        serde_json::from_str::<Vec<CommitmentEntry>>(trimmed)
356    {
357        parsed
358    } else if let Some(start) = trimmed.find('[') {
359        if let Some(end) = trimmed.rfind(']') {
360            serde_json::from_str::<Vec<CommitmentEntry>>(&trimmed[start..=end]).map_err(|e| {
361                cortex::LlmError::InvalidFormat(format!("Could not parse commitment JSON: {e}"))
362            })?
363        } else {
364            return Err(cortex::LlmError::InvalidFormat(
365                "No closing bracket in commitment response".to_string(),
366            ));
367        }
368    } else {
369        return Err(cortex::LlmError::InvalidFormat(
370            "No JSON array in commitment response".to_string(),
371        ));
372    };
373
374    let mut loops: Vec<OpenLoop> = entries
375        .into_iter()
376        .filter(|e| !e.resolved && e.index < episodes.len())
377        .map(|e| {
378            let (_, content, ts, agent) = &episodes[e.index];
379            OpenLoop {
380                commitment: content.clone(),
381                topic: e.topic,
382                committed_at: ts.clone(),
383                agent: agent.clone(),
384            }
385        })
386        .collect();
387
388    loops.truncate(max_reminders);
389    Ok(loops)
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395
396    fn test_detector() -> (OpenLoopDetector, SqlitePool) {
397        let pool = storage::SqlitePool::open_memory().unwrap();
398        pool.with_conn(|conn| {
399            conn.execute(
400                "INSERT INTO sessions (id, channel) VALUES ('test-session', 'test')",
401                [],
402            )
403            .unwrap();
404            Ok(())
405        })
406        .unwrap();
407        let config = OpenLoopConfig {
408            scan_window_hours: 72,
409            resolution_window_hours: 1,
410            max_reminders: 5,
411        };
412        let detector = OpenLoopDetector::new(pool.clone(), config);
413        (detector, pool)
414    }
415
416    fn insert_episode(pool: &SqlitePool, id: &str, content: &str, hours_ago: i64) {
417        let ts = (Utc::now() - chrono::TimeDelta::hours(hours_ago)).to_rfc3339();
418        pool.with_conn(|conn| {
419            conn.execute(
420                "INSERT INTO episodes (id, session_id, role, content, timestamp)
421                 VALUES (?1, 'test-session', 'user', ?2, ?3)",
422                rusqlite::params![id, content, ts],
423            )
424            .unwrap();
425            Ok(())
426        })
427        .unwrap();
428    }
429
430    #[test]
431    fn test_open_loop_no_episodes() {
432        let (detector, _) = test_detector();
433        let loops = detector.detect_open_loops().unwrap();
434        assert!(loops.is_empty());
435    }
436
437    #[test]
438    fn test_open_loop_unresolved_commitment() {
439        let (detector, pool) = test_detector();
440        insert_episode(
441            &pool,
442            "e1",
443            "I need to update the documentation for the API",
444            2,
445        );
446        let loops = detector.detect_open_loops().unwrap();
447        assert_eq!(loops.len(), 1);
448        assert!(loops[0].topic.contains("update"));
449        assert!(loops[0].topic.contains("documentation"));
450    }
451
452    #[test]
453    fn test_open_loop_resolved_commitment() {
454        let (detector, pool) = test_detector();
455        insert_episode(
456            &pool,
457            "e1",
458            "I need to update the documentation for the API",
459            3,
460        );
461        insert_episode(
462            &pool,
463            "e2",
464            "I finished the documentation update for the API",
465            0,
466        );
467        let loops = detector.detect_open_loops().unwrap();
468        assert!(loops.is_empty(), "should be resolved, but got: {:?}", loops);
469    }
470
471    #[test]
472    fn test_open_loop_too_recent_not_flagged() {
473        let (detector, pool) = test_detector();
474        let ts = (Utc::now() - chrono::TimeDelta::minutes(30)).to_rfc3339();
475        pool.with_conn(|conn| {
476            conn.execute(
477                "INSERT INTO episodes (id, session_id, role, content, timestamp)
478                 VALUES ('e1', 'test-session', 'user', 'I need to review the pull request', ?1)",
479                [&ts],
480            )
481            .unwrap();
482            Ok(())
483        })
484        .unwrap();
485        let loops = detector.detect_open_loops().unwrap();
486        assert!(loops.is_empty(), "recent commitment should not be flagged");
487    }
488
489    #[test]
490    fn test_open_loop_generate_reminders() {
491        let (detector, pool) = test_detector();
492        insert_episode(
493            &pool,
494            "e1",
495            "I should refactor the authentication module",
496            5,
497        );
498        let reminders = detector.generate_reminders().unwrap();
499        assert_eq!(reminders.len(), 1);
500        assert!(reminders[0].content.contains("Open loop"));
501        assert!(reminders[0].content.contains("refactor"));
502        assert!(reminders[0].triggered_by.starts_with("open_loop:"));
503    }
504
505    #[test]
506    fn test_open_loop_max_reminders_cap() {
507        let (detector, pool) = test_detector();
508        for i in 0..8 {
509            insert_episode(
510                &pool,
511                &format!("e{i}"),
512                &format!("I need to handle task_{i:04} in the project"),
513                10 + i as i64,
514            );
515        }
516        let loops = detector.detect_open_loops().unwrap();
517        assert!(
518            loops.len() <= 5,
519            "should cap at max_reminders, got {}",
520            loops.len()
521        );
522    }
523
524    #[test]
525    fn test_open_loop_assistant_messages_ignored() {
526        let (detector, pool) = test_detector();
527        let ts = (Utc::now() - chrono::TimeDelta::hours(5)).to_rfc3339();
528        pool.with_conn(|conn| {
529            conn.execute(
530                "INSERT INTO episodes (id, session_id, role, content, timestamp)
531                 VALUES ('e1', 'test-session', 'assistant', 'I will help you with that task', ?1)",
532                [&ts],
533            )
534            .unwrap();
535            Ok(())
536        })
537        .unwrap();
538        let loops = detector.detect_open_loops().unwrap();
539        assert!(loops.is_empty(), "assistant messages should be ignored");
540    }
541
542    #[test]
543    fn test_open_loop_with_agent_attribution() {
544        let (detector, pool) = test_detector();
545        let ts = (Utc::now() - chrono::TimeDelta::hours(5)).to_rfc3339();
546        pool.with_conn(|conn| {
547            conn.execute(
548                "INSERT INTO episodes (id, session_id, role, content, timestamp, agent)
549                 VALUES ('e1', 'test-session', 'user', 'I need to deploy the staging server', ?1, 'devops-agent')",
550                [&ts],
551            )
552            .unwrap();
553            Ok(())
554        })
555        .unwrap();
556
557        let loops = detector.detect_open_loops().unwrap();
558        assert_eq!(loops.len(), 1);
559        assert_eq!(loops[0].agent.as_deref(), Some("devops-agent"));
560
561        let reminders = detector.generate_reminders().unwrap();
562        assert!(reminders[0].content.contains("devops-agent"));
563    }
564
565    #[test]
566    fn test_open_loop_non_ascii_no_panic() {
567        let (detector, pool) = test_detector();
568        let ts = (Utc::now() - chrono::TimeDelta::hours(5)).to_rfc3339();
569        pool.with_conn(|conn| {
570            conn.execute(
571                "INSERT INTO episodes (id, session_id, role, content, timestamp)
572                 VALUES ('e1', 'test-session', 'user', 'I need to update the Straße config', ?1)",
573                [&ts],
574            )
575            .unwrap();
576            Ok(())
577        })
578        .unwrap();
579        let loops = detector.detect_open_loops().unwrap();
580        assert_eq!(loops.len(), 1);
581    }
582
583    // ── JSON parser tests ───────────────────────────────────────────────────
584
585    fn sample_episodes() -> Vec<EpisodeRow> {
586        vec![
587            (
588                "e0".into(),
589                "I need to update the API docs".into(),
590                "2024-01-01T10:00:00+00:00".into(),
591                None,
592            ),
593            (
594                "e1".into(),
595                "The docs are done now".into(),
596                "2024-01-02T15:00:00+00:00".into(),
597                None,
598            ),
599            (
600                "e2".into(),
601                "I should fix the login bug".into(),
602                "2024-01-03T09:00:00+00:00".into(),
603                Some("dev-agent".into()),
604            ),
605        ]
606    }
607
608    #[test]
609    fn test_parse_commitment_clean_json() {
610        let episodes = sample_episodes();
611        let json = r#"[{"index":2,"topic":"fix login bug","resolved":false}]"#;
612        let loops = parse_commitment_response(json, &episodes, 5).unwrap();
613        assert_eq!(loops.len(), 1);
614        assert_eq!(loops[0].topic, "fix login bug");
615        assert_eq!(loops[0].agent.as_deref(), Some("dev-agent"));
616    }
617
618    #[test]
619    fn test_parse_commitment_embedded_json() {
620        let episodes = sample_episodes();
621        let raw = r#"Here are the results: [{"index":0,"topic":"update API docs","resolved":false}] done"#;
622        let loops = parse_commitment_response(raw, &episodes, 5).unwrap();
623        assert_eq!(loops.len(), 1);
624        assert_eq!(loops[0].topic, "update API docs");
625    }
626
627    #[test]
628    fn test_parse_commitment_resolved_filtered() {
629        let episodes = sample_episodes();
630        let json = r#"[{"index":0,"topic":"update docs","resolved":true},{"index":2,"topic":"fix bug","resolved":false}]"#;
631        let loops = parse_commitment_response(json, &episodes, 5).unwrap();
632        assert_eq!(loops.len(), 1);
633        assert_eq!(loops[0].topic, "fix bug");
634    }
635
636    #[test]
637    fn test_parse_commitment_empty_array() {
638        let episodes = sample_episodes();
639        let loops = parse_commitment_response("[]", &episodes, 5).unwrap();
640        assert!(loops.is_empty());
641    }
642
643    #[test]
644    fn test_parse_commitment_invalid() {
645        let episodes = sample_episodes();
646        assert!(parse_commitment_response("no json here", &episodes, 5).is_err());
647    }
648
649    #[test]
650    fn test_parse_commitment_out_of_bounds_index() {
651        let episodes = sample_episodes();
652        let json = r#"[{"index":99,"topic":"nonexistent","resolved":false}]"#;
653        let loops = parse_commitment_response(json, &episodes, 5).unwrap();
654        assert!(loops.is_empty());
655    }
656
657    // ── Async tests ─────────────────────────────────────────────────────────
658
659    #[tokio::test]
660    async fn test_detect_async_no_llm_equals_sync() {
661        let (detector, _) = test_detector();
662        let sync_result = detector.detect_open_loops().unwrap();
663        let async_result = detector.detect_open_loops_async().await.unwrap();
664        assert_eq!(sync_result.len(), async_result.len());
665    }
666
667    // ── Mock LLM tests ─────────────────────────────────────────────────────
668
669    struct MockLlm {
670        response: String,
671    }
672
673    #[async_trait::async_trait]
674    impl cortex::LlmProvider for MockLlm {
675        async fn generate(
676            &self,
677            _messages: &[cortex::Message],
678        ) -> Result<cortex::Response, cortex::LlmError> {
679            Ok(cortex::Response::text(self.response.clone(), None))
680        }
681
682        async fn generate_stream(
683            &self,
684            _messages: &[cortex::Message],
685        ) -> Result<
686            std::pin::Pin<
687                Box<
688                    dyn futures::Stream<Item = Result<cortex::ResponseChunk, cortex::LlmError>>
689                        + Send,
690                >,
691            >,
692            cortex::LlmError,
693        > {
694            Err(cortex::LlmError::ProviderUnavailable("mock".to_string()))
695        }
696
697        async fn health_check(&self) -> bool {
698            true
699        }
700
701        fn name(&self) -> &str {
702            "mock"
703        }
704
705        fn model(&self) -> &str {
706            "mock"
707        }
708
709        async fn list_models(&self) -> Result<Vec<String>, cortex::LlmError> {
710            Ok(vec!["mock".to_string()])
711        }
712    }
713
714    #[tokio::test]
715    async fn test_detect_async_with_mock_llm() {
716        let pool = storage::SqlitePool::open_memory().unwrap();
717        pool.with_conn(|conn| {
718            conn.execute(
719                "INSERT INTO sessions (id, channel) VALUES ('test-session', 'test')",
720                [],
721            )
722            .unwrap();
723            Ok(())
724        })
725        .unwrap();
726
727        let ts = (Utc::now() - chrono::TimeDelta::hours(5)).to_rfc3339();
728        pool.with_conn(|conn| {
729            conn.execute(
730                "INSERT INTO episodes (id, session_id, role, content, timestamp)
731                 VALUES ('e1', 'test-session', 'user', 'I need to deploy the staging fix', ?1)",
732                [&ts],
733            )
734            .unwrap();
735            Ok(())
736        })
737        .unwrap();
738
739        let mock = Arc::new(MockLlm {
740            response: r#"[{"index":0,"topic":"deploy staging fix","resolved":false}]"#.to_string(),
741        });
742
743        let detector = OpenLoopDetector::with_llm(
744            pool,
745            OpenLoopConfig {
746                scan_window_hours: 72,
747                resolution_window_hours: 1,
748                max_reminders: 5,
749            },
750            mock,
751        );
752
753        let loops = detector.detect_open_loops_async().await.unwrap();
754        assert_eq!(loops.len(), 1);
755        assert_eq!(loops[0].topic, "deploy staging fix");
756    }
757
758    #[tokio::test]
759    async fn test_detect_async_llm_bad_json_falls_back() {
760        let (_, pool) = test_detector();
761        insert_episode(
762            &pool,
763            "e1",
764            "I need to update the documentation for the API",
765            2,
766        );
767
768        let mock = Arc::new(MockLlm {
769            response: "Sorry, I can't understand the request".to_string(),
770        });
771
772        let detector = OpenLoopDetector::with_llm(
773            pool,
774            OpenLoopConfig {
775                scan_window_hours: 72,
776                resolution_window_hours: 1,
777                max_reminders: 5,
778            },
779            mock,
780        );
781
782        let loops = detector.detect_open_loops_async().await.unwrap();
783        assert_eq!(loops.len(), 1);
784        assert!(loops[0].topic.contains("update"));
785    }
786
787    struct SlowMockLlm;
788
789    #[async_trait::async_trait]
790    impl cortex::LlmProvider for SlowMockLlm {
791        async fn generate(
792            &self,
793            _messages: &[cortex::Message],
794        ) -> Result<cortex::Response, cortex::LlmError> {
795            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
796            Ok(cortex::Response::text("[]", None))
797        }
798
799        async fn generate_stream(
800            &self,
801            _messages: &[cortex::Message],
802        ) -> Result<
803            std::pin::Pin<
804                Box<
805                    dyn futures::Stream<Item = Result<cortex::ResponseChunk, cortex::LlmError>>
806                        + Send,
807                >,
808            >,
809            cortex::LlmError,
810        > {
811            Err(cortex::LlmError::ProviderUnavailable("mock".to_string()))
812        }
813
814        async fn health_check(&self) -> bool {
815            true
816        }
817        fn name(&self) -> &str {
818            "slow-mock"
819        }
820        fn model(&self) -> &str {
821            "slow-mock"
822        }
823        async fn list_models(&self) -> Result<Vec<String>, cortex::LlmError> {
824            Ok(vec!["slow-mock".to_string()])
825        }
826    }
827
828    #[tokio::test]
829    async fn test_detect_async_timeout_falls_back() {
830        let (_, pool) = test_detector();
831        insert_episode(
832            &pool,
833            "e1",
834            "I need to update the documentation for the API",
835            2,
836        );
837
838        let detector = OpenLoopDetector::with_llm(
839            pool,
840            OpenLoopConfig {
841                scan_window_hours: 72,
842                resolution_window_hours: 1,
843                max_reminders: 5,
844            },
845            Arc::new(SlowMockLlm),
846        );
847
848        let loops = detector.detect_open_loops_async().await.unwrap();
849        assert_eq!(loops.len(), 1);
850        assert!(loops[0].topic.contains("update"));
851    }
852
853    #[tokio::test]
854    async fn test_generate_reminders_async_formats_correctly() {
855        let (_, pool) = test_detector();
856        insert_episode(
857            &pool,
858            "e1",
859            "I should refactor the authentication module",
860            5,
861        );
862
863        let mock = Arc::new(MockLlm {
864            response: r#"[{"index":0,"topic":"refactor auth module","resolved":false}]"#
865                .to_string(),
866        });
867
868        let detector = OpenLoopDetector::with_llm(
869            pool,
870            OpenLoopConfig {
871                scan_window_hours: 72,
872                resolution_window_hours: 1,
873                max_reminders: 5,
874            },
875            mock,
876        );
877
878        let reminders = detector.generate_reminders_async().await.unwrap();
879        assert_eq!(reminders.len(), 1);
880        assert!(reminders[0].content.contains("Open loop"));
881        assert!(reminders[0].content.contains("refactor auth module"));
882        assert!(reminders[0].triggered_by.starts_with("open_loop:"));
883    }
884}