intent_engine/
events.rs

1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8    pool: &'a SqlitePool,
9    notifier: crate::notifications::NotificationSender,
10    cli_notifier: Option<crate::dashboard::cli_notifier::CliNotifier>,
11    project_path: Option<String>,
12}
13
14impl<'a> EventManager<'a> {
15    pub fn new(pool: &'a SqlitePool) -> Self {
16        Self {
17            pool,
18            notifier: crate::notifications::NotificationSender::new(None),
19            cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
20            project_path: None,
21        }
22    }
23
24    /// Create an EventManager with project path for CLI notifications
25    pub fn with_project_path(pool: &'a SqlitePool, project_path: String) -> Self {
26        Self {
27            pool,
28            notifier: crate::notifications::NotificationSender::new(None),
29            cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
30            project_path: Some(project_path),
31        }
32    }
33
34    /// Create an EventManager with WebSocket notification support
35    pub fn with_websocket(
36        pool: &'a SqlitePool,
37        ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
38        project_path: String,
39    ) -> Self {
40        Self {
41            pool,
42            notifier: crate::notifications::NotificationSender::new(Some(ws_state)),
43            cli_notifier: None, // Dashboard context doesn't need CLI notifier
44            project_path: Some(project_path),
45        }
46    }
47
48    /// Internal helper: Notify UI about event creation
49    async fn notify_event_created(&self, event: &Event) {
50        use crate::dashboard::websocket::DatabaseOperationPayload;
51
52        // WebSocket notification (Dashboard context)
53        if let Some(project_path) = &self.project_path {
54            let event_json = match serde_json::to_value(event) {
55                Ok(json) => json,
56                Err(e) => {
57                    tracing::warn!(error = %e, "Failed to serialize event for notification");
58                    return;
59                },
60            };
61
62            let payload =
63                DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
64            self.notifier.send(payload).await;
65        }
66
67        // CLI → Dashboard HTTP notification (CLI context)
68        if let Some(cli_notifier) = &self.cli_notifier {
69            cli_notifier
70                .notify_event_added(event.task_id, event.id, self.project_path.clone())
71                .await;
72        }
73    }
74
75    /// Internal helper: Notify UI about event update
76    async fn notify_event_updated(&self, event: &Event) {
77        use crate::dashboard::websocket::DatabaseOperationPayload;
78
79        let Some(project_path) = &self.project_path else {
80            return;
81        };
82
83        let event_json = match serde_json::to_value(event) {
84            Ok(json) => json,
85            Err(e) => {
86                tracing::warn!("Failed to serialize event for notification: {}", e);
87                return;
88            },
89        };
90
91        let payload =
92            DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
93        self.notifier.send(payload).await;
94    }
95
96    /// Internal helper: Notify UI about event deletion
97    async fn notify_event_deleted(&self, event_id: i64) {
98        use crate::dashboard::websocket::DatabaseOperationPayload;
99
100        let Some(project_path) = &self.project_path else {
101            return;
102        };
103
104        let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
105        self.notifier.send(payload).await;
106    }
107
108    /// Add a new event
109    pub async fn add_event(
110        &self,
111        task_id: i64,
112        log_type: &str,
113        discussion_data: &str,
114    ) -> Result<Event> {
115        // Check if task exists
116        let task_exists: bool =
117            sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
118                .bind(task_id)
119                .fetch_one(self.pool)
120                .await?;
121
122        if !task_exists {
123            return Err(IntentError::TaskNotFound(task_id));
124        }
125
126        let now = Utc::now();
127
128        let result = sqlx::query(
129            r#"
130            INSERT INTO events (task_id, log_type, discussion_data, timestamp)
131            VALUES (?, ?, ?, ?)
132            "#,
133        )
134        .bind(task_id)
135        .bind(log_type)
136        .bind(discussion_data)
137        .bind(now)
138        .execute(self.pool)
139        .await?;
140
141        let id = result.last_insert_rowid();
142
143        let event = Event {
144            id,
145            task_id,
146            timestamp: now,
147            log_type: log_type.to_string(),
148            discussion_data: discussion_data.to_string(),
149        };
150
151        // Notify WebSocket clients about the new event
152        self.notify_event_created(&event).await;
153
154        Ok(event)
155    }
156
157    /// Update an existing event
158    pub async fn update_event(
159        &self,
160        event_id: i64,
161        log_type: Option<&str>,
162        discussion_data: Option<&str>,
163    ) -> Result<Event> {
164        // First, get the existing event to check if it exists
165        let existing_event: Option<Event> =
166            sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
167                .bind(event_id)
168                .fetch_optional(self.pool)
169                .await?;
170
171        let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
172            "Event {} not found",
173            event_id
174        )))?;
175
176        // Update only the fields that are provided
177        let new_log_type = log_type.unwrap_or(&existing_event.log_type);
178        let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
179
180        sqlx::query(
181            r#"
182            UPDATE events
183            SET log_type = ?, discussion_data = ?
184            WHERE id = ?
185            "#,
186        )
187        .bind(new_log_type)
188        .bind(new_discussion_data)
189        .bind(event_id)
190        .execute(self.pool)
191        .await?;
192
193        let updated_event = Event {
194            id: existing_event.id,
195            task_id: existing_event.task_id,
196            timestamp: existing_event.timestamp,
197            log_type: new_log_type.to_string(),
198            discussion_data: new_discussion_data.to_string(),
199        };
200
201        // Notify WebSocket clients about the update
202        self.notify_event_updated(&updated_event).await;
203
204        Ok(updated_event)
205    }
206
207    /// Delete an event
208    pub async fn delete_event(&self, event_id: i64) -> Result<()> {
209        // First, get the event to check if it exists and get task_id for notification
210        let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
211            .bind(event_id)
212            .fetch_optional(self.pool)
213            .await?;
214
215        let _event = event.ok_or(IntentError::InvalidInput(format!(
216            "Event {} not found",
217            event_id
218        )))?;
219
220        // Delete from FTS index first (if it exists)
221        let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
222            .bind(event_id)
223            .execute(self.pool)
224            .await;
225
226        // Delete the event
227        sqlx::query("DELETE FROM events WHERE id = ?")
228            .bind(event_id)
229            .execute(self.pool)
230            .await?;
231
232        // Notify WebSocket clients about the deletion
233        self.notify_event_deleted(event_id).await;
234
235        Ok(())
236    }
237
238    /// List events for a task (or globally if task_id is None)
239    pub async fn list_events(
240        &self,
241        task_id: Option<i64>,
242        limit: Option<i64>,
243        log_type: Option<String>,
244        since: Option<String>,
245    ) -> Result<Vec<Event>> {
246        // Check if task exists (only if task_id provided)
247        if let Some(tid) = task_id {
248            let task_exists: bool =
249                sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
250                    .bind(tid)
251                    .fetch_one(self.pool)
252                    .await?;
253
254            if !task_exists {
255                return Err(IntentError::TaskNotFound(tid));
256            }
257        }
258
259        let limit = limit.unwrap_or(50);
260
261        // Parse since duration if provided
262        let since_timestamp = if let Some(duration_str) = since {
263            Some(crate::time_utils::parse_duration(&duration_str)?)
264        } else {
265            None
266        };
267
268        // Build dynamic query based on filters
269        let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
270        let mut conditions = Vec::new();
271
272        if task_id.is_some() {
273            conditions.push("task_id = ?");
274        }
275
276        if log_type.is_some() {
277            conditions.push("log_type = ?");
278        }
279
280        if since_timestamp.is_some() {
281            conditions.push("timestamp >= ?");
282        }
283
284        if !conditions.is_empty() {
285            query.push_str(" AND ");
286            query.push_str(&conditions.join(" AND "));
287        }
288
289        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
290
291        // Build and execute query
292        let mut sql_query = sqlx::query_as::<_, Event>(&query);
293
294        if let Some(tid) = task_id {
295            sql_query = sql_query.bind(tid);
296        }
297
298        if let Some(ref typ) = log_type {
299            sql_query = sql_query.bind(typ);
300        }
301
302        if let Some(ts) = since_timestamp {
303            sql_query = sql_query.bind(ts);
304        }
305
306        sql_query = sql_query.bind(limit);
307
308        let events = sql_query.fetch_all(self.pool).await?;
309
310        Ok(events)
311    }
312
313    /// Search events using FTS5
314    pub async fn search_events_fts5(
315        &self,
316        query: &str,
317        limit: Option<i64>,
318    ) -> Result<Vec<EventSearchResult>> {
319        let limit = limit.unwrap_or(20);
320
321        // Use FTS5 to search events and get snippets
322        let results = sqlx::query(
323            r#"
324            SELECT
325                e.id,
326                e.task_id,
327                e.timestamp,
328                e.log_type,
329                e.discussion_data,
330                snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
331            FROM events_fts
332            INNER JOIN events e ON events_fts.rowid = e.id
333            WHERE events_fts MATCH ?
334            ORDER BY rank
335            LIMIT ?
336            "#,
337        )
338        .bind(query)
339        .bind(limit)
340        .fetch_all(self.pool)
341        .await?;
342
343        let mut search_results = Vec::new();
344        for row in results {
345            let event = Event {
346                id: row.get("id"),
347                task_id: row.get("task_id"),
348                timestamp: row.get("timestamp"),
349                log_type: row.get("log_type"),
350                discussion_data: row.get("discussion_data"),
351            };
352            let match_snippet: String = row.get("match_snippet");
353
354            search_results.push(EventSearchResult {
355                event,
356                match_snippet,
357            });
358        }
359
360        Ok(search_results)
361    }
362}
363
364/// Event search result with match snippet
365#[derive(Debug)]
366pub struct EventSearchResult {
367    pub event: Event,
368    pub match_snippet: String,
369}
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374    use crate::tasks::TaskManager;
375    use crate::test_utils::test_helpers::TestContext;
376
377    #[tokio::test]
378    async fn test_add_event() {
379        let ctx = TestContext::new().await;
380        let task_mgr = TaskManager::new(ctx.pool());
381        let event_mgr = EventManager::new(ctx.pool());
382
383        let task = task_mgr
384            .add_task("Test task", None, None, None)
385            .await
386            .unwrap();
387        let event = event_mgr
388            .add_event(task.id, "decision", "Test decision")
389            .await
390            .unwrap();
391
392        assert_eq!(event.task_id, task.id);
393        assert_eq!(event.log_type, "decision");
394        assert_eq!(event.discussion_data, "Test decision");
395    }
396
397    #[tokio::test]
398    async fn test_add_event_nonexistent_task() {
399        let ctx = TestContext::new().await;
400        let event_mgr = EventManager::new(ctx.pool());
401
402        let result = event_mgr.add_event(999, "decision", "Test").await;
403        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
404    }
405
406    #[tokio::test]
407    async fn test_list_events() {
408        let ctx = TestContext::new().await;
409        let task_mgr = TaskManager::new(ctx.pool());
410        let event_mgr = EventManager::new(ctx.pool());
411
412        let task = task_mgr
413            .add_task("Test task", None, None, None)
414            .await
415            .unwrap();
416
417        // Add multiple events
418        event_mgr
419            .add_event(task.id, "decision", "Decision 1")
420            .await
421            .unwrap();
422        event_mgr
423            .add_event(task.id, "blocker", "Blocker 1")
424            .await
425            .unwrap();
426        event_mgr
427            .add_event(task.id, "milestone", "Milestone 1")
428            .await
429            .unwrap();
430
431        let events = event_mgr
432            .list_events(Some(task.id), None, None, None)
433            .await
434            .unwrap();
435        assert_eq!(events.len(), 3);
436
437        // Events should be in reverse chronological order
438        assert_eq!(events[0].log_type, "milestone");
439        assert_eq!(events[1].log_type, "blocker");
440        assert_eq!(events[2].log_type, "decision");
441    }
442
443    #[tokio::test]
444    async fn test_list_events_with_limit() {
445        let ctx = TestContext::new().await;
446        let task_mgr = TaskManager::new(ctx.pool());
447        let event_mgr = EventManager::new(ctx.pool());
448
449        let task = task_mgr
450            .add_task("Test task", None, None, None)
451            .await
452            .unwrap();
453
454        // Add 5 events
455        for i in 0..5 {
456            event_mgr
457                .add_event(task.id, "test", &format!("Event {}", i))
458                .await
459                .unwrap();
460        }
461
462        let events = event_mgr
463            .list_events(Some(task.id), Some(3), None, None)
464            .await
465            .unwrap();
466        assert_eq!(events.len(), 3);
467    }
468
469    #[tokio::test]
470    async fn test_list_events_nonexistent_task() {
471        let ctx = TestContext::new().await;
472        let event_mgr = EventManager::new(ctx.pool());
473
474        let result = event_mgr.list_events(Some(999), None, None, None).await;
475        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
476    }
477
478    #[tokio::test]
479    async fn test_list_events_empty() {
480        let ctx = TestContext::new().await;
481        let task_mgr = TaskManager::new(ctx.pool());
482        let event_mgr = EventManager::new(ctx.pool());
483
484        let task = task_mgr
485            .add_task("Test task", None, None, None)
486            .await
487            .unwrap();
488
489        let events = event_mgr
490            .list_events(Some(task.id), None, None, None)
491            .await
492            .unwrap();
493        assert_eq!(events.len(), 0);
494    }
495
496    #[tokio::test]
497    async fn test_update_event() {
498        let ctx = TestContext::new().await;
499        let task_mgr = TaskManager::new(ctx.pool());
500        let event_mgr = EventManager::new(ctx.pool());
501
502        let task = task_mgr
503            .add_task("Test task", None, None, None)
504            .await
505            .unwrap();
506        let event = event_mgr
507            .add_event(task.id, "decision", "Initial decision")
508            .await
509            .unwrap();
510
511        // Update event type and data
512        let updated = event_mgr
513            .update_event(event.id, Some("milestone"), Some("Updated decision"))
514            .await
515            .unwrap();
516
517        assert_eq!(updated.id, event.id);
518        assert_eq!(updated.task_id, task.id);
519        assert_eq!(updated.log_type, "milestone");
520        assert_eq!(updated.discussion_data, "Updated decision");
521    }
522
523    #[tokio::test]
524    async fn test_update_event_partial() {
525        let ctx = TestContext::new().await;
526        let task_mgr = TaskManager::new(ctx.pool());
527        let event_mgr = EventManager::new(ctx.pool());
528
529        let task = task_mgr
530            .add_task("Test task", None, None, None)
531            .await
532            .unwrap();
533        let event = event_mgr
534            .add_event(task.id, "decision", "Initial decision")
535            .await
536            .unwrap();
537
538        // Update only discussion_data
539        let updated = event_mgr
540            .update_event(event.id, None, Some("Updated data only"))
541            .await
542            .unwrap();
543
544        assert_eq!(updated.log_type, "decision"); // Unchanged
545        assert_eq!(updated.discussion_data, "Updated data only");
546    }
547
548    #[tokio::test]
549    async fn test_update_event_nonexistent() {
550        let ctx = TestContext::new().await;
551        let event_mgr = EventManager::new(ctx.pool());
552
553        let result = event_mgr
554            .update_event(999, Some("decision"), Some("Test"))
555            .await;
556
557        assert!(result.is_err());
558        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
559    }
560
561    #[tokio::test]
562    async fn test_delete_event() {
563        let ctx = TestContext::new().await;
564        let task_mgr = TaskManager::new(ctx.pool());
565        let event_mgr = EventManager::new(ctx.pool());
566
567        let task = task_mgr
568            .add_task("Test task", None, None, None)
569            .await
570            .unwrap();
571        let event = event_mgr
572            .add_event(task.id, "decision", "To be deleted")
573            .await
574            .unwrap();
575
576        // Delete the event
577        event_mgr.delete_event(event.id).await.unwrap();
578
579        // Verify it's deleted
580        let events = event_mgr
581            .list_events(Some(task.id), None, None, None)
582            .await
583            .unwrap();
584        assert_eq!(events.len(), 0);
585    }
586
587    #[tokio::test]
588    async fn test_delete_event_nonexistent() {
589        let ctx = TestContext::new().await;
590        let event_mgr = EventManager::new(ctx.pool());
591
592        let result = event_mgr.delete_event(999).await;
593        assert!(result.is_err());
594        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
595    }
596
597    #[tokio::test]
598    async fn test_list_events_filter_by_type() {
599        let ctx = TestContext::new().await;
600        let task_mgr = TaskManager::new(ctx.pool());
601        let event_mgr = EventManager::new(ctx.pool());
602
603        let task = task_mgr
604            .add_task("Test task", None, None, None)
605            .await
606            .unwrap();
607
608        // Add events of different types
609        event_mgr
610            .add_event(task.id, "decision", "Decision 1")
611            .await
612            .unwrap();
613        event_mgr
614            .add_event(task.id, "blocker", "Blocker 1")
615            .await
616            .unwrap();
617        event_mgr
618            .add_event(task.id, "decision", "Decision 2")
619            .await
620            .unwrap();
621
622        // Filter by decision type
623        let events = event_mgr
624            .list_events(Some(task.id), None, Some("decision".to_string()), None)
625            .await
626            .unwrap();
627
628        assert_eq!(events.len(), 2);
629        assert!(events.iter().all(|e| e.log_type == "decision"));
630    }
631
632    #[tokio::test]
633    async fn test_list_events_global() {
634        let ctx = TestContext::new().await;
635        let task_mgr = TaskManager::new(ctx.pool());
636        let event_mgr = EventManager::new(ctx.pool());
637
638        let task1 = task_mgr.add_task("Task 1", None, None, None).await.unwrap();
639        let task2 = task_mgr.add_task("Task 2", None, None, None).await.unwrap();
640
641        // Add events to both tasks
642        event_mgr
643            .add_event(task1.id, "decision", "Task 1 Decision")
644            .await
645            .unwrap();
646        event_mgr
647            .add_event(task2.id, "decision", "Task 2 Decision")
648            .await
649            .unwrap();
650
651        // List all events globally (task_id = None)
652        let events = event_mgr.list_events(None, None, None, None).await.unwrap();
653
654        assert!(events.len() >= 2); // At least our 2 events
655        let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
656        let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
657
658        assert_eq!(task1_events.len(), 1);
659        assert_eq!(task2_events.len(), 1);
660    }
661}