intent_engine/
events.rs

1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8    pool: &'a SqlitePool,
9    ws_state: Option<Arc<crate::dashboard::websocket::WebSocketState>>,
10    project_path: Option<String>,
11    mcp_notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
12}
13
14impl<'a> EventManager<'a> {
15    pub fn new(pool: &'a SqlitePool) -> Self {
16        Self {
17            pool,
18            ws_state: None,
19            project_path: None,
20            mcp_notifier: None,
21        }
22    }
23
24    /// Create an EventManager with MCP notification support
25    pub fn with_mcp_notifier(
26        pool: &'a SqlitePool,
27        project_path: String,
28        mcp_notifier: tokio::sync::mpsc::UnboundedSender<String>,
29    ) -> Self {
30        Self {
31            pool,
32            ws_state: None,
33            project_path: Some(project_path),
34            mcp_notifier: Some(mcp_notifier),
35        }
36    }
37
38    /// Create an EventManager with WebSocket notification support
39    pub fn with_websocket(
40        pool: &'a SqlitePool,
41        ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
42        project_path: String,
43    ) -> Self {
44        Self {
45            pool,
46            ws_state: Some(ws_state),
47            project_path: Some(project_path),
48            mcp_notifier: None,
49        }
50    }
51
52    /// Internal helper: Notify UI about event creation
53    async fn notify_event_created(&self, event: &Event) {
54        use crate::dashboard::websocket::{DatabaseOperationPayload, ProtocolMessage};
55
56        // Prepare notification payload
57        let event_json = match serde_json::to_value(event) {
58            Ok(json) => json,
59            Err(e) => {
60                tracing::warn!("Failed to serialize event for notification: {}", e);
61                return;
62            },
63        };
64
65        let project_path = match &self.project_path {
66            Some(path) => path.clone(),
67            None => return, // No project path configured
68        };
69
70        let payload =
71            DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
72        let msg = ProtocolMessage::new("db_operation", payload);
73        let json = match msg.to_json() {
74            Ok(j) => j,
75            Err(e) => {
76                tracing::warn!("Failed to serialize notification message: {}", e);
77                return;
78            },
79        };
80
81        // Send via Dashboard WebSocket (if available)
82        if let Some(ws) = &self.ws_state {
83            ws.broadcast_to_ui(&json).await;
84        }
85
86        // Send via MCP WebSocket (if available) - non-blocking
87        if let Some(notifier) = &self.mcp_notifier {
88            if let Err(e) = notifier.send(json) {
89                tracing::debug!("Failed to send MCP notification (channel closed): {}", e);
90            }
91        }
92    }
93
94    /// Internal helper: Notify UI about event update
95    async fn notify_event_updated(&self, event: &Event) {
96        if let (Some(ws), Some(path)) = (&self.ws_state, &self.project_path) {
97            use crate::dashboard::websocket::{DatabaseOperationPayload, ProtocolMessage};
98
99            if let Ok(event_json) = serde_json::to_value(event) {
100                let payload =
101                    DatabaseOperationPayload::event_updated(event.id, event_json, path.clone());
102                let msg = ProtocolMessage::new("event_updated", payload);
103                if let Ok(json) = msg.to_json() {
104                    ws.broadcast_to_ui(&json).await;
105                }
106            }
107        }
108    }
109
110    /// Internal helper: Notify UI about event deletion
111    async fn notify_event_deleted(&self, event_id: i64) {
112        if let (Some(ws), Some(path)) = (&self.ws_state, &self.project_path) {
113            use crate::dashboard::websocket::{DatabaseOperationPayload, ProtocolMessage};
114
115            let payload = DatabaseOperationPayload::event_deleted(event_id, path.clone());
116            let msg = ProtocolMessage::new("event_deleted", payload);
117            if let Ok(json) = msg.to_json() {
118                ws.broadcast_to_ui(&json).await;
119            }
120        }
121    }
122
123    /// Add a new event
124    pub async fn add_event(
125        &self,
126        task_id: i64,
127        log_type: &str,
128        discussion_data: &str,
129    ) -> Result<Event> {
130        // Check if task exists
131        let task_exists: bool =
132            sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?)")
133                .bind(task_id)
134                .fetch_one(self.pool)
135                .await?;
136
137        if !task_exists {
138            return Err(IntentError::TaskNotFound(task_id));
139        }
140
141        let now = Utc::now();
142
143        let result = sqlx::query(
144            r#"
145            INSERT INTO events (task_id, log_type, discussion_data, timestamp)
146            VALUES (?, ?, ?, ?)
147            "#,
148        )
149        .bind(task_id)
150        .bind(log_type)
151        .bind(discussion_data)
152        .bind(now)
153        .execute(self.pool)
154        .await?;
155
156        let id = result.last_insert_rowid();
157
158        let event = Event {
159            id,
160            task_id,
161            timestamp: now,
162            log_type: log_type.to_string(),
163            discussion_data: discussion_data.to_string(),
164        };
165
166        // Notify WebSocket clients about the new event
167        self.notify_event_created(&event).await;
168
169        Ok(event)
170    }
171
172    /// Update an existing event
173    pub async fn update_event(
174        &self,
175        event_id: i64,
176        log_type: Option<&str>,
177        discussion_data: Option<&str>,
178    ) -> Result<Event> {
179        // First, get the existing event to check if it exists
180        let existing_event: Option<Event> = sqlx::query_as(
181            "SELECT id, task_id, timestamp, log_type, discussion_data FROM events WHERE id = ?",
182        )
183        .bind(event_id)
184        .fetch_optional(self.pool)
185        .await?;
186
187        let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
188            "Event {} not found",
189            event_id
190        )))?;
191
192        // Update only the fields that are provided
193        let new_log_type = log_type.unwrap_or(&existing_event.log_type);
194        let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
195
196        sqlx::query(
197            r#"
198            UPDATE events
199            SET log_type = ?, discussion_data = ?
200            WHERE id = ?
201            "#,
202        )
203        .bind(new_log_type)
204        .bind(new_discussion_data)
205        .bind(event_id)
206        .execute(self.pool)
207        .await?;
208
209        let updated_event = Event {
210            id: existing_event.id,
211            task_id: existing_event.task_id,
212            timestamp: existing_event.timestamp,
213            log_type: new_log_type.to_string(),
214            discussion_data: new_discussion_data.to_string(),
215        };
216
217        // Notify WebSocket clients about the update
218        self.notify_event_updated(&updated_event).await;
219
220        Ok(updated_event)
221    }
222
223    /// Delete an event
224    pub async fn delete_event(&self, event_id: i64) -> Result<()> {
225        // First, get the event to check if it exists and get task_id for notification
226        let event: Option<Event> = sqlx::query_as(
227            "SELECT id, task_id, timestamp, log_type, discussion_data FROM events WHERE id = ?",
228        )
229        .bind(event_id)
230        .fetch_optional(self.pool)
231        .await?;
232
233        let _event = event.ok_or(IntentError::InvalidInput(format!(
234            "Event {} not found",
235            event_id
236        )))?;
237
238        // Delete from FTS index first (if it exists)
239        let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
240            .bind(event_id)
241            .execute(self.pool)
242            .await;
243
244        // Delete the event
245        sqlx::query("DELETE FROM events WHERE id = ?")
246            .bind(event_id)
247            .execute(self.pool)
248            .await?;
249
250        // Notify WebSocket clients about the deletion
251        self.notify_event_deleted(event_id).await;
252
253        Ok(())
254    }
255
256    /// List events for a task (or globally if task_id is None)
257    pub async fn list_events(
258        &self,
259        task_id: Option<i64>,
260        limit: Option<i64>,
261        log_type: Option<String>,
262        since: Option<String>,
263    ) -> Result<Vec<Event>> {
264        // Check if task exists (only if task_id provided)
265        if let Some(tid) = task_id {
266            let task_exists: bool =
267                sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?)")
268                    .bind(tid)
269                    .fetch_one(self.pool)
270                    .await?;
271
272            if !task_exists {
273                return Err(IntentError::TaskNotFound(tid));
274            }
275        }
276
277        let limit = limit.unwrap_or(50);
278
279        // Parse since duration if provided
280        let since_timestamp = if let Some(duration_str) = since {
281            Some(Self::parse_duration(&duration_str)?)
282        } else {
283            None
284        };
285
286        // Build dynamic query based on filters
287        let mut query = String::from(
288            "SELECT id, task_id, timestamp, log_type, discussion_data FROM events WHERE 1=1",
289        );
290        let mut conditions = Vec::new();
291
292        if task_id.is_some() {
293            conditions.push("task_id = ?");
294        }
295
296        if log_type.is_some() {
297            conditions.push("log_type = ?");
298        }
299
300        if since_timestamp.is_some() {
301            conditions.push("timestamp >= ?");
302        }
303
304        if !conditions.is_empty() {
305            query.push_str(" AND ");
306            query.push_str(&conditions.join(" AND "));
307        }
308
309        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
310
311        // Build and execute query
312        let mut sql_query = sqlx::query_as::<_, Event>(&query);
313
314        if let Some(tid) = task_id {
315            sql_query = sql_query.bind(tid);
316        }
317
318        if let Some(ref typ) = log_type {
319            sql_query = sql_query.bind(typ);
320        }
321
322        if let Some(ts) = since_timestamp {
323            sql_query = sql_query.bind(ts);
324        }
325
326        sql_query = sql_query.bind(limit);
327
328        let events = sql_query.fetch_all(self.pool).await?;
329
330        Ok(events)
331    }
332
333    /// Parse duration string (e.g., "7d", "24h", "30m") into a DateTime
334    fn parse_duration(duration: &str) -> Result<chrono::DateTime<Utc>> {
335        let len = duration.len();
336        if len < 2 {
337            return Err(IntentError::InvalidInput(
338                "Duration must be in format like '7d', '24h', or '30m'".to_string(),
339            ));
340        }
341
342        let (num_str, unit) = duration.split_at(len - 1);
343        let num: i64 = num_str.parse().map_err(|_| {
344            IntentError::InvalidInput(format!("Invalid number in duration: {}", num_str))
345        })?;
346
347        let now = Utc::now();
348        let result = match unit {
349            "d" => now - chrono::Duration::days(num),
350            "h" => now - chrono::Duration::hours(num),
351            "m" => now - chrono::Duration::minutes(num),
352            "s" => now - chrono::Duration::seconds(num),
353            _ => {
354                return Err(IntentError::InvalidInput(format!(
355                    "Invalid duration unit '{}'. Use 'd' (days), 'h' (hours), 'm' (minutes), or 's' (seconds)",
356                    unit
357                )))
358            }
359        };
360
361        Ok(result)
362    }
363
364    /// Search events using FTS5
365    pub async fn search_events_fts5(
366        &self,
367        query: &str,
368        limit: Option<i64>,
369    ) -> Result<Vec<EventSearchResult>> {
370        let limit = limit.unwrap_or(20);
371
372        // Use FTS5 to search events and get snippets
373        let results = sqlx::query(
374            r#"
375            SELECT
376                e.id,
377                e.task_id,
378                e.timestamp,
379                e.log_type,
380                e.discussion_data,
381                snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
382            FROM events_fts
383            INNER JOIN events e ON events_fts.rowid = e.id
384            WHERE events_fts MATCH ?
385            ORDER BY rank
386            LIMIT ?
387            "#,
388        )
389        .bind(query)
390        .bind(limit)
391        .fetch_all(self.pool)
392        .await?;
393
394        let mut search_results = Vec::new();
395        for row in results {
396            let event = Event {
397                id: row.get("id"),
398                task_id: row.get("task_id"),
399                timestamp: row.get("timestamp"),
400                log_type: row.get("log_type"),
401                discussion_data: row.get("discussion_data"),
402            };
403            let match_snippet: String = row.get("match_snippet");
404
405            search_results.push(EventSearchResult {
406                event,
407                match_snippet,
408            });
409        }
410
411        Ok(search_results)
412    }
413}
414
415/// Event search result with match snippet
416#[derive(Debug)]
417pub struct EventSearchResult {
418    pub event: Event,
419    pub match_snippet: String,
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::TaskManager;
    use crate::test_utils::test_helpers::TestContext;

    /// A stored event echoes back the task id, type and text it was created with.
    #[tokio::test]
    async fn test_add_event() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let events = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();
        let created = events
            .add_event(task.id, "decision", "Test decision")
            .await
            .unwrap();

        assert_eq!(created.task_id, task.id);
        assert_eq!(created.log_type, "decision");
        assert_eq!(created.discussion_data, "Test decision");
    }

    /// Adding an event to an unknown task is rejected with `TaskNotFound`.
    #[tokio::test]
    async fn test_add_event_nonexistent_task() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.add_event(999, "decision", "Test").await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// Listing returns every event of the task, newest first.
    #[tokio::test]
    async fn test_list_events() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let events = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        // Insert in chronological order.
        let fixtures = [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("milestone", "Milestone 1"),
        ];
        for &(kind, text) in fixtures.iter() {
            events.add_event(task.id, kind, text).await.unwrap();
        }

        let listed = events
            .list_events(Some(task.id), None, None, None)
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);

        // Events should be in reverse chronological order
        let kinds: Vec<&str> = listed.iter().map(|e| e.log_type.as_str()).collect();
        assert_eq!(kinds, ["milestone", "blocker", "decision"]);
    }

    /// An explicit limit caps the number of returned events.
    #[tokio::test]
    async fn test_list_events_with_limit() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let events = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        // Add 5 events
        for i in 0..5 {
            let text = format!("Event {}", i);
            events.add_event(task.id, "test", &text).await.unwrap();
        }

        let listed = events
            .list_events(Some(task.id), Some(3), None, None)
            .await
            .unwrap();

        assert_eq!(listed.len(), 3);
    }

    /// Listing events of an unknown task is rejected with `TaskNotFound`.
    #[tokio::test]
    async fn test_list_events_nonexistent_task() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.list_events(Some(999), None, None, None).await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// A task without events yields an empty list rather than an error.
    #[tokio::test]
    async fn test_list_events_empty() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let events = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        let listed = events
            .list_events(Some(task.id), None, None, None)
            .await
            .unwrap();

        assert!(listed.is_empty());
    }
}