intent_engine/
events.rs

1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8    pool: &'a SqlitePool,
9    notifier: crate::notifications::NotificationSender,
10    project_path: Option<String>,
11}
12
13impl<'a> EventManager<'a> {
14    pub fn new(pool: &'a SqlitePool) -> Self {
15        Self {
16            pool,
17            notifier: crate::notifications::NotificationSender::new(None, None),
18            project_path: None,
19        }
20    }
21
22    /// Create an EventManager with MCP notification support
23    pub fn with_mcp_notifier(
24        pool: &'a SqlitePool,
25        project_path: String,
26        mcp_notifier: tokio::sync::mpsc::UnboundedSender<String>,
27    ) -> Self {
28        Self {
29            pool,
30            notifier: crate::notifications::NotificationSender::new(None, Some(mcp_notifier)),
31            project_path: Some(project_path),
32        }
33    }
34
35    /// Create an EventManager with WebSocket notification support
36    pub fn with_websocket(
37        pool: &'a SqlitePool,
38        ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
39        project_path: String,
40    ) -> Self {
41        Self {
42            pool,
43            notifier: crate::notifications::NotificationSender::new(Some(ws_state), None),
44            project_path: Some(project_path),
45        }
46    }
47
48    /// Internal helper: Notify UI about event creation
49    async fn notify_event_created(&self, event: &Event) {
50        use crate::dashboard::websocket::DatabaseOperationPayload;
51
52        let Some(project_path) = &self.project_path else {
53            return;
54        };
55
56        let event_json = match serde_json::to_value(event) {
57            Ok(json) => json,
58            Err(e) => {
59                tracing::warn!("Failed to serialize event for notification: {}", e);
60                return;
61            },
62        };
63
64        let payload =
65            DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
66        self.notifier.send(payload).await;
67    }
68
69    /// Internal helper: Notify UI about event update
70    async fn notify_event_updated(&self, event: &Event) {
71        use crate::dashboard::websocket::DatabaseOperationPayload;
72
73        let Some(project_path) = &self.project_path else {
74            return;
75        };
76
77        let event_json = match serde_json::to_value(event) {
78            Ok(json) => json,
79            Err(e) => {
80                tracing::warn!("Failed to serialize event for notification: {}", e);
81                return;
82            },
83        };
84
85        let payload =
86            DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
87        self.notifier.send(payload).await;
88    }
89
90    /// Internal helper: Notify UI about event deletion
91    async fn notify_event_deleted(&self, event_id: i64) {
92        use crate::dashboard::websocket::DatabaseOperationPayload;
93
94        let Some(project_path) = &self.project_path else {
95            return;
96        };
97
98        let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
99        self.notifier.send(payload).await;
100    }
101
102    /// Add a new event
103    pub async fn add_event(
104        &self,
105        task_id: i64,
106        log_type: &str,
107        discussion_data: &str,
108    ) -> Result<Event> {
109        // Check if task exists
110        let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
111            .bind(task_id)
112            .fetch_one(self.pool)
113            .await?;
114
115        if !task_exists {
116            return Err(IntentError::TaskNotFound(task_id));
117        }
118
119        let now = Utc::now();
120
121        let result = sqlx::query(
122            r#"
123            INSERT INTO events (task_id, log_type, discussion_data, timestamp)
124            VALUES (?, ?, ?, ?)
125            "#,
126        )
127        .bind(task_id)
128        .bind(log_type)
129        .bind(discussion_data)
130        .bind(now)
131        .execute(self.pool)
132        .await?;
133
134        let id = result.last_insert_rowid();
135
136        let event = Event {
137            id,
138            task_id,
139            timestamp: now,
140            log_type: log_type.to_string(),
141            discussion_data: discussion_data.to_string(),
142        };
143
144        // Notify WebSocket clients about the new event
145        self.notify_event_created(&event).await;
146
147        Ok(event)
148    }
149
150    /// Update an existing event
151    pub async fn update_event(
152        &self,
153        event_id: i64,
154        log_type: Option<&str>,
155        discussion_data: Option<&str>,
156    ) -> Result<Event> {
157        // First, get the existing event to check if it exists
158        let existing_event: Option<Event> =
159            sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
160                .bind(event_id)
161                .fetch_optional(self.pool)
162                .await?;
163
164        let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
165            "Event {} not found",
166            event_id
167        )))?;
168
169        // Update only the fields that are provided
170        let new_log_type = log_type.unwrap_or(&existing_event.log_type);
171        let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
172
173        sqlx::query(
174            r#"
175            UPDATE events
176            SET log_type = ?, discussion_data = ?
177            WHERE id = ?
178            "#,
179        )
180        .bind(new_log_type)
181        .bind(new_discussion_data)
182        .bind(event_id)
183        .execute(self.pool)
184        .await?;
185
186        let updated_event = Event {
187            id: existing_event.id,
188            task_id: existing_event.task_id,
189            timestamp: existing_event.timestamp,
190            log_type: new_log_type.to_string(),
191            discussion_data: new_discussion_data.to_string(),
192        };
193
194        // Notify WebSocket clients about the update
195        self.notify_event_updated(&updated_event).await;
196
197        Ok(updated_event)
198    }
199
200    /// Delete an event
201    pub async fn delete_event(&self, event_id: i64) -> Result<()> {
202        // First, get the event to check if it exists and get task_id for notification
203        let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
204            .bind(event_id)
205            .fetch_optional(self.pool)
206            .await?;
207
208        let _event = event.ok_or(IntentError::InvalidInput(format!(
209            "Event {} not found",
210            event_id
211        )))?;
212
213        // Delete from FTS index first (if it exists)
214        let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
215            .bind(event_id)
216            .execute(self.pool)
217            .await;
218
219        // Delete the event
220        sqlx::query("DELETE FROM events WHERE id = ?")
221            .bind(event_id)
222            .execute(self.pool)
223            .await?;
224
225        // Notify WebSocket clients about the deletion
226        self.notify_event_deleted(event_id).await;
227
228        Ok(())
229    }
230
231    /// List events for a task (or globally if task_id is None)
232    pub async fn list_events(
233        &self,
234        task_id: Option<i64>,
235        limit: Option<i64>,
236        log_type: Option<String>,
237        since: Option<String>,
238    ) -> Result<Vec<Event>> {
239        // Check if task exists (only if task_id provided)
240        if let Some(tid) = task_id {
241            let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
242                .bind(tid)
243                .fetch_one(self.pool)
244                .await?;
245
246            if !task_exists {
247                return Err(IntentError::TaskNotFound(tid));
248            }
249        }
250
251        let limit = limit.unwrap_or(50);
252
253        // Parse since duration if provided
254        let since_timestamp = if let Some(duration_str) = since {
255            Some(crate::time_utils::parse_duration(&duration_str)?)
256        } else {
257            None
258        };
259
260        // Build dynamic query based on filters
261        let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
262        let mut conditions = Vec::new();
263
264        if task_id.is_some() {
265            conditions.push("task_id = ?");
266        }
267
268        if log_type.is_some() {
269            conditions.push("log_type = ?");
270        }
271
272        if since_timestamp.is_some() {
273            conditions.push("timestamp >= ?");
274        }
275
276        if !conditions.is_empty() {
277            query.push_str(" AND ");
278            query.push_str(&conditions.join(" AND "));
279        }
280
281        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
282
283        // Build and execute query
284        let mut sql_query = sqlx::query_as::<_, Event>(&query);
285
286        if let Some(tid) = task_id {
287            sql_query = sql_query.bind(tid);
288        }
289
290        if let Some(ref typ) = log_type {
291            sql_query = sql_query.bind(typ);
292        }
293
294        if let Some(ts) = since_timestamp {
295            sql_query = sql_query.bind(ts);
296        }
297
298        sql_query = sql_query.bind(limit);
299
300        let events = sql_query.fetch_all(self.pool).await?;
301
302        Ok(events)
303    }
304
305    /// Search events using FTS5
306    pub async fn search_events_fts5(
307        &self,
308        query: &str,
309        limit: Option<i64>,
310    ) -> Result<Vec<EventSearchResult>> {
311        let limit = limit.unwrap_or(20);
312
313        // Use FTS5 to search events and get snippets
314        let results = sqlx::query(
315            r#"
316            SELECT
317                e.id,
318                e.task_id,
319                e.timestamp,
320                e.log_type,
321                e.discussion_data,
322                snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
323            FROM events_fts
324            INNER JOIN events e ON events_fts.rowid = e.id
325            WHERE events_fts MATCH ?
326            ORDER BY rank
327            LIMIT ?
328            "#,
329        )
330        .bind(query)
331        .bind(limit)
332        .fetch_all(self.pool)
333        .await?;
334
335        let mut search_results = Vec::new();
336        for row in results {
337            let event = Event {
338                id: row.get("id"),
339                task_id: row.get("task_id"),
340                timestamp: row.get("timestamp"),
341                log_type: row.get("log_type"),
342                discussion_data: row.get("discussion_data"),
343            };
344            let match_snippet: String = row.get("match_snippet");
345
346            search_results.push(EventSearchResult {
347                event,
348                match_snippet,
349            });
350        }
351
352        Ok(search_results)
353    }
354}
355
356/// Event search result with match snippet
357#[derive(Debug)]
358pub struct EventSearchResult {
359    pub event: Event,
360    pub match_snippet: String,
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::TaskManager;
    use crate::test_utils::test_helpers::TestContext;

    /// Insert a task named `name` into the test database and return its id.
    async fn create_task(ctx: &TestContext, name: &str) -> i64 {
        let task = TaskManager::new(ctx.pool())
            .add_task(name, None, None, None)
            .await
            .unwrap();
        task.id
    }

    /// A freshly added event echoes back the task id, type and payload.
    #[tokio::test]
    async fn test_add_event() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;

        let created = events
            .add_event(task_id, "decision", "Test decision")
            .await
            .unwrap();

        assert_eq!(created.task_id, task_id);
        assert_eq!(created.log_type, "decision");
        assert_eq!(created.discussion_data, "Test decision");
    }

    /// Adding an event to an unknown task is rejected with `TaskNotFound`.
    #[tokio::test]
    async fn test_add_event_nonexistent_task() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.add_event(999, "decision", "Test").await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// Listing returns every event of the task, newest first.
    #[tokio::test]
    async fn test_list_events() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;

        for (kind, text) in [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("milestone", "Milestone 1"),
        ] {
            events.add_event(task_id, kind, text).await.unwrap();
        }

        let listed = events
            .list_events(Some(task_id), None, None, None)
            .await
            .unwrap();

        // Reverse chronological order: the last insert comes back first.
        let kinds: Vec<&str> = listed.iter().map(|e| e.log_type.as_str()).collect();
        assert_eq!(kinds, ["milestone", "blocker", "decision"]);
    }

    /// An explicit limit caps the number of returned events.
    #[tokio::test]
    async fn test_list_events_with_limit() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;

        for i in 0..5 {
            events
                .add_event(task_id, "test", &format!("Event {}", i))
                .await
                .unwrap();
        }

        let listed = events
            .list_events(Some(task_id), Some(3), None, None)
            .await
            .unwrap();

        assert_eq!(listed.len(), 3);
    }

    /// Listing for an unknown task is rejected with `TaskNotFound`.
    #[tokio::test]
    async fn test_list_events_nonexistent_task() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.list_events(Some(999), None, None, None).await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// A task without events yields an empty list.
    #[tokio::test]
    async fn test_list_events_empty() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;

        let listed = events
            .list_events(Some(task_id), None, None, None)
            .await
            .unwrap();

        assert!(listed.is_empty());
    }

    /// Updating both fields replaces them and keeps id and task id.
    #[tokio::test]
    async fn test_update_event() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;
        let original = events
            .add_event(task_id, "decision", "Initial decision")
            .await
            .unwrap();

        let changed = events
            .update_event(original.id, Some("milestone"), Some("Updated decision"))
            .await
            .unwrap();

        assert_eq!(changed.id, original.id);
        assert_eq!(changed.task_id, task_id);
        assert_eq!(changed.log_type, "milestone");
        assert_eq!(changed.discussion_data, "Updated decision");
    }

    /// Passing `None` for a field leaves the stored value untouched.
    #[tokio::test]
    async fn test_update_event_partial() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;
        let original = events
            .add_event(task_id, "decision", "Initial decision")
            .await
            .unwrap();

        let changed = events
            .update_event(original.id, None, Some("Updated data only"))
            .await
            .unwrap();

        assert_eq!(changed.log_type, "decision"); // Unchanged
        assert_eq!(changed.discussion_data, "Updated data only");
    }

    /// Updating an unknown event is rejected with `InvalidInput`.
    #[tokio::test]
    async fn test_update_event_nonexistent() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events
            .update_event(999, Some("decision"), Some("Test"))
            .await;

        assert!(matches!(outcome, Err(IntentError::InvalidInput(_))));
    }

    /// A deleted event no longer shows up when listing its task.
    #[tokio::test]
    async fn test_delete_event() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;
        let doomed = events
            .add_event(task_id, "decision", "To be deleted")
            .await
            .unwrap();

        events.delete_event(doomed.id).await.unwrap();

        let remaining = events
            .list_events(Some(task_id), None, None, None)
            .await
            .unwrap();
        assert!(remaining.is_empty());
    }

    /// Deleting an unknown event is rejected with `InvalidInput`.
    #[tokio::test]
    async fn test_delete_event_nonexistent() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.delete_event(999).await;

        assert!(matches!(outcome, Err(IntentError::InvalidInput(_))));
    }

    /// The `log_type` filter keeps only events of the requested type.
    #[tokio::test]
    async fn test_list_events_filter_by_type() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let task_id = create_task(&ctx, "Test task").await;

        for (kind, text) in [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("decision", "Decision 2"),
        ] {
            events.add_event(task_id, kind, text).await.unwrap();
        }

        let decisions = events
            .list_events(Some(task_id), None, Some("decision".to_string()), None)
            .await
            .unwrap();

        assert_eq!(decisions.len(), 2);
        assert!(decisions.iter().all(|e| e.log_type == "decision"));
    }

    /// With no task id the listing spans every task.
    #[tokio::test]
    async fn test_list_events_global() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());
        let first_task = create_task(&ctx, "Task 1").await;
        let second_task = create_task(&ctx, "Task 2").await;

        events
            .add_event(first_task, "decision", "Task 1 Decision")
            .await
            .unwrap();
        events
            .add_event(second_task, "decision", "Task 2 Decision")
            .await
            .unwrap();

        let listed = events.list_events(None, None, None, None).await.unwrap();

        assert!(listed.len() >= 2); // At least our 2 events
        let for_first = listed.iter().filter(|e| e.task_id == first_task).count();
        let for_second = listed.iter().filter(|e| e.task_id == second_task).count();

        assert_eq!(for_first, 1);
        assert_eq!(for_second, 1);
    }
}