Skip to main content

intent_engine/
events.rs

1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8    pool: &'a SqlitePool,
9    notifier: crate::notifications::NotificationSender,
10    cli_notifier: Option<crate::dashboard::cli_notifier::CliNotifier>,
11    project_path: Option<String>,
12}
13
14impl<'a> EventManager<'a> {
15    pub fn new(pool: &'a SqlitePool) -> Self {
16        Self {
17            pool,
18            notifier: crate::notifications::NotificationSender::new(None),
19            cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
20            project_path: None,
21        }
22    }
23
24    /// Create an EventManager with project path for CLI notifications
25    pub fn with_project_path(pool: &'a SqlitePool, project_path: String) -> Self {
26        Self {
27            pool,
28            notifier: crate::notifications::NotificationSender::new(None),
29            cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
30            project_path: Some(project_path),
31        }
32    }
33
34    /// Create an EventManager with WebSocket notification support
35    pub fn with_websocket(
36        pool: &'a SqlitePool,
37        ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
38        project_path: String,
39    ) -> Self {
40        Self {
41            pool,
42            notifier: crate::notifications::NotificationSender::new(Some(ws_state)),
43            cli_notifier: None, // Dashboard context doesn't need CLI notifier
44            project_path: Some(project_path),
45        }
46    }
47
48    /// Internal helper: Notify UI about event creation
49    async fn notify_event_created(&self, event: &Event) {
50        use crate::dashboard::websocket::DatabaseOperationPayload;
51
52        // WebSocket notification (Dashboard context)
53        if let Some(project_path) = &self.project_path {
54            let event_json = match serde_json::to_value(event) {
55                Ok(json) => json,
56                Err(e) => {
57                    tracing::warn!(error = %e, "Failed to serialize event for notification");
58                    return;
59                },
60            };
61
62            let payload =
63                DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
64            self.notifier.send(payload).await;
65        }
66
67        // CLI → Dashboard HTTP notification (CLI context)
68        if let Some(cli_notifier) = &self.cli_notifier {
69            cli_notifier
70                .notify_event_added(event.task_id, event.id, self.project_path.clone())
71                .await;
72        }
73    }
74
75    /// Internal helper: Notify UI about event update
76    async fn notify_event_updated(&self, event: &Event) {
77        use crate::dashboard::websocket::DatabaseOperationPayload;
78
79        let Some(project_path) = &self.project_path else {
80            return;
81        };
82
83        let event_json = match serde_json::to_value(event) {
84            Ok(json) => json,
85            Err(e) => {
86                tracing::warn!("Failed to serialize event for notification: {}", e);
87                return;
88            },
89        };
90
91        let payload =
92            DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
93        self.notifier.send(payload).await;
94    }
95
96    /// Internal helper: Notify UI about event deletion
97    async fn notify_event_deleted(&self, event_id: i64) {
98        use crate::dashboard::websocket::DatabaseOperationPayload;
99
100        let Some(project_path) = &self.project_path else {
101            return;
102        };
103
104        let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
105        self.notifier.send(payload).await;
106    }
107
108    /// Add a new event
109    pub async fn add_event(
110        &self,
111        task_id: i64,
112        log_type: String,
113        discussion_data: String,
114    ) -> Result<Event> {
115        // Check if task exists
116        let task_exists: bool =
117            sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
118                .bind(task_id)
119                .fetch_one(self.pool)
120                .await?;
121
122        if !task_exists {
123            return Err(IntentError::TaskNotFound(task_id));
124        }
125
126        let now = Utc::now();
127
128        let result = sqlx::query(
129            r#"
130            INSERT INTO events (task_id, log_type, discussion_data, timestamp)
131            VALUES (?, ?, ?, ?)
132            "#,
133        )
134        .bind(task_id)
135        .bind(&log_type)
136        .bind(&discussion_data)
137        .bind(now)
138        .execute(self.pool)
139        .await?;
140
141        let id = result.last_insert_rowid();
142
143        let event = Event {
144            id,
145            task_id,
146            timestamp: now,
147            log_type,
148            discussion_data,
149        };
150
151        // Notify WebSocket clients about the new event
152        self.notify_event_created(&event).await;
153
154        Ok(event)
155    }
156
157    /// Update an existing event
158    pub async fn update_event(
159        &self,
160        event_id: i64,
161        log_type: Option<&str>,
162        discussion_data: Option<&str>,
163    ) -> Result<Event> {
164        // First, get the existing event to check if it exists
165        let existing_event: Option<Event> =
166            sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
167                .bind(event_id)
168                .fetch_optional(self.pool)
169                .await?;
170
171        let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
172            "Event {} not found",
173            event_id
174        )))?;
175
176        // Update only the fields that are provided
177        let new_log_type = log_type.unwrap_or(&existing_event.log_type);
178        let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
179
180        sqlx::query(
181            r#"
182            UPDATE events
183            SET log_type = ?, discussion_data = ?
184            WHERE id = ?
185            "#,
186        )
187        .bind(new_log_type)
188        .bind(new_discussion_data)
189        .bind(event_id)
190        .execute(self.pool)
191        .await?;
192
193        let updated_event = Event {
194            id: existing_event.id,
195            task_id: existing_event.task_id,
196            timestamp: existing_event.timestamp,
197            log_type: new_log_type.to_string(),
198            discussion_data: new_discussion_data.to_string(),
199        };
200
201        // Notify WebSocket clients about the update
202        self.notify_event_updated(&updated_event).await;
203
204        Ok(updated_event)
205    }
206
207    /// Delete an event
208    pub async fn delete_event(&self, event_id: i64) -> Result<()> {
209        // First, get the event to check if it exists and get task_id for notification
210        let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
211            .bind(event_id)
212            .fetch_optional(self.pool)
213            .await?;
214
215        let _event = event.ok_or(IntentError::InvalidInput(format!(
216            "Event {} not found",
217            event_id
218        )))?;
219
220        // Delete from FTS index first (if it exists)
221        let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
222            .bind(event_id)
223            .execute(self.pool)
224            .await;
225
226        // Delete the event
227        sqlx::query("DELETE FROM events WHERE id = ?")
228            .bind(event_id)
229            .execute(self.pool)
230            .await?;
231
232        // Notify WebSocket clients about the deletion
233        self.notify_event_deleted(event_id).await;
234
235        Ok(())
236    }
237
238    /// List events for a task (or globally if task_id is None)
239    pub async fn list_events(
240        &self,
241        task_id: Option<i64>,
242        limit: Option<i64>,
243        log_type: Option<String>,
244        since: Option<String>,
245    ) -> Result<Vec<Event>> {
246        // Check if task exists (only if task_id provided)
247        if let Some(tid) = task_id {
248            let task_exists: bool =
249                sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
250                    .bind(tid)
251                    .fetch_one(self.pool)
252                    .await?;
253
254            if !task_exists {
255                return Err(IntentError::TaskNotFound(tid));
256            }
257        }
258
259        let limit = limit.unwrap_or(50);
260
261        // Parse since duration if provided
262        let since_timestamp = if let Some(duration_str) = since {
263            Some(crate::time_utils::parse_duration(&duration_str)?)
264        } else {
265            None
266        };
267
268        // Build dynamic query based on filters
269        let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
270        let mut conditions = Vec::new();
271
272        if task_id.is_some() {
273            conditions.push("task_id = ?");
274        }
275
276        if log_type.is_some() {
277            conditions.push("log_type = ?");
278        }
279
280        if since_timestamp.is_some() {
281            conditions.push("timestamp >= ?");
282        }
283
284        if !conditions.is_empty() {
285            query.push_str(" AND ");
286            query.push_str(&conditions.join(" AND "));
287        }
288
289        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
290
291        // Build and execute query
292        let mut sql_query = sqlx::query_as::<_, Event>(&query);
293
294        if let Some(tid) = task_id {
295            sql_query = sql_query.bind(tid);
296        }
297
298        if let Some(ref typ) = log_type {
299            sql_query = sql_query.bind(typ);
300        }
301
302        if let Some(ts) = since_timestamp {
303            sql_query = sql_query.bind(ts);
304        }
305
306        sql_query = sql_query.bind(limit);
307
308        let events = sql_query.fetch_all(self.pool).await?;
309
310        Ok(events)
311    }
312
313    /// Search events using FTS5
314    pub async fn search_events_fts5(
315        &self,
316        query: &str,
317        limit: Option<i64>,
318    ) -> Result<Vec<EventSearchResult>> {
319        let limit = limit.unwrap_or(20);
320
321        // Use FTS5 to search events and get snippets
322        let results = sqlx::query(
323            r#"
324            SELECT
325                e.id,
326                e.task_id,
327                e.timestamp,
328                e.log_type,
329                e.discussion_data,
330                snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
331            FROM events_fts
332            INNER JOIN events e ON events_fts.rowid = e.id
333            WHERE events_fts MATCH ?
334            ORDER BY rank
335            LIMIT ?
336            "#,
337        )
338        .bind(query)
339        .bind(limit)
340        .fetch_all(self.pool)
341        .await?;
342
343        let mut search_results = Vec::new();
344        for row in results {
345            let event = Event {
346                id: row.get("id"),
347                task_id: row.get("task_id"),
348                timestamp: row.get("timestamp"),
349                log_type: row.get("log_type"),
350                discussion_data: row.get("discussion_data"),
351            };
352            let match_snippet: String = row.get("match_snippet");
353
354            search_results.push(EventSearchResult {
355                event,
356                match_snippet,
357            });
358        }
359
360        Ok(search_results)
361    }
362}
363
364/// Event search result with match snippet
365#[derive(Debug)]
366pub struct EventSearchResult {
367    pub event: Event,
368    pub match_snippet: String,
369}
370
371impl crate::backend::EventBackend for EventManager<'_> {
372    fn add_event(
373        &self,
374        task_id: i64,
375        log_type: String,
376        discussion_data: String,
377    ) -> impl std::future::Future<Output = Result<Event>> + Send {
378        self.add_event(task_id, log_type, discussion_data)
379    }
380
381    fn list_events(
382        &self,
383        task_id: Option<i64>,
384        limit: Option<i64>,
385        log_type: Option<String>,
386        since: Option<String>,
387    ) -> impl std::future::Future<Output = Result<Vec<Event>>> + Send {
388        self.list_events(task_id, limit, log_type, since)
389    }
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use crate::tasks::TaskManager;
396    use crate::test_utils::test_helpers::TestContext;
397
398    #[tokio::test]
399    async fn test_add_event() {
400        let ctx = TestContext::new().await;
401        let task_mgr = TaskManager::new(ctx.pool());
402        let event_mgr = EventManager::new(ctx.pool());
403
404        let task = task_mgr
405            .add_task("Test task".to_string(), None, None, None, None, None)
406            .await
407            .unwrap();
408        let event = event_mgr
409            .add_event(task.id, "decision".to_string(), "Test decision".to_string())
410            .await
411            .unwrap();
412
413        assert_eq!(event.task_id, task.id);
414        assert_eq!(event.log_type, "decision");
415        assert_eq!(event.discussion_data, "Test decision");
416    }
417
418    #[tokio::test]
419    async fn test_add_event_nonexistent_task() {
420        let ctx = TestContext::new().await;
421        let event_mgr = EventManager::new(ctx.pool());
422
423        let result = event_mgr
424            .add_event(999, "decision".to_string(), "Test".to_string())
425            .await;
426        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
427    }
428
429    #[tokio::test]
430    async fn test_list_events() {
431        let ctx = TestContext::new().await;
432        let task_mgr = TaskManager::new(ctx.pool());
433        let event_mgr = EventManager::new(ctx.pool());
434
435        let task = task_mgr
436            .add_task("Test task".to_string(), None, None, None, None, None)
437            .await
438            .unwrap();
439
440        // Add multiple events
441        event_mgr
442            .add_event(task.id, "decision".to_string(), "Decision 1".to_string())
443            .await
444            .unwrap();
445        event_mgr
446            .add_event(task.id, "blocker".to_string(), "Blocker 1".to_string())
447            .await
448            .unwrap();
449        event_mgr
450            .add_event(task.id, "milestone".to_string(), "Milestone 1".to_string())
451            .await
452            .unwrap();
453
454        let events = event_mgr
455            .list_events(Some(task.id), None, None, None)
456            .await
457            .unwrap();
458        assert_eq!(events.len(), 3);
459
460        // Events should be in reverse chronological order
461        assert_eq!(events[0].log_type, "milestone");
462        assert_eq!(events[1].log_type, "blocker");
463        assert_eq!(events[2].log_type, "decision");
464    }
465
466    #[tokio::test]
467    async fn test_list_events_with_limit() {
468        let ctx = TestContext::new().await;
469        let task_mgr = TaskManager::new(ctx.pool());
470        let event_mgr = EventManager::new(ctx.pool());
471
472        let task = task_mgr
473            .add_task("Test task".to_string(), None, None, None, None, None)
474            .await
475            .unwrap();
476
477        // Add 5 events
478        for i in 0..5 {
479            event_mgr
480                .add_event(task.id, "test".to_string(), format!("Event {}", i))
481                .await
482                .unwrap();
483        }
484
485        let events = event_mgr
486            .list_events(Some(task.id), Some(3), None, None)
487            .await
488            .unwrap();
489        assert_eq!(events.len(), 3);
490    }
491
492    #[tokio::test]
493    async fn test_list_events_nonexistent_task() {
494        let ctx = TestContext::new().await;
495        let event_mgr = EventManager::new(ctx.pool());
496
497        let result = event_mgr.list_events(Some(999), None, None, None).await;
498        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
499    }
500
501    #[tokio::test]
502    async fn test_list_events_empty() {
503        let ctx = TestContext::new().await;
504        let task_mgr = TaskManager::new(ctx.pool());
505        let event_mgr = EventManager::new(ctx.pool());
506
507        let task = task_mgr
508            .add_task("Test task".to_string(), None, None, None, None, None)
509            .await
510            .unwrap();
511
512        let events = event_mgr
513            .list_events(Some(task.id), None, None, None)
514            .await
515            .unwrap();
516        assert_eq!(events.len(), 0);
517    }
518
519    #[tokio::test]
520    async fn test_update_event() {
521        let ctx = TestContext::new().await;
522        let task_mgr = TaskManager::new(ctx.pool());
523        let event_mgr = EventManager::new(ctx.pool());
524
525        let task = task_mgr
526            .add_task("Test task".to_string(), None, None, None, None, None)
527            .await
528            .unwrap();
529        let event = event_mgr
530            .add_event(
531                task.id,
532                "decision".to_string(),
533                "Initial decision".to_string(),
534            )
535            .await
536            .unwrap();
537
538        // Update event type and data
539        let updated = event_mgr
540            .update_event(event.id, Some("milestone"), Some("Updated decision"))
541            .await
542            .unwrap();
543
544        assert_eq!(updated.id, event.id);
545        assert_eq!(updated.task_id, task.id);
546        assert_eq!(updated.log_type, "milestone");
547        assert_eq!(updated.discussion_data, "Updated decision");
548    }
549
550    #[tokio::test]
551    async fn test_update_event_partial() {
552        let ctx = TestContext::new().await;
553        let task_mgr = TaskManager::new(ctx.pool());
554        let event_mgr = EventManager::new(ctx.pool());
555
556        let task = task_mgr
557            .add_task("Test task".to_string(), None, None, None, None, None)
558            .await
559            .unwrap();
560        let event = event_mgr
561            .add_event(
562                task.id,
563                "decision".to_string(),
564                "Initial decision".to_string(),
565            )
566            .await
567            .unwrap();
568
569        // Update only discussion_data
570        let updated = event_mgr
571            .update_event(event.id, None, Some("Updated data only"))
572            .await
573            .unwrap();
574
575        assert_eq!(updated.log_type, "decision"); // Unchanged
576        assert_eq!(updated.discussion_data, "Updated data only");
577    }
578
579    #[tokio::test]
580    async fn test_update_event_nonexistent() {
581        let ctx = TestContext::new().await;
582        let event_mgr = EventManager::new(ctx.pool());
583
584        let result = event_mgr
585            .update_event(999, Some("decision"), Some("Test"))
586            .await;
587
588        assert!(result.is_err());
589        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
590    }
591
592    #[tokio::test]
593    async fn test_delete_event() {
594        let ctx = TestContext::new().await;
595        let task_mgr = TaskManager::new(ctx.pool());
596        let event_mgr = EventManager::new(ctx.pool());
597
598        let task = task_mgr
599            .add_task("Test task".to_string(), None, None, None, None, None)
600            .await
601            .unwrap();
602        let event = event_mgr
603            .add_event(task.id, "decision".to_string(), "To be deleted".to_string())
604            .await
605            .unwrap();
606
607        // Delete the event
608        event_mgr.delete_event(event.id).await.unwrap();
609
610        // Verify it's deleted
611        let events = event_mgr
612            .list_events(Some(task.id), None, None, None)
613            .await
614            .unwrap();
615        assert_eq!(events.len(), 0);
616    }
617
618    #[tokio::test]
619    async fn test_delete_event_nonexistent() {
620        let ctx = TestContext::new().await;
621        let event_mgr = EventManager::new(ctx.pool());
622
623        let result = event_mgr.delete_event(999).await;
624        assert!(result.is_err());
625        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
626    }
627
628    #[tokio::test]
629    async fn test_list_events_filter_by_type() {
630        let ctx = TestContext::new().await;
631        let task_mgr = TaskManager::new(ctx.pool());
632        let event_mgr = EventManager::new(ctx.pool());
633
634        let task = task_mgr
635            .add_task("Test task".to_string(), None, None, None, None, None)
636            .await
637            .unwrap();
638
639        // Add events of different types
640        event_mgr
641            .add_event(task.id, "decision".to_string(), "Decision 1".to_string())
642            .await
643            .unwrap();
644        event_mgr
645            .add_event(task.id, "blocker".to_string(), "Blocker 1".to_string())
646            .await
647            .unwrap();
648        event_mgr
649            .add_event(task.id, "decision".to_string(), "Decision 2".to_string())
650            .await
651            .unwrap();
652
653        // Filter by decision type
654        let events = event_mgr
655            .list_events(Some(task.id), None, Some("decision".to_string()), None)
656            .await
657            .unwrap();
658
659        assert_eq!(events.len(), 2);
660        assert!(events.iter().all(|e| e.log_type == "decision"));
661    }
662
663    #[tokio::test]
664    async fn test_list_events_global() {
665        let ctx = TestContext::new().await;
666        let task_mgr = TaskManager::new(ctx.pool());
667        let event_mgr = EventManager::new(ctx.pool());
668
669        let task1 = task_mgr
670            .add_task("Task 1".to_string(), None, None, None, None, None)
671            .await
672            .unwrap();
673        let task2 = task_mgr
674            .add_task("Task 2".to_string(), None, None, None, None, None)
675            .await
676            .unwrap();
677
678        // Add events to both tasks
679        event_mgr
680            .add_event(
681                task1.id,
682                "decision".to_string(),
683                "Task 1 Decision".to_string(),
684            )
685            .await
686            .unwrap();
687        event_mgr
688            .add_event(
689                task2.id,
690                "decision".to_string(),
691                "Task 2 Decision".to_string(),
692            )
693            .await
694            .unwrap();
695
696        // List all events globally (task_id = None)
697        let events = event_mgr.list_events(None, None, None, None).await.unwrap();
698
699        assert!(events.len() >= 2); // At least our 2 events
700        let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
701        let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
702
703        assert_eq!(task1_events.len(), 1);
704        assert_eq!(task2_events.len(), 1);
705    }
706}