intent_engine/
events.rs

1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8    pool: &'a SqlitePool,
9    notifier: crate::notifications::NotificationSender,
10    cli_notifier: Option<crate::dashboard::cli_notifier::CliNotifier>,
11    project_path: Option<String>,
12}
13
14impl<'a> EventManager<'a> {
15    pub fn new(pool: &'a SqlitePool) -> Self {
16        Self {
17            pool,
18            notifier: crate::notifications::NotificationSender::new(None),
19            cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
20            project_path: None,
21        }
22    }
23
24    /// Create an EventManager with project path for CLI notifications
25    pub fn with_project_path(pool: &'a SqlitePool, project_path: String) -> Self {
26        Self {
27            pool,
28            notifier: crate::notifications::NotificationSender::new(None),
29            cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
30            project_path: Some(project_path),
31        }
32    }
33
34    /// Create an EventManager with WebSocket notification support
35    pub fn with_websocket(
36        pool: &'a SqlitePool,
37        ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
38        project_path: String,
39    ) -> Self {
40        Self {
41            pool,
42            notifier: crate::notifications::NotificationSender::new(Some(ws_state)),
43            cli_notifier: None, // Dashboard context doesn't need CLI notifier
44            project_path: Some(project_path),
45        }
46    }
47
48    /// Internal helper: Notify UI about event creation
49    async fn notify_event_created(&self, event: &Event) {
50        use crate::dashboard::websocket::DatabaseOperationPayload;
51
52        // WebSocket notification (Dashboard context)
53        if let Some(project_path) = &self.project_path {
54            let event_json = match serde_json::to_value(event) {
55                Ok(json) => json,
56                Err(e) => {
57                    tracing::warn!("Failed to serialize event for notification: {}", e);
58                    return;
59                },
60            };
61
62            let payload =
63                DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
64            self.notifier.send(payload).await;
65        }
66
67        // CLI → Dashboard HTTP notification (CLI context)
68        if let Some(cli_notifier) = &self.cli_notifier {
69            cli_notifier
70                .notify_event_added(event.task_id, event.id, self.project_path.clone())
71                .await;
72        }
73    }
74
75    /// Internal helper: Notify UI about event update
76    async fn notify_event_updated(&self, event: &Event) {
77        use crate::dashboard::websocket::DatabaseOperationPayload;
78
79        let Some(project_path) = &self.project_path else {
80            return;
81        };
82
83        let event_json = match serde_json::to_value(event) {
84            Ok(json) => json,
85            Err(e) => {
86                tracing::warn!("Failed to serialize event for notification: {}", e);
87                return;
88            },
89        };
90
91        let payload =
92            DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
93        self.notifier.send(payload).await;
94    }
95
96    /// Internal helper: Notify UI about event deletion
97    async fn notify_event_deleted(&self, event_id: i64) {
98        use crate::dashboard::websocket::DatabaseOperationPayload;
99
100        let Some(project_path) = &self.project_path else {
101            return;
102        };
103
104        let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
105        self.notifier.send(payload).await;
106    }
107
108    /// Add a new event
109    pub async fn add_event(
110        &self,
111        task_id: i64,
112        log_type: &str,
113        discussion_data: &str,
114    ) -> Result<Event> {
115        // Check if task exists
116        let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
117            .bind(task_id)
118            .fetch_one(self.pool)
119            .await?;
120
121        if !task_exists {
122            return Err(IntentError::TaskNotFound(task_id));
123        }
124
125        let now = Utc::now();
126
127        let result = sqlx::query(
128            r#"
129            INSERT INTO events (task_id, log_type, discussion_data, timestamp)
130            VALUES (?, ?, ?, ?)
131            "#,
132        )
133        .bind(task_id)
134        .bind(log_type)
135        .bind(discussion_data)
136        .bind(now)
137        .execute(self.pool)
138        .await?;
139
140        let id = result.last_insert_rowid();
141
142        let event = Event {
143            id,
144            task_id,
145            timestamp: now,
146            log_type: log_type.to_string(),
147            discussion_data: discussion_data.to_string(),
148        };
149
150        // Notify WebSocket clients about the new event
151        self.notify_event_created(&event).await;
152
153        Ok(event)
154    }
155
156    /// Update an existing event
157    pub async fn update_event(
158        &self,
159        event_id: i64,
160        log_type: Option<&str>,
161        discussion_data: Option<&str>,
162    ) -> Result<Event> {
163        // First, get the existing event to check if it exists
164        let existing_event: Option<Event> =
165            sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
166                .bind(event_id)
167                .fetch_optional(self.pool)
168                .await?;
169
170        let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
171            "Event {} not found",
172            event_id
173        )))?;
174
175        // Update only the fields that are provided
176        let new_log_type = log_type.unwrap_or(&existing_event.log_type);
177        let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
178
179        sqlx::query(
180            r#"
181            UPDATE events
182            SET log_type = ?, discussion_data = ?
183            WHERE id = ?
184            "#,
185        )
186        .bind(new_log_type)
187        .bind(new_discussion_data)
188        .bind(event_id)
189        .execute(self.pool)
190        .await?;
191
192        let updated_event = Event {
193            id: existing_event.id,
194            task_id: existing_event.task_id,
195            timestamp: existing_event.timestamp,
196            log_type: new_log_type.to_string(),
197            discussion_data: new_discussion_data.to_string(),
198        };
199
200        // Notify WebSocket clients about the update
201        self.notify_event_updated(&updated_event).await;
202
203        Ok(updated_event)
204    }
205
206    /// Delete an event
207    pub async fn delete_event(&self, event_id: i64) -> Result<()> {
208        // First, get the event to check if it exists and get task_id for notification
209        let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
210            .bind(event_id)
211            .fetch_optional(self.pool)
212            .await?;
213
214        let _event = event.ok_or(IntentError::InvalidInput(format!(
215            "Event {} not found",
216            event_id
217        )))?;
218
219        // Delete from FTS index first (if it exists)
220        let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
221            .bind(event_id)
222            .execute(self.pool)
223            .await;
224
225        // Delete the event
226        sqlx::query("DELETE FROM events WHERE id = ?")
227            .bind(event_id)
228            .execute(self.pool)
229            .await?;
230
231        // Notify WebSocket clients about the deletion
232        self.notify_event_deleted(event_id).await;
233
234        Ok(())
235    }
236
237    /// List events for a task (or globally if task_id is None)
238    pub async fn list_events(
239        &self,
240        task_id: Option<i64>,
241        limit: Option<i64>,
242        log_type: Option<String>,
243        since: Option<String>,
244    ) -> Result<Vec<Event>> {
245        // Check if task exists (only if task_id provided)
246        if let Some(tid) = task_id {
247            let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
248                .bind(tid)
249                .fetch_one(self.pool)
250                .await?;
251
252            if !task_exists {
253                return Err(IntentError::TaskNotFound(tid));
254            }
255        }
256
257        let limit = limit.unwrap_or(50);
258
259        // Parse since duration if provided
260        let since_timestamp = if let Some(duration_str) = since {
261            Some(crate::time_utils::parse_duration(&duration_str)?)
262        } else {
263            None
264        };
265
266        // Build dynamic query based on filters
267        let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
268        let mut conditions = Vec::new();
269
270        if task_id.is_some() {
271            conditions.push("task_id = ?");
272        }
273
274        if log_type.is_some() {
275            conditions.push("log_type = ?");
276        }
277
278        if since_timestamp.is_some() {
279            conditions.push("timestamp >= ?");
280        }
281
282        if !conditions.is_empty() {
283            query.push_str(" AND ");
284            query.push_str(&conditions.join(" AND "));
285        }
286
287        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
288
289        // Build and execute query
290        let mut sql_query = sqlx::query_as::<_, Event>(&query);
291
292        if let Some(tid) = task_id {
293            sql_query = sql_query.bind(tid);
294        }
295
296        if let Some(ref typ) = log_type {
297            sql_query = sql_query.bind(typ);
298        }
299
300        if let Some(ts) = since_timestamp {
301            sql_query = sql_query.bind(ts);
302        }
303
304        sql_query = sql_query.bind(limit);
305
306        let events = sql_query.fetch_all(self.pool).await?;
307
308        Ok(events)
309    }
310
311    /// Search events using FTS5
312    pub async fn search_events_fts5(
313        &self,
314        query: &str,
315        limit: Option<i64>,
316    ) -> Result<Vec<EventSearchResult>> {
317        let limit = limit.unwrap_or(20);
318
319        // Use FTS5 to search events and get snippets
320        let results = sqlx::query(
321            r#"
322            SELECT
323                e.id,
324                e.task_id,
325                e.timestamp,
326                e.log_type,
327                e.discussion_data,
328                snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
329            FROM events_fts
330            INNER JOIN events e ON events_fts.rowid = e.id
331            WHERE events_fts MATCH ?
332            ORDER BY rank
333            LIMIT ?
334            "#,
335        )
336        .bind(query)
337        .bind(limit)
338        .fetch_all(self.pool)
339        .await?;
340
341        let mut search_results = Vec::new();
342        for row in results {
343            let event = Event {
344                id: row.get("id"),
345                task_id: row.get("task_id"),
346                timestamp: row.get("timestamp"),
347                log_type: row.get("log_type"),
348                discussion_data: row.get("discussion_data"),
349            };
350            let match_snippet: String = row.get("match_snippet");
351
352            search_results.push(EventSearchResult {
353                event,
354                match_snippet,
355            });
356        }
357
358        Ok(search_results)
359    }
360}
361
362/// Event search result with match snippet
363#[derive(Debug)]
364pub struct EventSearchResult {
365    pub event: Event,
366    pub match_snippet: String,
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use crate::tasks::TaskManager;
373    use crate::test_utils::test_helpers::TestContext;
374
375    #[tokio::test]
376    async fn test_add_event() {
377        let ctx = TestContext::new().await;
378        let task_mgr = TaskManager::new(ctx.pool());
379        let event_mgr = EventManager::new(ctx.pool());
380
381        let task = task_mgr
382            .add_task("Test task", None, None, None)
383            .await
384            .unwrap();
385        let event = event_mgr
386            .add_event(task.id, "decision", "Test decision")
387            .await
388            .unwrap();
389
390        assert_eq!(event.task_id, task.id);
391        assert_eq!(event.log_type, "decision");
392        assert_eq!(event.discussion_data, "Test decision");
393    }
394
395    #[tokio::test]
396    async fn test_add_event_nonexistent_task() {
397        let ctx = TestContext::new().await;
398        let event_mgr = EventManager::new(ctx.pool());
399
400        let result = event_mgr.add_event(999, "decision", "Test").await;
401        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
402    }
403
404    #[tokio::test]
405    async fn test_list_events() {
406        let ctx = TestContext::new().await;
407        let task_mgr = TaskManager::new(ctx.pool());
408        let event_mgr = EventManager::new(ctx.pool());
409
410        let task = task_mgr
411            .add_task("Test task", None, None, None)
412            .await
413            .unwrap();
414
415        // Add multiple events
416        event_mgr
417            .add_event(task.id, "decision", "Decision 1")
418            .await
419            .unwrap();
420        event_mgr
421            .add_event(task.id, "blocker", "Blocker 1")
422            .await
423            .unwrap();
424        event_mgr
425            .add_event(task.id, "milestone", "Milestone 1")
426            .await
427            .unwrap();
428
429        let events = event_mgr
430            .list_events(Some(task.id), None, None, None)
431            .await
432            .unwrap();
433        assert_eq!(events.len(), 3);
434
435        // Events should be in reverse chronological order
436        assert_eq!(events[0].log_type, "milestone");
437        assert_eq!(events[1].log_type, "blocker");
438        assert_eq!(events[2].log_type, "decision");
439    }
440
441    #[tokio::test]
442    async fn test_list_events_with_limit() {
443        let ctx = TestContext::new().await;
444        let task_mgr = TaskManager::new(ctx.pool());
445        let event_mgr = EventManager::new(ctx.pool());
446
447        let task = task_mgr
448            .add_task("Test task", None, None, None)
449            .await
450            .unwrap();
451
452        // Add 5 events
453        for i in 0..5 {
454            event_mgr
455                .add_event(task.id, "test", &format!("Event {}", i))
456                .await
457                .unwrap();
458        }
459
460        let events = event_mgr
461            .list_events(Some(task.id), Some(3), None, None)
462            .await
463            .unwrap();
464        assert_eq!(events.len(), 3);
465    }
466
467    #[tokio::test]
468    async fn test_list_events_nonexistent_task() {
469        let ctx = TestContext::new().await;
470        let event_mgr = EventManager::new(ctx.pool());
471
472        let result = event_mgr.list_events(Some(999), None, None, None).await;
473        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
474    }
475
476    #[tokio::test]
477    async fn test_list_events_empty() {
478        let ctx = TestContext::new().await;
479        let task_mgr = TaskManager::new(ctx.pool());
480        let event_mgr = EventManager::new(ctx.pool());
481
482        let task = task_mgr
483            .add_task("Test task", None, None, None)
484            .await
485            .unwrap();
486
487        let events = event_mgr
488            .list_events(Some(task.id), None, None, None)
489            .await
490            .unwrap();
491        assert_eq!(events.len(), 0);
492    }
493
494    #[tokio::test]
495    async fn test_update_event() {
496        let ctx = TestContext::new().await;
497        let task_mgr = TaskManager::new(ctx.pool());
498        let event_mgr = EventManager::new(ctx.pool());
499
500        let task = task_mgr
501            .add_task("Test task", None, None, None)
502            .await
503            .unwrap();
504        let event = event_mgr
505            .add_event(task.id, "decision", "Initial decision")
506            .await
507            .unwrap();
508
509        // Update event type and data
510        let updated = event_mgr
511            .update_event(event.id, Some("milestone"), Some("Updated decision"))
512            .await
513            .unwrap();
514
515        assert_eq!(updated.id, event.id);
516        assert_eq!(updated.task_id, task.id);
517        assert_eq!(updated.log_type, "milestone");
518        assert_eq!(updated.discussion_data, "Updated decision");
519    }
520
521    #[tokio::test]
522    async fn test_update_event_partial() {
523        let ctx = TestContext::new().await;
524        let task_mgr = TaskManager::new(ctx.pool());
525        let event_mgr = EventManager::new(ctx.pool());
526
527        let task = task_mgr
528            .add_task("Test task", None, None, None)
529            .await
530            .unwrap();
531        let event = event_mgr
532            .add_event(task.id, "decision", "Initial decision")
533            .await
534            .unwrap();
535
536        // Update only discussion_data
537        let updated = event_mgr
538            .update_event(event.id, None, Some("Updated data only"))
539            .await
540            .unwrap();
541
542        assert_eq!(updated.log_type, "decision"); // Unchanged
543        assert_eq!(updated.discussion_data, "Updated data only");
544    }
545
546    #[tokio::test]
547    async fn test_update_event_nonexistent() {
548        let ctx = TestContext::new().await;
549        let event_mgr = EventManager::new(ctx.pool());
550
551        let result = event_mgr
552            .update_event(999, Some("decision"), Some("Test"))
553            .await;
554
555        assert!(result.is_err());
556        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
557    }
558
559    #[tokio::test]
560    async fn test_delete_event() {
561        let ctx = TestContext::new().await;
562        let task_mgr = TaskManager::new(ctx.pool());
563        let event_mgr = EventManager::new(ctx.pool());
564
565        let task = task_mgr
566            .add_task("Test task", None, None, None)
567            .await
568            .unwrap();
569        let event = event_mgr
570            .add_event(task.id, "decision", "To be deleted")
571            .await
572            .unwrap();
573
574        // Delete the event
575        event_mgr.delete_event(event.id).await.unwrap();
576
577        // Verify it's deleted
578        let events = event_mgr
579            .list_events(Some(task.id), None, None, None)
580            .await
581            .unwrap();
582        assert_eq!(events.len(), 0);
583    }
584
585    #[tokio::test]
586    async fn test_delete_event_nonexistent() {
587        let ctx = TestContext::new().await;
588        let event_mgr = EventManager::new(ctx.pool());
589
590        let result = event_mgr.delete_event(999).await;
591        assert!(result.is_err());
592        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
593    }
594
595    #[tokio::test]
596    async fn test_list_events_filter_by_type() {
597        let ctx = TestContext::new().await;
598        let task_mgr = TaskManager::new(ctx.pool());
599        let event_mgr = EventManager::new(ctx.pool());
600
601        let task = task_mgr
602            .add_task("Test task", None, None, None)
603            .await
604            .unwrap();
605
606        // Add events of different types
607        event_mgr
608            .add_event(task.id, "decision", "Decision 1")
609            .await
610            .unwrap();
611        event_mgr
612            .add_event(task.id, "blocker", "Blocker 1")
613            .await
614            .unwrap();
615        event_mgr
616            .add_event(task.id, "decision", "Decision 2")
617            .await
618            .unwrap();
619
620        // Filter by decision type
621        let events = event_mgr
622            .list_events(Some(task.id), None, Some("decision".to_string()), None)
623            .await
624            .unwrap();
625
626        assert_eq!(events.len(), 2);
627        assert!(events.iter().all(|e| e.log_type == "decision"));
628    }
629
630    #[tokio::test]
631    async fn test_list_events_global() {
632        let ctx = TestContext::new().await;
633        let task_mgr = TaskManager::new(ctx.pool());
634        let event_mgr = EventManager::new(ctx.pool());
635
636        let task1 = task_mgr.add_task("Task 1", None, None, None).await.unwrap();
637        let task2 = task_mgr.add_task("Task 2", None, None, None).await.unwrap();
638
639        // Add events to both tasks
640        event_mgr
641            .add_event(task1.id, "decision", "Task 1 Decision")
642            .await
643            .unwrap();
644        event_mgr
645            .add_event(task2.id, "decision", "Task 2 Decision")
646            .await
647            .unwrap();
648
649        // List all events globally (task_id = None)
650        let events = event_mgr.list_events(None, None, None, None).await.unwrap();
651
652        assert!(events.len() >= 2); // At least our 2 events
653        let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
654        let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
655
656        assert_eq!(task1_events.len(), 1);
657        assert_eq!(task2_events.len(), 1);
658    }
659}