1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8 pool: &'a SqlitePool,
9 notifier: crate::notifications::NotificationSender,
10 project_path: Option<String>,
11}
12
13impl<'a> EventManager<'a> {
14 pub fn new(pool: &'a SqlitePool) -> Self {
15 Self {
16 pool,
17 notifier: crate::notifications::NotificationSender::new(None, None),
18 project_path: None,
19 }
20 }
21
22 pub fn with_mcp_notifier(
24 pool: &'a SqlitePool,
25 project_path: String,
26 mcp_notifier: tokio::sync::mpsc::UnboundedSender<String>,
27 ) -> Self {
28 Self {
29 pool,
30 notifier: crate::notifications::NotificationSender::new(None, Some(mcp_notifier)),
31 project_path: Some(project_path),
32 }
33 }
34
35 pub fn with_websocket(
37 pool: &'a SqlitePool,
38 ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
39 project_path: String,
40 ) -> Self {
41 Self {
42 pool,
43 notifier: crate::notifications::NotificationSender::new(Some(ws_state), None),
44 project_path: Some(project_path),
45 }
46 }
47
48 async fn notify_event_created(&self, event: &Event) {
50 use crate::dashboard::websocket::DatabaseOperationPayload;
51
52 let Some(project_path) = &self.project_path else {
53 return;
54 };
55
56 let event_json = match serde_json::to_value(event) {
57 Ok(json) => json,
58 Err(e) => {
59 tracing::warn!("Failed to serialize event for notification: {}", e);
60 return;
61 },
62 };
63
64 let payload =
65 DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
66 self.notifier.send(payload).await;
67 }
68
69 async fn notify_event_updated(&self, event: &Event) {
71 use crate::dashboard::websocket::DatabaseOperationPayload;
72
73 let Some(project_path) = &self.project_path else {
74 return;
75 };
76
77 let event_json = match serde_json::to_value(event) {
78 Ok(json) => json,
79 Err(e) => {
80 tracing::warn!("Failed to serialize event for notification: {}", e);
81 return;
82 },
83 };
84
85 let payload =
86 DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
87 self.notifier.send(payload).await;
88 }
89
90 async fn notify_event_deleted(&self, event_id: i64) {
92 use crate::dashboard::websocket::DatabaseOperationPayload;
93
94 let Some(project_path) = &self.project_path else {
95 return;
96 };
97
98 let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
99 self.notifier.send(payload).await;
100 }
101
102 pub async fn add_event(
104 &self,
105 task_id: i64,
106 log_type: &str,
107 discussion_data: &str,
108 ) -> Result<Event> {
109 let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
111 .bind(task_id)
112 .fetch_one(self.pool)
113 .await?;
114
115 if !task_exists {
116 return Err(IntentError::TaskNotFound(task_id));
117 }
118
119 let now = Utc::now();
120
121 let result = sqlx::query(
122 r#"
123 INSERT INTO events (task_id, log_type, discussion_data, timestamp)
124 VALUES (?, ?, ?, ?)
125 "#,
126 )
127 .bind(task_id)
128 .bind(log_type)
129 .bind(discussion_data)
130 .bind(now)
131 .execute(self.pool)
132 .await?;
133
134 let id = result.last_insert_rowid();
135
136 let event = Event {
137 id,
138 task_id,
139 timestamp: now,
140 log_type: log_type.to_string(),
141 discussion_data: discussion_data.to_string(),
142 };
143
144 self.notify_event_created(&event).await;
146
147 Ok(event)
148 }
149
150 pub async fn update_event(
152 &self,
153 event_id: i64,
154 log_type: Option<&str>,
155 discussion_data: Option<&str>,
156 ) -> Result<Event> {
157 let existing_event: Option<Event> =
159 sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
160 .bind(event_id)
161 .fetch_optional(self.pool)
162 .await?;
163
164 let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
165 "Event {} not found",
166 event_id
167 )))?;
168
169 let new_log_type = log_type.unwrap_or(&existing_event.log_type);
171 let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
172
173 sqlx::query(
174 r#"
175 UPDATE events
176 SET log_type = ?, discussion_data = ?
177 WHERE id = ?
178 "#,
179 )
180 .bind(new_log_type)
181 .bind(new_discussion_data)
182 .bind(event_id)
183 .execute(self.pool)
184 .await?;
185
186 let updated_event = Event {
187 id: existing_event.id,
188 task_id: existing_event.task_id,
189 timestamp: existing_event.timestamp,
190 log_type: new_log_type.to_string(),
191 discussion_data: new_discussion_data.to_string(),
192 };
193
194 self.notify_event_updated(&updated_event).await;
196
197 Ok(updated_event)
198 }
199
200 pub async fn delete_event(&self, event_id: i64) -> Result<()> {
202 let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
204 .bind(event_id)
205 .fetch_optional(self.pool)
206 .await?;
207
208 let _event = event.ok_or(IntentError::InvalidInput(format!(
209 "Event {} not found",
210 event_id
211 )))?;
212
213 let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
215 .bind(event_id)
216 .execute(self.pool)
217 .await;
218
219 sqlx::query("DELETE FROM events WHERE id = ?")
221 .bind(event_id)
222 .execute(self.pool)
223 .await?;
224
225 self.notify_event_deleted(event_id).await;
227
228 Ok(())
229 }
230
231 pub async fn list_events(
233 &self,
234 task_id: Option<i64>,
235 limit: Option<i64>,
236 log_type: Option<String>,
237 since: Option<String>,
238 ) -> Result<Vec<Event>> {
239 if let Some(tid) = task_id {
241 let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
242 .bind(tid)
243 .fetch_one(self.pool)
244 .await?;
245
246 if !task_exists {
247 return Err(IntentError::TaskNotFound(tid));
248 }
249 }
250
251 let limit = limit.unwrap_or(50);
252
253 let since_timestamp = if let Some(duration_str) = since {
255 Some(crate::time_utils::parse_duration(&duration_str)?)
256 } else {
257 None
258 };
259
260 let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
262 let mut conditions = Vec::new();
263
264 if task_id.is_some() {
265 conditions.push("task_id = ?");
266 }
267
268 if log_type.is_some() {
269 conditions.push("log_type = ?");
270 }
271
272 if since_timestamp.is_some() {
273 conditions.push("timestamp >= ?");
274 }
275
276 if !conditions.is_empty() {
277 query.push_str(" AND ");
278 query.push_str(&conditions.join(" AND "));
279 }
280
281 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
282
283 let mut sql_query = sqlx::query_as::<_, Event>(&query);
285
286 if let Some(tid) = task_id {
287 sql_query = sql_query.bind(tid);
288 }
289
290 if let Some(ref typ) = log_type {
291 sql_query = sql_query.bind(typ);
292 }
293
294 if let Some(ts) = since_timestamp {
295 sql_query = sql_query.bind(ts);
296 }
297
298 sql_query = sql_query.bind(limit);
299
300 let events = sql_query.fetch_all(self.pool).await?;
301
302 Ok(events)
303 }
304
305 pub async fn search_events_fts5(
307 &self,
308 query: &str,
309 limit: Option<i64>,
310 ) -> Result<Vec<EventSearchResult>> {
311 let limit = limit.unwrap_or(20);
312
313 let results = sqlx::query(
315 r#"
316 SELECT
317 e.id,
318 e.task_id,
319 e.timestamp,
320 e.log_type,
321 e.discussion_data,
322 snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
323 FROM events_fts
324 INNER JOIN events e ON events_fts.rowid = e.id
325 WHERE events_fts MATCH ?
326 ORDER BY rank
327 LIMIT ?
328 "#,
329 )
330 .bind(query)
331 .bind(limit)
332 .fetch_all(self.pool)
333 .await?;
334
335 let mut search_results = Vec::new();
336 for row in results {
337 let event = Event {
338 id: row.get("id"),
339 task_id: row.get("task_id"),
340 timestamp: row.get("timestamp"),
341 log_type: row.get("log_type"),
342 discussion_data: row.get("discussion_data"),
343 };
344 let match_snippet: String = row.get("match_snippet");
345
346 search_results.push(EventSearchResult {
347 event,
348 match_snippet,
349 });
350 }
351
352 Ok(search_results)
353 }
354}
355
356#[derive(Debug)]
358pub struct EventSearchResult {
359 pub event: Event,
360 pub match_snippet: String,
361}
362
363#[cfg(test)]
364mod tests {
365 use super::*;
366 use crate::tasks::TaskManager;
367 use crate::test_utils::test_helpers::TestContext;
368
369 #[tokio::test]
370 async fn test_add_event() {
371 let ctx = TestContext::new().await;
372 let task_mgr = TaskManager::new(ctx.pool());
373 let event_mgr = EventManager::new(ctx.pool());
374
375 let task = task_mgr
376 .add_task("Test task", None, None, None)
377 .await
378 .unwrap();
379 let event = event_mgr
380 .add_event(task.id, "decision", "Test decision")
381 .await
382 .unwrap();
383
384 assert_eq!(event.task_id, task.id);
385 assert_eq!(event.log_type, "decision");
386 assert_eq!(event.discussion_data, "Test decision");
387 }
388
389 #[tokio::test]
390 async fn test_add_event_nonexistent_task() {
391 let ctx = TestContext::new().await;
392 let event_mgr = EventManager::new(ctx.pool());
393
394 let result = event_mgr.add_event(999, "decision", "Test").await;
395 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
396 }
397
398 #[tokio::test]
399 async fn test_list_events() {
400 let ctx = TestContext::new().await;
401 let task_mgr = TaskManager::new(ctx.pool());
402 let event_mgr = EventManager::new(ctx.pool());
403
404 let task = task_mgr
405 .add_task("Test task", None, None, None)
406 .await
407 .unwrap();
408
409 event_mgr
411 .add_event(task.id, "decision", "Decision 1")
412 .await
413 .unwrap();
414 event_mgr
415 .add_event(task.id, "blocker", "Blocker 1")
416 .await
417 .unwrap();
418 event_mgr
419 .add_event(task.id, "milestone", "Milestone 1")
420 .await
421 .unwrap();
422
423 let events = event_mgr
424 .list_events(Some(task.id), None, None, None)
425 .await
426 .unwrap();
427 assert_eq!(events.len(), 3);
428
429 assert_eq!(events[0].log_type, "milestone");
431 assert_eq!(events[1].log_type, "blocker");
432 assert_eq!(events[2].log_type, "decision");
433 }
434
435 #[tokio::test]
436 async fn test_list_events_with_limit() {
437 let ctx = TestContext::new().await;
438 let task_mgr = TaskManager::new(ctx.pool());
439 let event_mgr = EventManager::new(ctx.pool());
440
441 let task = task_mgr
442 .add_task("Test task", None, None, None)
443 .await
444 .unwrap();
445
446 for i in 0..5 {
448 event_mgr
449 .add_event(task.id, "test", &format!("Event {}", i))
450 .await
451 .unwrap();
452 }
453
454 let events = event_mgr
455 .list_events(Some(task.id), Some(3), None, None)
456 .await
457 .unwrap();
458 assert_eq!(events.len(), 3);
459 }
460
461 #[tokio::test]
462 async fn test_list_events_nonexistent_task() {
463 let ctx = TestContext::new().await;
464 let event_mgr = EventManager::new(ctx.pool());
465
466 let result = event_mgr.list_events(Some(999), None, None, None).await;
467 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
468 }
469
470 #[tokio::test]
471 async fn test_list_events_empty() {
472 let ctx = TestContext::new().await;
473 let task_mgr = TaskManager::new(ctx.pool());
474 let event_mgr = EventManager::new(ctx.pool());
475
476 let task = task_mgr
477 .add_task("Test task", None, None, None)
478 .await
479 .unwrap();
480
481 let events = event_mgr
482 .list_events(Some(task.id), None, None, None)
483 .await
484 .unwrap();
485 assert_eq!(events.len(), 0);
486 }
487
488 #[tokio::test]
489 async fn test_update_event() {
490 let ctx = TestContext::new().await;
491 let task_mgr = TaskManager::new(ctx.pool());
492 let event_mgr = EventManager::new(ctx.pool());
493
494 let task = task_mgr
495 .add_task("Test task", None, None, None)
496 .await
497 .unwrap();
498 let event = event_mgr
499 .add_event(task.id, "decision", "Initial decision")
500 .await
501 .unwrap();
502
503 let updated = event_mgr
505 .update_event(event.id, Some("milestone"), Some("Updated decision"))
506 .await
507 .unwrap();
508
509 assert_eq!(updated.id, event.id);
510 assert_eq!(updated.task_id, task.id);
511 assert_eq!(updated.log_type, "milestone");
512 assert_eq!(updated.discussion_data, "Updated decision");
513 }
514
515 #[tokio::test]
516 async fn test_update_event_partial() {
517 let ctx = TestContext::new().await;
518 let task_mgr = TaskManager::new(ctx.pool());
519 let event_mgr = EventManager::new(ctx.pool());
520
521 let task = task_mgr
522 .add_task("Test task", None, None, None)
523 .await
524 .unwrap();
525 let event = event_mgr
526 .add_event(task.id, "decision", "Initial decision")
527 .await
528 .unwrap();
529
530 let updated = event_mgr
532 .update_event(event.id, None, Some("Updated data only"))
533 .await
534 .unwrap();
535
536 assert_eq!(updated.log_type, "decision"); assert_eq!(updated.discussion_data, "Updated data only");
538 }
539
540 #[tokio::test]
541 async fn test_update_event_nonexistent() {
542 let ctx = TestContext::new().await;
543 let event_mgr = EventManager::new(ctx.pool());
544
545 let result = event_mgr
546 .update_event(999, Some("decision"), Some("Test"))
547 .await;
548
549 assert!(result.is_err());
550 assert!(matches!(result, Err(IntentError::InvalidInput(_))));
551 }
552
553 #[tokio::test]
554 async fn test_delete_event() {
555 let ctx = TestContext::new().await;
556 let task_mgr = TaskManager::new(ctx.pool());
557 let event_mgr = EventManager::new(ctx.pool());
558
559 let task = task_mgr
560 .add_task("Test task", None, None, None)
561 .await
562 .unwrap();
563 let event = event_mgr
564 .add_event(task.id, "decision", "To be deleted")
565 .await
566 .unwrap();
567
568 event_mgr.delete_event(event.id).await.unwrap();
570
571 let events = event_mgr
573 .list_events(Some(task.id), None, None, None)
574 .await
575 .unwrap();
576 assert_eq!(events.len(), 0);
577 }
578
579 #[tokio::test]
580 async fn test_delete_event_nonexistent() {
581 let ctx = TestContext::new().await;
582 let event_mgr = EventManager::new(ctx.pool());
583
584 let result = event_mgr.delete_event(999).await;
585 assert!(result.is_err());
586 assert!(matches!(result, Err(IntentError::InvalidInput(_))));
587 }
588
589 #[tokio::test]
590 async fn test_list_events_filter_by_type() {
591 let ctx = TestContext::new().await;
592 let task_mgr = TaskManager::new(ctx.pool());
593 let event_mgr = EventManager::new(ctx.pool());
594
595 let task = task_mgr
596 .add_task("Test task", None, None, None)
597 .await
598 .unwrap();
599
600 event_mgr
602 .add_event(task.id, "decision", "Decision 1")
603 .await
604 .unwrap();
605 event_mgr
606 .add_event(task.id, "blocker", "Blocker 1")
607 .await
608 .unwrap();
609 event_mgr
610 .add_event(task.id, "decision", "Decision 2")
611 .await
612 .unwrap();
613
614 let events = event_mgr
616 .list_events(Some(task.id), None, Some("decision".to_string()), None)
617 .await
618 .unwrap();
619
620 assert_eq!(events.len(), 2);
621 assert!(events.iter().all(|e| e.log_type == "decision"));
622 }
623
624 #[tokio::test]
625 async fn test_list_events_global() {
626 let ctx = TestContext::new().await;
627 let task_mgr = TaskManager::new(ctx.pool());
628 let event_mgr = EventManager::new(ctx.pool());
629
630 let task1 = task_mgr.add_task("Task 1", None, None, None).await.unwrap();
631 let task2 = task_mgr.add_task("Task 2", None, None, None).await.unwrap();
632
633 event_mgr
635 .add_event(task1.id, "decision", "Task 1 Decision")
636 .await
637 .unwrap();
638 event_mgr
639 .add_event(task2.id, "decision", "Task 2 Decision")
640 .await
641 .unwrap();
642
643 let events = event_mgr.list_events(None, None, None, None).await.unwrap();
645
646 assert!(events.len() >= 2); let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
648 let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
649
650 assert_eq!(task1_events.len(), 1);
651 assert_eq!(task2_events.len(), 1);
652 }
653}