intent_engine/
events.rs

1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8    pool: &'a SqlitePool,
9    notifier: crate::notifications::NotificationSender,
10    project_path: Option<String>,
11}
12
13impl<'a> EventManager<'a> {
14    pub fn new(pool: &'a SqlitePool) -> Self {
15        Self {
16            pool,
17            notifier: crate::notifications::NotificationSender::new(None, None),
18            project_path: None,
19        }
20    }
21
22    /// Create an EventManager with MCP notification support
23    pub fn with_mcp_notifier(
24        pool: &'a SqlitePool,
25        project_path: String,
26        mcp_notifier: tokio::sync::mpsc::UnboundedSender<String>,
27    ) -> Self {
28        Self {
29            pool,
30            notifier: crate::notifications::NotificationSender::new(None, Some(mcp_notifier)),
31            project_path: Some(project_path),
32        }
33    }
34
35    /// Create an EventManager with WebSocket notification support
36    pub fn with_websocket(
37        pool: &'a SqlitePool,
38        ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
39        project_path: String,
40    ) -> Self {
41        Self {
42            pool,
43            notifier: crate::notifications::NotificationSender::new(Some(ws_state), None),
44            project_path: Some(project_path),
45        }
46    }
47
48    /// Internal helper: Notify UI about event creation
49    async fn notify_event_created(&self, event: &Event) {
50        use crate::dashboard::websocket::DatabaseOperationPayload;
51
52        let Some(project_path) = &self.project_path else {
53            return;
54        };
55
56        let event_json = match serde_json::to_value(event) {
57            Ok(json) => json,
58            Err(e) => {
59                tracing::warn!("Failed to serialize event for notification: {}", e);
60                return;
61            },
62        };
63
64        let payload =
65            DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
66        self.notifier.send(payload).await;
67    }
68
69    /// Internal helper: Notify UI about event update
70    async fn notify_event_updated(&self, event: &Event) {
71        use crate::dashboard::websocket::DatabaseOperationPayload;
72
73        let Some(project_path) = &self.project_path else {
74            return;
75        };
76
77        let event_json = match serde_json::to_value(event) {
78            Ok(json) => json,
79            Err(e) => {
80                tracing::warn!("Failed to serialize event for notification: {}", e);
81                return;
82            },
83        };
84
85        let payload =
86            DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
87        self.notifier.send(payload).await;
88    }
89
90    /// Internal helper: Notify UI about event deletion
91    async fn notify_event_deleted(&self, event_id: i64) {
92        use crate::dashboard::websocket::DatabaseOperationPayload;
93
94        let Some(project_path) = &self.project_path else {
95            return;
96        };
97
98        let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
99        self.notifier.send(payload).await;
100    }
101
102    /// Add a new event
103    pub async fn add_event(
104        &self,
105        task_id: i64,
106        log_type: &str,
107        discussion_data: &str,
108    ) -> Result<Event> {
109        // Check if task exists
110        let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
111            .bind(task_id)
112            .fetch_one(self.pool)
113            .await?;
114
115        if !task_exists {
116            return Err(IntentError::TaskNotFound(task_id));
117        }
118
119        let now = Utc::now();
120
121        let result = sqlx::query(
122            r#"
123            INSERT INTO events (task_id, log_type, discussion_data, timestamp)
124            VALUES (?, ?, ?, ?)
125            "#,
126        )
127        .bind(task_id)
128        .bind(log_type)
129        .bind(discussion_data)
130        .bind(now)
131        .execute(self.pool)
132        .await?;
133
134        let id = result.last_insert_rowid();
135
136        let event = Event {
137            id,
138            task_id,
139            timestamp: now,
140            log_type: log_type.to_string(),
141            discussion_data: discussion_data.to_string(),
142        };
143
144        // Notify WebSocket clients about the new event
145        self.notify_event_created(&event).await;
146
147        Ok(event)
148    }
149
150    /// Update an existing event
151    pub async fn update_event(
152        &self,
153        event_id: i64,
154        log_type: Option<&str>,
155        discussion_data: Option<&str>,
156    ) -> Result<Event> {
157        // First, get the existing event to check if it exists
158        let existing_event: Option<Event> =
159            sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
160                .bind(event_id)
161                .fetch_optional(self.pool)
162                .await?;
163
164        let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
165            "Event {} not found",
166            event_id
167        )))?;
168
169        // Update only the fields that are provided
170        let new_log_type = log_type.unwrap_or(&existing_event.log_type);
171        let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
172
173        sqlx::query(
174            r#"
175            UPDATE events
176            SET log_type = ?, discussion_data = ?
177            WHERE id = ?
178            "#,
179        )
180        .bind(new_log_type)
181        .bind(new_discussion_data)
182        .bind(event_id)
183        .execute(self.pool)
184        .await?;
185
186        let updated_event = Event {
187            id: existing_event.id,
188            task_id: existing_event.task_id,
189            timestamp: existing_event.timestamp,
190            log_type: new_log_type.to_string(),
191            discussion_data: new_discussion_data.to_string(),
192        };
193
194        // Notify WebSocket clients about the update
195        self.notify_event_updated(&updated_event).await;
196
197        Ok(updated_event)
198    }
199
200    /// Delete an event
201    pub async fn delete_event(&self, event_id: i64) -> Result<()> {
202        // First, get the event to check if it exists and get task_id for notification
203        let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
204            .bind(event_id)
205            .fetch_optional(self.pool)
206            .await?;
207
208        let _event = event.ok_or(IntentError::InvalidInput(format!(
209            "Event {} not found",
210            event_id
211        )))?;
212
213        // Delete from FTS index first (if it exists)
214        let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
215            .bind(event_id)
216            .execute(self.pool)
217            .await;
218
219        // Delete the event
220        sqlx::query("DELETE FROM events WHERE id = ?")
221            .bind(event_id)
222            .execute(self.pool)
223            .await?;
224
225        // Notify WebSocket clients about the deletion
226        self.notify_event_deleted(event_id).await;
227
228        Ok(())
229    }
230
231    /// List events for a task (or globally if task_id is None)
232    pub async fn list_events(
233        &self,
234        task_id: Option<i64>,
235        limit: Option<i64>,
236        log_type: Option<String>,
237        since: Option<String>,
238    ) -> Result<Vec<Event>> {
239        // Check if task exists (only if task_id provided)
240        if let Some(tid) = task_id {
241            let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
242                .bind(tid)
243                .fetch_one(self.pool)
244                .await?;
245
246            if !task_exists {
247                return Err(IntentError::TaskNotFound(tid));
248            }
249        }
250
251        let limit = limit.unwrap_or(50);
252
253        // Parse since duration if provided
254        let since_timestamp = if let Some(duration_str) = since {
255            Some(crate::time_utils::parse_duration(&duration_str)?)
256        } else {
257            None
258        };
259
260        // Build dynamic query based on filters
261        let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
262        let mut conditions = Vec::new();
263
264        if task_id.is_some() {
265            conditions.push("task_id = ?");
266        }
267
268        if log_type.is_some() {
269            conditions.push("log_type = ?");
270        }
271
272        if since_timestamp.is_some() {
273            conditions.push("timestamp >= ?");
274        }
275
276        if !conditions.is_empty() {
277            query.push_str(" AND ");
278            query.push_str(&conditions.join(" AND "));
279        }
280
281        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
282
283        // Build and execute query
284        let mut sql_query = sqlx::query_as::<_, Event>(&query);
285
286        if let Some(tid) = task_id {
287            sql_query = sql_query.bind(tid);
288        }
289
290        if let Some(ref typ) = log_type {
291            sql_query = sql_query.bind(typ);
292        }
293
294        if let Some(ts) = since_timestamp {
295            sql_query = sql_query.bind(ts);
296        }
297
298        sql_query = sql_query.bind(limit);
299
300        let events = sql_query.fetch_all(self.pool).await?;
301
302        Ok(events)
303    }
304
305    /// Search events using FTS5
306    pub async fn search_events_fts5(
307        &self,
308        query: &str,
309        limit: Option<i64>,
310    ) -> Result<Vec<EventSearchResult>> {
311        let limit = limit.unwrap_or(20);
312
313        // Use FTS5 to search events and get snippets
314        let results = sqlx::query(
315            r#"
316            SELECT
317                e.id,
318                e.task_id,
319                e.timestamp,
320                e.log_type,
321                e.discussion_data,
322                snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
323            FROM events_fts
324            INNER JOIN events e ON events_fts.rowid = e.id
325            WHERE events_fts MATCH ?
326            ORDER BY rank
327            LIMIT ?
328            "#,
329        )
330        .bind(query)
331        .bind(limit)
332        .fetch_all(self.pool)
333        .await?;
334
335        let mut search_results = Vec::new();
336        for row in results {
337            let event = Event {
338                id: row.get("id"),
339                task_id: row.get("task_id"),
340                timestamp: row.get("timestamp"),
341                log_type: row.get("log_type"),
342                discussion_data: row.get("discussion_data"),
343            };
344            let match_snippet: String = row.get("match_snippet");
345
346            search_results.push(EventSearchResult {
347                event,
348                match_snippet,
349            });
350        }
351
352        Ok(search_results)
353    }
354}
355
356/// Event search result with match snippet
357#[derive(Debug)]
358pub struct EventSearchResult {
359    pub event: Event,
360    pub match_snippet: String,
361}
362
363#[cfg(test)]
364mod tests {
365    use super::*;
366    use crate::tasks::TaskManager;
367    use crate::test_utils::test_helpers::TestContext;
368
369    #[tokio::test]
370    async fn test_add_event() {
371        let ctx = TestContext::new().await;
372        let task_mgr = TaskManager::new(ctx.pool());
373        let event_mgr = EventManager::new(ctx.pool());
374
375        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
376        let event = event_mgr
377            .add_event(task.id, "decision", "Test decision")
378            .await
379            .unwrap();
380
381        assert_eq!(event.task_id, task.id);
382        assert_eq!(event.log_type, "decision");
383        assert_eq!(event.discussion_data, "Test decision");
384    }
385
386    #[tokio::test]
387    async fn test_add_event_nonexistent_task() {
388        let ctx = TestContext::new().await;
389        let event_mgr = EventManager::new(ctx.pool());
390
391        let result = event_mgr.add_event(999, "decision", "Test").await;
392        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
393    }
394
395    #[tokio::test]
396    async fn test_list_events() {
397        let ctx = TestContext::new().await;
398        let task_mgr = TaskManager::new(ctx.pool());
399        let event_mgr = EventManager::new(ctx.pool());
400
401        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
402
403        // Add multiple events
404        event_mgr
405            .add_event(task.id, "decision", "Decision 1")
406            .await
407            .unwrap();
408        event_mgr
409            .add_event(task.id, "blocker", "Blocker 1")
410            .await
411            .unwrap();
412        event_mgr
413            .add_event(task.id, "milestone", "Milestone 1")
414            .await
415            .unwrap();
416
417        let events = event_mgr
418            .list_events(Some(task.id), None, None, None)
419            .await
420            .unwrap();
421        assert_eq!(events.len(), 3);
422
423        // Events should be in reverse chronological order
424        assert_eq!(events[0].log_type, "milestone");
425        assert_eq!(events[1].log_type, "blocker");
426        assert_eq!(events[2].log_type, "decision");
427    }
428
429    #[tokio::test]
430    async fn test_list_events_with_limit() {
431        let ctx = TestContext::new().await;
432        let task_mgr = TaskManager::new(ctx.pool());
433        let event_mgr = EventManager::new(ctx.pool());
434
435        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
436
437        // Add 5 events
438        for i in 0..5 {
439            event_mgr
440                .add_event(task.id, "test", &format!("Event {}", i))
441                .await
442                .unwrap();
443        }
444
445        let events = event_mgr
446            .list_events(Some(task.id), Some(3), None, None)
447            .await
448            .unwrap();
449        assert_eq!(events.len(), 3);
450    }
451
452    #[tokio::test]
453    async fn test_list_events_nonexistent_task() {
454        let ctx = TestContext::new().await;
455        let event_mgr = EventManager::new(ctx.pool());
456
457        let result = event_mgr.list_events(Some(999), None, None, None).await;
458        assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
459    }
460
461    #[tokio::test]
462    async fn test_list_events_empty() {
463        let ctx = TestContext::new().await;
464        let task_mgr = TaskManager::new(ctx.pool());
465        let event_mgr = EventManager::new(ctx.pool());
466
467        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
468
469        let events = event_mgr
470            .list_events(Some(task.id), None, None, None)
471            .await
472            .unwrap();
473        assert_eq!(events.len(), 0);
474    }
475
476    #[tokio::test]
477    async fn test_update_event() {
478        let ctx = TestContext::new().await;
479        let task_mgr = TaskManager::new(ctx.pool());
480        let event_mgr = EventManager::new(ctx.pool());
481
482        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
483        let event = event_mgr
484            .add_event(task.id, "decision", "Initial decision")
485            .await
486            .unwrap();
487
488        // Update event type and data
489        let updated = event_mgr
490            .update_event(event.id, Some("milestone"), Some("Updated decision"))
491            .await
492            .unwrap();
493
494        assert_eq!(updated.id, event.id);
495        assert_eq!(updated.task_id, task.id);
496        assert_eq!(updated.log_type, "milestone");
497        assert_eq!(updated.discussion_data, "Updated decision");
498    }
499
500    #[tokio::test]
501    async fn test_update_event_partial() {
502        let ctx = TestContext::new().await;
503        let task_mgr = TaskManager::new(ctx.pool());
504        let event_mgr = EventManager::new(ctx.pool());
505
506        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
507        let event = event_mgr
508            .add_event(task.id, "decision", "Initial decision")
509            .await
510            .unwrap();
511
512        // Update only discussion_data
513        let updated = event_mgr
514            .update_event(event.id, None, Some("Updated data only"))
515            .await
516            .unwrap();
517
518        assert_eq!(updated.log_type, "decision"); // Unchanged
519        assert_eq!(updated.discussion_data, "Updated data only");
520    }
521
522    #[tokio::test]
523    async fn test_update_event_nonexistent() {
524        let ctx = TestContext::new().await;
525        let event_mgr = EventManager::new(ctx.pool());
526
527        let result = event_mgr
528            .update_event(999, Some("decision"), Some("Test"))
529            .await;
530
531        assert!(result.is_err());
532        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
533    }
534
535    #[tokio::test]
536    async fn test_delete_event() {
537        let ctx = TestContext::new().await;
538        let task_mgr = TaskManager::new(ctx.pool());
539        let event_mgr = EventManager::new(ctx.pool());
540
541        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
542        let event = event_mgr
543            .add_event(task.id, "decision", "To be deleted")
544            .await
545            .unwrap();
546
547        // Delete the event
548        event_mgr.delete_event(event.id).await.unwrap();
549
550        // Verify it's deleted
551        let events = event_mgr
552            .list_events(Some(task.id), None, None, None)
553            .await
554            .unwrap();
555        assert_eq!(events.len(), 0);
556    }
557
558    #[tokio::test]
559    async fn test_delete_event_nonexistent() {
560        let ctx = TestContext::new().await;
561        let event_mgr = EventManager::new(ctx.pool());
562
563        let result = event_mgr.delete_event(999).await;
564        assert!(result.is_err());
565        assert!(matches!(result, Err(IntentError::InvalidInput(_))));
566    }
567
568    #[tokio::test]
569    async fn test_list_events_filter_by_type() {
570        let ctx = TestContext::new().await;
571        let task_mgr = TaskManager::new(ctx.pool());
572        let event_mgr = EventManager::new(ctx.pool());
573
574        let task = task_mgr.add_task("Test task", None, None).await.unwrap();
575
576        // Add events of different types
577        event_mgr
578            .add_event(task.id, "decision", "Decision 1")
579            .await
580            .unwrap();
581        event_mgr
582            .add_event(task.id, "blocker", "Blocker 1")
583            .await
584            .unwrap();
585        event_mgr
586            .add_event(task.id, "decision", "Decision 2")
587            .await
588            .unwrap();
589
590        // Filter by decision type
591        let events = event_mgr
592            .list_events(Some(task.id), None, Some("decision".to_string()), None)
593            .await
594            .unwrap();
595
596        assert_eq!(events.len(), 2);
597        assert!(events.iter().all(|e| e.log_type == "decision"));
598    }
599
600    #[tokio::test]
601    async fn test_list_events_global() {
602        let ctx = TestContext::new().await;
603        let task_mgr = TaskManager::new(ctx.pool());
604        let event_mgr = EventManager::new(ctx.pool());
605
606        let task1 = task_mgr.add_task("Task 1", None, None).await.unwrap();
607        let task2 = task_mgr.add_task("Task 2", None, None).await.unwrap();
608
609        // Add events to both tasks
610        event_mgr
611            .add_event(task1.id, "decision", "Task 1 Decision")
612            .await
613            .unwrap();
614        event_mgr
615            .add_event(task2.id, "decision", "Task 2 Decision")
616            .await
617            .unwrap();
618
619        // List all events globally (task_id = None)
620        let events = event_mgr.list_events(None, None, None, None).await.unwrap();
621
622        assert!(events.len() >= 2); // At least our 2 events
623        let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
624        let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
625
626        assert_eq!(task1_events.len(), 1);
627        assert_eq!(task2_events.len(), 1);
628    }
629}